消息队列怎么保证消息不丢失的


  • #task 消息队列怎么保证消息不丢失的 ✅ 2024-02-18

为什么会发生消息丢失

可能因为网络问题导致消息丢失,以及存储时消息队列宕机了等情况。具体发生位置如下。

还是画个简单的流程图吧,比较清晰:

如何解决

可以从生产者、消息队列本身、消费者解决。

生产者保证消息不丢失

  1. 事务管理:当发送一条消息后,开启事务,等待消息队列传送回消息处理成功之后才继续发送消息。
  2. 生产者确认机制:开启生产者确认机制,只要消息成功发送到交换机之后 RabbitMQ 就会发送一个 ack 给生产者(即使消息没有 Queue 接收,也会发送 ack)。如果消息没有成功发送到交换机,就会发送一条 nack 消息,提示发送失败。

当服务端确定一条或多条消息后就会调用生产者提供的回调方法来提醒生产者自己是否成功接收到了消息并处理。

// 消息是否成功发送到Exchange
final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData
correlationData, boolean ack, String cause) -> {
	log.info("correlationData: " + correlationData);
	log.info("ack: " + ack);
	if(!ack) {
	log.info("异常处理....");
	}
};
rabbitTemplate.setConfirmCallback(confirmCallback)

路由交换机 Exchange 保证消息不丢失

生产者只会保证消息到达路由不会丢失,而不能保证从交换机到达某个消息队列,然后从消息队列到达消费者不丢失消息。

解决方法

开启 Return 消息机制。当从交换机到消息队列 queue 时,如果失败则会调用 return 方法,告诉生产者消息发送失败。

final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) ->
	log.info("return exchange: " + exchange + ", routingKey: "
	+ routingKey + ", replyCode: " + replyCode + ", replyText: "
	+ replyText);
	rabbitTemplate.setReturnCallback(returnCallback);

消费者消费失败导致消息丢失的问题

因为消费者默认采用自动 ack,所以当消费者收到消息后,还没有执行完,MQ 宕机了,但是由于是自动 ack,所以消费者收到消息后,直接就返回 ack,消息队列收到后就把这条消息从消息队列中删除了。

最终消息没有消费成功,但是却返回了 ack,并导致消息丢失。

解决方法

开启手动发送 ack。

配置:spring.rabbitmq.listener.simple.acknowledge-mode=manual

样例:总之就是执行完自己的处理逻辑再返回 ack。

@RabbitListener(queues = RabbitMqConfig.MAIL_QUEUE)
public void onMessage(Message message, Channel channel) throws IOException {
try {
	Thread.sleep(5000);
} catch (InterruptedException e) {
	e.printStackTrace();
}
long deliveryTag = message.getMessageProperties().getDeliveryTag();
	//总之就是执行完自己的处理逻辑再返回ack。
	channel.basicAck(deliveryTag, true);
	System.out.println("mail listener receive: " + new
	String(message.getBody()));
}

开启消息队列持久化

如果不开启持久化,RabbitMQ 异常重启的时候,就会导致消息丢失,因为不能从其他地方恢复数据。

开启消息队列持久化后,就可以以一定的规则把消息队列的数据同步到磁盘上或者远程服务器上,以保证消息持久化

  • 具体就是,生产者每发送一条消息到交换机以及消息队列,他们就会把消息存储到各自的持久化日志中。接下来,每当一个消息消息消费成功返回 ack 的时候,就从持久化中删除这个消息。

  • 一旦服务宕机,就可以从持久化日志中进行恢复未完成的消息。


文章作者: KTpro
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 KTpro !
  目录