【SpringBoot高级篇】SpringBoot集成RocketMQ消息队列
- RocketMQ简介
- 技术架构
- 基本概念
- Docker环境安装RocketMQ
- rocketmq-client消息发送
- 基本样例
- 消息发送
- 发送同步消息
- 发送异步消息
- 单向发送消息
- 消费消息
- 负载均衡模式
- 广播模式
- 顺序消息
- 顺序消息生产
- 顺序消费消息
- 延时消息
- 启动消息消费者
- 发送延时消息
- 验证
- 使用限制
- 批量消息
- 发送批量消息
- 过滤消息
- Tag过滤
- 消息生产者
- 消息消费者
- SQL过滤
- 消息生产者
- 消息消费者
- 什么时候该用 Topic,什么时候该用 Tag
- 事务消息
- 流程分析
- 事务消息发送及提交
- 事务补偿
- 事务消息状态
- 发送事务消息
- 创建事务性生产者
- 实现事务的监听接口
- 使用限制
- SpringBoot整合RocketMQ
- pom
- application.yml
- 通用常量
- 生产者
- 测试同步消息
- 测试异步消息
- 测试单向消息
- 测试延迟消息
- 测试顺序消费
- 测试消息过滤 tag过滤
- 测试消息过滤 key过滤
- 测试消息消费的模式
- 测试重试机制
- 测试堆积消息
- 消费者
- 去重表设计
- SQL
- Message
- MessageMapper
- JackJsonUtil
- Spring发布事件监听解耦处理
- 消息事件基类
- 消费者监听类
- tag事件处理类
- 启动类
- 重复消费解决方案
- 消息堆积解决方案
- 消息丢失解决方案
RocketMQ简介
RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
为什么要使用MQ
1、要做到系统解耦,当新的模块进来时,可以做到代码改动最小; 能够解耦
2、设置流程缓冲池,可以让后端系统按自身吞吐能力进行消费,不被冲垮; 能够削峰,限流
3、强弱依赖梳理能把非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步
技术架构
RocketMQ架构上主要分为四部分,如上图所示:
- Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
- NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
- BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证
基本概念
1、消息模型(Message Model)
- RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
2、消息生产者(Producer)
- 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
3、消息消费者(Consumer)
- 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
4、生产者组(Producer Group)
- 同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事物消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
5、消费者组(Consumer Group)
- 同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
6、代理服务器(Broker Server)
- 消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
7、名字服务(Name Server)
- 名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
8、主题(Topic)
- 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
9、标签(Tag)
- 为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
10、消息(Message)
- 消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
11、拉取式消费(Pull Consumer)
- Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
12、推动式消费(Push Consumer)
- Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
13、集群消费(Clustering)
- 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
14、广播消费(Broadcasting)
- 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
15、普通顺序消息(Normal Ordered Message)
- 普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
16、严格顺序消息(Strictly Ordered Message)
- 严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
Docker环境安装RocketMQ
【Docker应用篇】Docker安装RocketMQ
rocketmq-client消息发送
- 导入MQ客户端依赖
org.apache.rocketmq rocketmq-client 4.9.6 - 消息发送者步骤分析
1.创建消息生产者producer,并制定生产者组名 2.指定Nameserver地址 3.启动producer 4.创建消息对象,指定主题Topic、Tag和消息体 5.发送消息 6.关闭生产者producer
- 消息消费者步骤分析
1.创建消费者Consumer,制定消费者组名 2.指定Nameserver地址 3.订阅主题Topic和Tag 4.设置回调函数,处理消息 5.启动消费者consumer
基本样例
消息发送
发送同步消息
public interface MqConstant { String NAME_SERVER_ADDRESS = "192.168.171.128:9876"; }
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
public class SyncProducer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDRESS); //3.启动producer producer.start(); for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message("springboot-mq", "Tag1", ("Hello World" + i).getBytes()); //5.发送消息 SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); System.out.println("发送结果:" + result); //线程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.关闭生产者producer producer.shutdown(); } }
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
public class AsyncProducer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDRESS); //3.启动producer producer.start(); for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message("base", "Tag2" , ("Hello World" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); //5.发送异步消息 producer.send(msg, new SendCallback() { /** * 发送成功回调函数 * @param sendResult */ @Override public void onSuccess(SendResult sendResult) { System.out.println("发送结果:" + sendResult); } /** * 发送失败回调函数 * @param e */ @Override public void onException(Throwable e) { System.out.println("发送异常:" + e); } }); //线程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.关闭生产者producer producer.shutdown(); } }
单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
public class OneWayProducer { public static void main(String[] args) throws Exception, MQBrokerException { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDRESS); //3.启动producer producer.start(); for (int i = 0; i < 3; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message("base", "Tag3", ("Hello World,单向消息" + i).getBytes()); //5.发送单向消息 producer.sendOneway(msg); //线程睡1秒 TimeUnit.SECONDS.sleep(5); } //6.关闭生产者producer producer.shutdown(); } }
消费消息
负载均衡模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDRESS); //3.订阅主题Topic和Tag consumer.subscribe("springboot-mq", "*"); //设定消费模式:负载均衡|广播模式 consumer.setMessageModel(MessageModel.CLUSTERING); //4.设置回调函数,处理消息 // MessageListenerConcurrently 并发模式 多线程消费 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } //返回值CONSUME_SUCCESS成功,消息会从mq出队 // RECONSUME_LATER (报错/null) 失败消息会重新回到队列过一会重新投递出来给当前消费者或者其他消费者消费的 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者consumer consumer.start(); System.out.printf("Consumer Started.%n"); } 广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
public class Consumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDRESS); //3.订阅主题Topic和Tag consumer.subscribe("springboot-mq", "*"); //设定消费模式:负载均衡|广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //4.设置回调函数,处理消息 // MessageListenerConcurrently 并发模式 多线程消费 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } //返回值CONSUME_SUCCESS成功,消息会从mq出队 // RECONSUME_LATER (报错/null) 失败消息会重新回到队列过一会重新投递出来给当前消费者或者其他消费者消费的 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者consumer consumer.start(); System.out.printf("Consumer Started.%n"); } } 顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
顺序消息生产
/** * Producer,发送顺序消息 */ public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDRESS); producer.start(); String[] tags = new String[]{"TagA", "TagC", "TagD"}; // 订单列表 List
orderList = new Producer().buildOrders(); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateStr = sdf.format(date); for (int i = 0; i < 10; i++) { // 加个时间前缀 String body = dateStr + " Hello RocketMQ " + orderList.get(i); Message msg = new Message("OrderTopic", tags[i % tags.length], "KEY" + i, body.getBytes()); // 发相同的订单号去相同的队列 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { Long id = (Long) arg; //根据订单id选择发送queue // mqs默认为4,取模使消息分布更加散列 long index = id % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId());//订单id System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); } /** * 订单的步骤 */ private static class OrderStep { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } } /** * 生成模拟订单数据 */ private List buildOrders() { List orderList = new ArrayList (); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } } 顺序消费消息
/** * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) */ public class ConsumerInOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDRESS); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("OrderTopic", "TagA || TagC || TagD"); // MessageListenerConcurrently 并发模式,多线程消费,消费失败,重试次数16次 // MessageListenerOrderly 单线程模式,顺序消费,消费失败,无限重试Intger.MAX_VALUE consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for (MessageExt msg : msgs) { // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序 System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { //模拟业务逻辑处理中... TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } } 延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
启动消息消费者
public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDRESS); // 订阅Topics consumer.subscribe("DelayTopic", "*"); // 注册消息监听者 // MessageListenerConcurrently 并发模式 多线程消费 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List
messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); } } 发送延时消息
public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { // 实例化一个生产者来产生延时消息 DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDRESS); // 启动生产者 producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("DelayTopic", ("Hello scheduled message " + i).getBytes()); // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel) message.setDelayTimeLevel(3); // 发送消息 producer.send(message); } // 关闭生产者 producer.shutdown(); } }
验证
您将会看到消息的消费比存储时间晚10秒
使用限制
// org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
发送批量消息
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
String topic = "BatchTest"; List
messages = new ArrayList<>(); messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); try { producer.send(messages); } catch (Exception e) { e.printStackTrace(); //处理error } 如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
public class ListSplitter implements Iterator
- > {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List
messages; private int currIndex; public ListSplitter(List messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map properties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; // 增加日志的开销20字节 if (tmpSize > SIZE_LIMIT) { //单个消息超过了最大的限制 //忽略,否则会阻塞分裂的进程 if (nextIndex - currIndex == 0) { //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环 nextIndex++; } break; } if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } } //把大的消息分裂成若干个小的消息 ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { try { List listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); //处理error } } 过滤消息
Rocketmq提供消息过滤功能,通过tag或者key进行区分
我们往一个主题里面发送消息的时候, 根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才区别对待
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.subscribe("FilterTagTopic", "TAGA || TAGB || TAGC");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。
在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
------------ | message | |----------| a > 5 AND b = 'abc' | a = 10 | --------------------> Gotten | b = 'abc'| | c = true | ------------ ------------ | message | |----------| a > 5 AND b = 'abc' | a = 1 | --------------------> Missed | b = 'abc'| | c = true | ------------
Tag过滤
消息生产者
public class Producer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDRESS); //3.启动producer producer.start(); for (int i = 0; i < 3; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message("FilterTagTopic", "Tag2", ("Hello World" + i).getBytes()); //5.发送消息 SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); System.out.println("发送结果:" + result); //线程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.关闭生产者producer producer.shutdown(); } }
消息消费者
public class Consumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDRESS); //3.订阅主题Topic和Tag consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 "); //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者consumer consumer.start(); System.out.println("消费者启动"); } } SQL过滤
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
消息生产者
发送消息时,你能通过putUserProperty来设置消息的属性
DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.start(); /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ Message msg = new Message("FilterSQLTopic", tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); // 设置一些属性 msg.putUserProperty("a", String.valueOf(i)); SendResult sendResult = producer.send(msg); producer.shutdown();
消息消费者
用MessageSelector.bySql来使用sql筛选消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); // 只有订阅的消息有这个属性a, a >=0 and a <= 3 consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("a between 0 and 3"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); 什么时候该用 Topic,什么时候该用 Tag
总结:不同的业务应该使用不同的Topic如果是相同的业务里面有不同表的表现形式,那么我们要使用tag进行区分
可以从以下几个方面进行判断:
- 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
- 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
- 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
- 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。
总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。
事务消息
流程分析
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
事务消息发送及提交
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
事务补偿
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
发送事务消息
创建事务性生产者
使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //创建事务监听器 TransactionListener transactionListener = new TransactionListenerImpl(); //创建消息生产者 TransactionMQProducer producer = new TransactionMQProducer("group6"); producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); //生产者这是监听器 producer.setTransactionListener(transactionListener); //启动消息生产者 producer.start(); String[] tags = new String[]{"TagA", "TagB", "TagC"}; for (int i = 0; i < 3; i++) { try { Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); TimeUnit.SECONDS.sleep(1); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } //producer.shutdown(); } }
实现事务的监听接口
当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
public class TransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("执行本地事务"); if (StringUtils.equals("TagA", msg.getTags())) { return LocalTransactionState.COMMIT_MESSAGE; } else if (StringUtils.equals("TagB", msg.getTags())) { return LocalTransactionState.ROLLBACK_MESSAGE; } else { return LocalTransactionState.UNKNOW; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("MQ检查消息Tag【"+msg.getTags()+"】的本地事务执行结果"); return LocalTransactionState.COMMIT_MESSAGE; } }
使用限制
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
- 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
- 事务性消息可能不止一次被检查或消费。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
SpringBoot整合RocketMQ
pom
org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test org.apache.rocketmq rocketmq-spring-boot-starter 2.1.1 org.apache.rocketmq rocketmq-client org.apache.rocketmq rocketmq-client 4.9.6 org.projectlombok lombok cn.hutool hutool-all 5.8.18 com.alibaba druid 1.1.10 mysql mysql-connector-java runtime com.baomidou mybatis-plus-boot-starter 3.1.1 org.springframework.boot spring-boot-maven-plugin application.yml
server: port: 8888 #rocketmq配置 rocketmq: topic: springboot-mq name-server: 192.168.171.128:9876 # 生产者配置 producer: # 发送同一类消息的设置为同一个group,保证唯一 group: rocketmq-pro-group # 发送消息超时时间,默认3000 sendMessageTimeout: 3000 # 发送消息失败重试次数,默认2 retryTimesWhenSendFailed: 2 # 异步消息重试此处,默认2 retryTimesWhenSendAsyncFailed: 2 # 消息最大长度 默认1024*4(4M) maxMessageSize: 4096 # 是否在内部发送失败时重试另一个broker,默认false retryNextServer: false # 压缩消息阈值,默认4k(1024 * 4) compressMessageBodyThreshold: 4096 consumer: group: rocketmq-consumer-group # DataSource Config spring: datasource: # 数据源基本配置 url: jdbc:mysql://localhost:3306/study_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true username: root password: root driver-class-name: com.mysql.cj.jdbc.Driver initialization-mode: always #表示始终都要执行初始化,2.x以上版本需要加上这行配置 type: com.alibaba.druid.pool.DruidDataSource # 数据源其他配置 initialSize: 5 minIdle: 5 maxActive: 20 maxWait: 60000 timeBetweenEvictionRunsMillis: 60000 minEvictableIdleTimeMillis: 300000 validationQuery: SELECT 1 FROM DUAL testWhileIdle: true testOnBorrow: false testOnReturn: false poolPreparedStatements: true # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙 filters: stat,wall,log4j maxPoolPreparedStatementPerConnectionSize: 20 useGlobalDataSourceStat: true connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500 logging: level: cn.zysheep.util: info
通用常量
MqConstant
/** * @ClassName: MqConstant */ public interface MqConstant { String NAME_SERVER_ADDRESS = "192.168.171.128:9876"; String TOPIC = "springboot-mq"; String TOPIC_TAG = "springboot-mq:001001"; String HEAP_TOPIC = "heap-up-topic"; }
TagConstant
/** * @ClassName: TagConstant * @Description: * 为消息设置的标志,用于同一主题下区分不同类型的消息。 * 来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。 * 标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。 * 消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性 */ public interface TagConstant { String CODE_001001 = "001001"; }
生产者
测试同步消息
@Data public class Person { private String userId; private String name; private Integer age; }
@Slf4j @SpringBootTest public class ProducerRocketMqBootApiTest { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送的是同步消息 * rocketMQTemplate.syncSend() * rocketMQTemplate.send() * rocketMQTemplate.convertAndSend() * 这三种发送消息的方法,底层都是调用syncSend */ /** * 测试发送简单的消息 * * @throws Exception */ @Test public void testSimpleMsg() { SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.TOPIC_TAG, "我是一个同步简单消息"); System.out.println(sendResult.getSendStatus()); System.out.println(sendResult.getMsgId()); System.out.println(sendResult.getMessageQueue()); } /** * 测试发送对象消息 * * @throws Exception */ @Test public void testObjectMsg() { Person person = new Person(); person.setUserId(IdUtil.simpleUUID()); person.setAge(24); person.setName("李少谦"); rocketMQTemplate.syncSend(MqConstant.TOPIC_TAG, person); } /** * 测试发送集合消息 * * @throws Exception */ @Test public void testCollectionMsg() { List
list = new ArrayList<>(); Person person = new Person(); person.setUserId(IdUtil.simpleUUID()); person.setAge(24); person.setName("李少谦"); Person person1 = new Person(); person1.setUserId(IdUtil.simpleUUID()); person1.setAge(24); person1.setName("李谦钰"); list.add(person); list.add(person1); rocketMQTemplate.syncSend(MqConstant.TOPIC_TAG, list); } } 测试异步消息
/** * 发送异步消息 * rocketMQTemplate.asyncSend() */ /** * 测试异步发送消息 * * @throws Exception */ @Test public void testAsyncSend() throws Exception { // 发送异步消息,发送完以后会有一个异步通知 rocketMQTemplate.asyncSend(MqConstant.TOPIC_TAG, "发送一个异步消息", new SendCallback() { /** * 成功的回调 * * @param sendResult */ @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功"); } /** * 失败的回调 * * @param throwable */ @Override public void onException(Throwable throwable) { System.out.println("发送失败"); } }); // 测试一下异步的效果 System.out.println("谁先执行"); // 挂起jvm 不让方法结束 System.in.read(); }
测试单向消息
/** * 测试单向消息 * * @throws Exception */ @Test public void testOnWay() throws Exception { // 发送单向消息,没有返回值和结果 rocketMQTemplate.sendOneWay(MqConstant.TOPIC_TAG, "这是一个单向消息"); }
测试延迟消息
/** * 测试延迟消息 * * @throws Exception */ @Test public void testDelay() throws Exception { // 构建消息对象 Message
message = MessageBuilder.withPayload("我是一个延迟消息").build(); // 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费 SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.TOPIC_TAG, message, 2000, 4); System.out.println(sendResult.getSendStatus()); } 测试顺序消费
@Data @AllArgsConstructor @NoArgsConstructor public class Order { /** * 订单号 */ private String orderId; /** * 订单名称 */ private String orderName; /** * 订单价格 */ private Double price; /** * 订单号创建时间 */ private Date createTime; /** * 订单描述 */ private String desc; /** * 订单的流程顺序 */ private Integer seq; }
/** * 测试顺序消费 * mq会根据hash的值来存放到一个队列里面去 * * 消费者监听类: @RocketMQMessageListener(topic = "${rocketmq.topic}", consumerGroup = "${rocketmq.consumer.group}", consumeMode = ConsumeMode.ORDERLY) * consumeMode 指定消费类型 * CONCURRENTLY 并发消费 * ORDERLY 顺序消费 messages orderly. one queue, one thread * @throws Exception */ @Test public void testOrderly() throws Exception { List
orders = Arrays.asList( new Order(IdUtil.simpleUUID(), "张三的下订单", null, null, null, 1), new Order(IdUtil.simpleUUID(), "张三的发短信", null, null, null, 1), new Order(IdUtil.simpleUUID(), "张三的物流", null, null, null, 1), new Order(IdUtil.simpleUUID(), "张三的签收", null, null, null, 1), new Order(IdUtil.simpleUUID(), "李四的下订单", null, null, null, 2), new Order(IdUtil.simpleUUID(), "李四的发短信", null, null, null, 2), new Order(IdUtil.simpleUUID(), "李四的物流", null, null, null, 2), new Order(IdUtil.simpleUUID(), "李四的签收", null, null, null, 2) ); // 我们控制流程为 下订单->发短信->物流->签收 hash的值为seq,也就是说 seq相同的会放在同一个队列里面,顺序消费 orders.forEach(order -> { rocketMQTemplate.syncSendOrderly(MqConstant.TOPIC_TAG, order, String.valueOf(order.getSeq())); }); } 测试消息过滤 tag过滤
/** * 发送一个带tag的消息 * * @RocketMQMessageListener(topic = "${rocketmq.topic}", consumerGroup = "${rocketmq.consumer.group}", selectorType = SelectorType.TAG, selectorExpression = "001001") *
* selectorType = SelectorType.TAG,属性指定消费的选择类型为Tag,这个类型也是selectorType属性的默认值(也可以使用sql92 需要在配置文件broker.conf中开启enbalePropertyFilter=true) * selectorExpression = "001001"属性来选择消费的Tag。默认是"*",即会消费该topic下所有的Tag的消息,支持"tag1 || tag2 || tag3" *
* * @throws Exception */ @Test public void testTagMsg() throws Exception { // 发送一个tag为java的数据 rocketMQTemplate.syncSend(MqConstant.TOPIC_TAG, "我是一个带tag的消息"); }
测试消息过滤 key过滤
/** * 发送一个带key的消息,我们使用事务消息 打断点查看消息头 * * @throws Exception */ @Test public void testKeyMsg() { // 可以在发送消息时在key中带上我们业务中的唯一标识,消费者监听消息可以做重复消费逻辑处理 String uuid = IdUtil.simpleUUID(); System.out.println(uuid); Person person = new Person(); person.setUserId(uuid); person.setAge(20); person.setName("李少谦"); // 发送一个key为spring的事务消息 Message
message = MessageBuilder.withPayload(person) .setHeader(RocketMQHeaders.KEYS, uuid) .build(); rocketMQTemplate.asyncSend(MqConstant.TOPIC, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult.getSendStatus()); } @Override public void onException(Throwable throwable) { log.error("错误信息: {}",throwable.getMessage()); } }); } 测试消息消费的模式
/** * 测试消息消费的模式 * Rocketmq消息消费的模式分为两种:负载均衡模和广播模式 {@link MessageModel} * 1、负载均衡模式表示多个消费者交替消费同一个主题里面的消息,默认的消息消费模式 * 2、广播模式表示每个每个消费者都消费一遍订阅的主题的消息 * * @RocketMQMessageListener(topic = "${rocketmq.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING) * @throws Exception */ @Test public void testMsgModel() throws Exception { for (int i = 0; i < 10; i++) { rocketMQTemplate.syncSend(MqConstant.TOPIC_TAG, "我是消息"+i); } }
测试重试机制
/** * 测试重试机制 * 重试的时间间隔 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; * 默认重试16次 * 2.如果重试了16次(并发模式) 顺序模式下(int最大值次)都是失败的? 是一个死信消息 则会放在一个死信主题中去 主题的名称: %DLQ%rocketmq-consumer-group * -------- * 再实际生产过程中,一般重试3-5次,如果还没有消费成功,则可以把消息签收了,通知人工等处理 */ @Test public void testMsgRetry() { rocketMQTemplate.syncSend(MqConstant.TOPIC_TAG, "发送重试消息"); }
测试堆积消息
/** * 测试堆积消息 */ @Test public void testMsgHeapUp() { for (int i = 0; i < 10000; i++) { rocketMQTemplate.syncSend(MqConstant.HEAP_TOPIC, "发送堆积消息"+i); } }
@Component @RocketMQMessageListener(topic = MqConstant.HEAP_TOPIC,consumerGroup = "dj-consumer-group") public class HeapConsumerListener implements RocketMQListener
{ @Override public void onMessage(MessageExt message) { System.out.println(new String(message.getBody())); } } 消费者
去重表设计
SQL
CREATE TABLE `tb_msg` ( `msg_id` varchar(70) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '消息主键', `msg_topic` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '消息主题', `msg_tag` varchar(10) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '消息标签', `msg_keys` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '消息key(业务唯一值)', `msg_body` varchar(500) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '消息体', `msg_type` char(1) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '消息类型(1/正常消费消息,2/重试消费消息,3死信消费消息)', `msg_retry_id` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '重试消息id', `msg_retry_topic` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '重试消息主题', `create_time` datetime DEFAULT NULL COMMENT '创建时间', UNIQUE KEY `idx_msg_keys` (`msg_keys`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Message
@Data @TableName("tb_msg") public class Message { @TableId("msg_id") private String msgId; @TableField("msg_topic") private String msgTopic; @TableField("msg_tag") private String msgTag; @TableField("msg_keys") private String msgKeys; @TableField("msg_body") private String msgBody; @TableField("msg_type") private String msgType; @TableField("msg_retry_id") private String msgRetryId; @TableField("msg_retry_topic") private String msgRetryTopic; @TableField("create_time") private Date createTime; }
MessageMapper
public interface MessageMapper extends BaseMapper
{ } JackJsonUtil
@Slf4j public class JackJsonUtil { private static ObjectMapper objectMapper = new ObjectMapper(); // 时间日期格式 private static final String STANDARD_FORMAT = "yyyy-MM-dd HH:mm:ss"; //以静态代码块初始化 static { //对象的所有字段全部列入序列化 objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS); //取消默认转换timestamps形式 objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); //忽略空Bean转json的错误 objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); //所有的日期格式都统一为以下的格式,即yyyy-MM-dd HH:mm:ss objectMapper.setDateFormat(new SimpleDateFormat(STANDARD_FORMAT)); //忽略 在json字符串中存在,但在java对象中不存在对应属性的情况。防止错误 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } /**===========================以下是从JSON中获取对象====================================*/ public static
T parseObject(String jsonString, Class object) { T t = null; try { t = objectMapper.readValue(jsonString, object); } catch (JsonProcessingException e) { log.error("JsonString转为自定义对象失败:{}", e.getMessage()); } return t; } public static T parseObject(File file, Class object) { T t = null; try { t = objectMapper.readValue(file, object); } catch (IOException e) { log.error("从文件中读取json字符串转为自定义对象失败:{}", e.getMessage()); } return t; } //将json数组字符串转为指定对象List列表或者Map集合 public static T parseJSONArray(String jsonArray, TypeReference reference) { T t = null; try { t = objectMapper.readValue(jsonArray, reference); } catch (JsonProcessingException e) { log.error("JSONArray转为List列表或者Map集合失败:{}", e.getMessage()); } return t; } /**=================================以下是将对象转为JSON=====================================*/ public static String toJSONString(Object object) { String jsonString = null; try { jsonString = objectMapper.writeValueAsString(object); } catch (JsonProcessingException e) { log.error("Object转JSONString失败:{}", e.getMessage()); } return jsonString; } public static byte[] toByteArray(Object object) { byte[] bytes = null; try { bytes = objectMapper.writeValueAsBytes(object); } catch (JsonProcessingException e) { log.error("Object转ByteArray失败:{}", e.getMessage()); } return bytes; } public static void objectToFile(File file, Object object) { try { objectMapper.writeValue(file, object); } catch (JsonProcessingException e) { log.error("Object写入文件失败:{}", e.getMessage()); } catch (IOException e) { e.printStackTrace(); } } /**=============================以下是与JsonNode相关的=======================================*/ //JsonNode和JSONObject一样,都是JSON树形模型,只不过在jackson中,存在的是JsonNode public static JsonNode parseJSONObject(String jsonString) { JsonNode jsonNode = null; try { jsonNode = objectMapper.readTree(jsonString); } catch (JsonProcessingException e) { log.error("JSONString转为JsonNode失败:{}", e.getMessage()); } return jsonNode; } public static JsonNode parseJSONObject(Object object) { JsonNode jsonNode = objectMapper.valueToTree(object); return jsonNode; } public static String toJSONString(JsonNode jsonNode) { String jsonString = null; try { jsonString = objectMapper.writeValueAsString(jsonNode); } catch (JsonProcessingException e) { log.error("JsonNode转JSONString失败:{}", e.getMessage()); } return jsonString; } //JsonNode是一个抽象类,不能实例化,创建JSON树形模型,得用JsonNode的子类ObjectNode,用法和JSONObject大同小异 public static ObjectNode newJSONObject() { return objectMapper.createObjectNode(); } //创建JSON数组对象,就像JSONArray一样用 public static ArrayNode newJSONArray() { return objectMapper.createArrayNode(); } /**===========以下是从JsonNode对象中获取key值的方法,个人觉得有点多余,直接用JsonNode自带的取值方法会好点,出于纠结症,还是补充进来了*/ public static String getString(JsonNode jsonObject, String key) { String s = jsonObject.get(key).asText(); return s; } public static Integer getInteger(JsonNode jsonObject, String key) { Integer i = jsonObject.get(key).asInt(); return i; } public static Boolean getBoolean(JsonNode jsonObject, String key) { Boolean bool = jsonObject.get(key).asBoolean(); return bool; } public static JsonNode getJSONObject(JsonNode jsonObject, String key) { JsonNode json = jsonObject.get(key); return json; } } Spring发布事件监听解耦处理
消息事件基类
public class BaseEvent extends ApplicationEvent { private static final long serialVersionUID = -114655712312251238L; public BaseEvent(Object source) { super(source); } /** * 消息tag */ private String msgTag; /** * 消息类型 0、第一次发送,1、重发 */ private String msgType; private Date date; public BaseEvent(String msgTag, String source) { super(source); this.msgTag = msgTag; } public BaseEvent(String msgTag, String source, Date date) { super(source); this.msgTag = msgTag; this.date = date; } public BaseEvent(String msgTag, String source, String msgType) { super(source); this.msgTag = msgTag; this.msgType = msgType; } public BaseEvent(String msgTag, String source, String msgType, Date date) { super(source); this.msgTag = msgTag; this.msgType = msgType; this.date = date; } public String getMsgTag() { return msgTag; } public void setMsgTag(String msgTag) { this.msgTag = msgTag; } public String getMsgType() { return msgType; } public void setMsgType(String msgType) { this.msgType = msgType; } public Date getDate() { return date; } public void setDate(Date date) { this.date = date; } }
消费者监听类
一种主题对应一个消息监听, 同一主题下不同类型的消息tag对应不同的事件
- topic指定消费的主题,consumerGroup指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
- 实现RocketMQListener接口,注意泛型的使用,可以为具体的类型,如果想拿到消息的其他参数可以写成MessageExt
主题为springboot-mq的所有tag消息都被这个类监听处理,不同类型的tag,发布不同的事件,对应不同的事件处理类,而不用再一个类中使用大量大段的if/else判断处理逻辑。达到业务解耦处理,使代码更加简洁。
@Slf4j @Component @RocketMQMessageListener(topic = "${rocketmq.topic}", consumerGroup = "${rocketmq.consumer.group}") public class BaseConsumerListener implements RocketMQListener
, RocketMQPushConsumerLifecycleListener { @Autowired private MessageMapper messageMapper; // @Autowired // private BitMapBloomFilter bitMapBloomFilter; @Autowired private ApplicationContext applicationContext; @Override public void onMessage(MessageExt message) { String topic = message.getTopic(); String tag = message.getTags(); byte[] body = message.getBody(); String keys = message.getKeys(); String msgId = message.getMsgId(); String realTopic = message.getProperty("REAL_TOPIC"); String originMessageId = message.getProperty("ORIGIN_MESSAGE_ID"); // 获取重试的次数 失败一次消息中的失败次数会累加一次 int reconsumeTimes = message.getReconsumeTimes(); String jsonBody = JackJsonUtil.toJSONString((new String(body))); log.info("消息监听类: msgId:{},topic:{}, tag:{}, body:{},keys:{},realTopic:{},originMessageId:{},reconsumeTimes:{}", msgId, topic, tag, jsonBody, keys, realTopic, originMessageId, reconsumeTimes); // 布隆过滤器进行去重 // if (bitMapBloomFilter.contains(keys)) { // return; // } // bitMapBloomFilter.add(keys); // 消费者幂等处理: 设计去重表,防止重复消费 messageMapper.insert(buildMessage(message)); applicationContext.publishEvent(new BaseEvent(tag, jsonBody)); } private Message buildMessage(MessageExt messageExt) { Message message = new Message(); message.setMsgId(messageExt.getMsgId()); message.setMsgTopic(messageExt.getTopic()); message.setMsgTag(messageExt.getTags()); message.setMsgBody(JackJsonUtil.toJSONString((new String(messageExt.getBody())))); // 判断是否是重试消息 String realTopic = messageExt.getProperty("REAL_TOPIC"); String originMessageId = messageExt.getProperty("ORIGIN_MESSAGE_ID"); if (StrUtil.isNotBlank(realTopic) && StrUtil.isNotBlank(originMessageId) ) { message.setMsgType("2"); message.setMsgKeys(messageExt.getKeys()+":"+originMessageId+":"+IdUtil.fastUUID()); } else { message.setMsgType("1"); message.setMsgKeys(messageExt.getKeys()); } message.setMsgRetryId(originMessageId); message.setMsgRetryTopic(realTopic); message.setCreateTime(new Date()); return message; } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 设置最大重试次数 consumer.setMaxReconsumeTimes(3); // 如下,设置其它consumer相关属性 consumer.setPullBatchSize(16); } } tag事件处理类
springboot-mq:001001消息处理类,后续其他Tag类型的消息,可以扩展其他消息处理类。
@Component @Slf4j public class HandlerFor001001 { @EventListener(condition = "#event.msgTag=='" + TagConstant.CODE_001001 +"'") public void execute(BaseEvent event) { Object source = event.getSource(); log.info("事件监听类: tag: {}, msgType: {}, date: {}, data:{}", event.getMsgTag(), event.getMsgType(), event.getDate(), event.getSource()); } }
启动类
@MapperScan("cn.zysheep.mapper") @SpringBootApplication public class RocketMqApplication { public static void main(String[] args) { SpringApplication.run(RocketMqApplication.class, args); } }
重复消费解决方案
BROADCASTING(广播)模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择。
CLUSTERING(负载均衡)模式下,如果一个topic被多个consumerGroup消费,也会重复消费。
即使是在CLUSTERING模式下,同一个consumerGroup下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把offset提交给broker,那么新的消费者可能会重新消费一次。虽然orderly模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起concurrently要严格了,但是加锁的线程和提交offset的线程不是同一个,所以还是会出现极端情况下的重复消费。
还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次。
那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是msgId也可以是你自定义的唯一的key,这样就可以去重了
1、可以选择布隆过滤器(BloomFilter)
2、可以设计去重表使用mysql唯一性索引,每次消费处理业务逻辑之前插入自定义唯一的key到去重表中,成功即处理业务逻辑,失败则不为重复消息。
消息堆积解决方案
1、生产消息太快了
- 消息消费者步骤分析
- 消息发送者步骤分析
- 导入MQ客户端依赖
- 严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
- 普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
- 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
- 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
- Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
- Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
- 消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
- 为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
- 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
- 名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
- 消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
- 同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
- 同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事物消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
- 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
- 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
- RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
还没有评论,来说两句吧...