目录
一、消息丢失
1、生产者重连
2、生产者确认
3、数据持久化
4、惰性队列
5、消费失败处理
二、消息重复
1、通过业务保证幂等性(优先)
2、通过消息状态去重保证幂等性
三、消息堆积
1、优化消费者处理逻辑
2、增加队列及消费者数量
3、使用惰性队列
四、保证消息顺序消费
一、消息丢失
1、生产者重连
由于网络波动,可能会出现客户端连接失败的情况,需开启重连机制。
SpringBoot项目配置:
spring: rabbitmq: # 连接超时时间 connection-timeout: 500ms template: retry: # 开启失败重连 enabled: true # 失败后重连初始间隔时间 initial-interval: 1000ms # 失败后下次间隔的时长倍数,下次间隔时长=本次间隔时长*multiplier multiplier: 1 # 最大重试次数 max-attempts: 3
注意:重试过程线程是被阻塞的,合理配置等待时长及最大重试次数,或开启异步线程执行,以免影响业务性能。
2、生产者确认
SpringBoot项目配置:
spring: rabbitmq: # 开启生产者确认 publisher-confirm-type: correlated # 返回路由失败消息,一般是开发问题,无需开启 publisher-returns: true
publisher-confirm-type三种模式:
none | 关闭 |
simple | 同步阻塞等待MQ回执消息 |
correlated(推荐) | MQ异步回调返回回执消息 |
注意:以上两种方式均会造成MQ性能下降,非必要不建议开启。失败情况毕竟非常少,可在代码中通过输出日志或存储数据库等方式将发送失败的消息记录下来,稍后手动处理。
3、数据持久化
- 交换机持久化:在声明交换器时将“durable”参数设置为true
- 队列持久化:在声明队列的时将“durable”参数设置为true
- 消息持久化:生产消息时设置属性delivery_mode=2
SpringBoot项目:
交换机和消息默认为持久化,需自行设置队列持久化。
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "xx.queue", durable = "true"), exchange = @Exchange(name = "xx.topic", type = ExchangeTypes.TOPIC), key = "xx" ))
4、惰性队列
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
声明LazyQueue:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "xx.queue", durable = "true", arguments = {@Argument(name = "x-queue-mode", value = "lazy"), exchange = @Exchange(name = "xx.topic", type = ExchangeTypes.TOPIC), key = "xx" ))
优点:
-
基于磁盘存储,消息上限高
-
没有间歇性的page-out,性能比较稳定
缺点:
-
基于磁盘存储,消息时效性会降低
-
性能受限于磁盘的IO
5、消费失败处理
失败后尝试在本地重试,重试后依然失败,将消息投递到用于投递失败消息的交换机,存储到失败消息队列中,等待后续手动处理。
SpringBoot项目配置:
spring: rabbitmq: listener: simple: retry: # 开启失败重试 enabled: true
SpringBoot项目配置类:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMqErrorConfig { @Bean public DirectExchange errorExchange() { return new DirectExchange("error.direct"); } @Bean public Queue errorQueue() { return new Queue("error.queue"); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) { return BindingBuilder.bind(errorQueue).to(errorExchange).with("error"); } @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
二、消息重复
由于网络问题或消息生产消费过程中出现问题,均会导致消息重复的情况。
1、通过业务保证幂等性(优先)
在业务层面上,保证重复执行对结果不产生影响。例如:支付成功后修改订单状态,可以将未支付状态作为修改语句的执行条件。
2、通过消息状态去重保证幂等性
如果不能通过业务保证幂等性,可以将处理过的消息ID记录到redis,如果新到的消息ID已经在记录中,那么就不再处理这条消息。
三、消息堆积
1、优化消费者处理逻辑
优化消费者处理逻辑,使消费者更快处理。
2、增加队列及消费者数量
将队列绑定多个消费者,提高消费速度。
3、使用惰性队列
基于磁盘存储,消息上限高。
四、保证消息顺序消费
发生原因:
- 一个队列绑定多个消费者
- 一个消费者开启多个线程
1、将一个队列拆分成多个队列,保证一个队列只绑定一个消费者,生产者在投递消息时根据业务数据关键值来将需要保证先后顺序的同一类数据发送到同一个队列当中。
2、将队列设置为单活模式
x-single-active-consumer:单活模式,表⽰是否最多只允许⼀个消费者消费,如果有多个消费者同时绑定,则只会激活第⼀个,除⾮第⼀个消费者被取消或者死亡,才会⾃动转到下⼀个消费者。
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "xx.queue", durable = "true", arguments = {@Argument(name = "x-single-active-consumer", value = "true", type = "java.lang.Boolean")}), exchange = @Exchange(name = "xx.topic", type = ExchangeTypes.TOPIC), key = "xx" ))
还没有评论,来说两句吧...