目录
一、简单模型
1、首先控制台创建一个队列
2、父工程导入依赖
3、生产者配置文件
4、写测试类
5、消费者配置文件
6、消费者接收消息
二、WorkQueues模型
1、在控制台创建一个新的队列
2、生产者生产消息
3、创建两个消费者接收消息
4、能者多劳充分利用每一个消费者的能力
三、交换机
四、Fanout交换机
1、 声明队列
2、 创建交换机
编辑 3、 绑定交换机
4、示例
五、Diect交换机
1、 声明队列
2、创建交换机
3、绑定交换机
4、示例
六、Topic交换机
1、创建队列
2、创建交换机
3、绑定队列
4、示例
7、、声明队列交换机
1、SpringAMQP提供的类声明
2、基于注解声明
七、消息转换器
配置JSON转换器
一、简单模型
创建一个父工程和两个子工程consumer和publisher
1、首先控制台创建一个队列
命名为simple.queue
2、父工程导入依赖
org.projectlombok lombokorg.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-starter-test
3、生产者配置文件
spring: rabbitmq: host: 192.168.200.129 # 你的虚拟机IP port: 5672 # 端口 virtual-host: / # 虚拟主机 username: admin # 用户名 password: 123456 # 密码
4、写测试类
@SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, rabbitmq!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); } }
查看消息
5、消费者配置文件
spring: rabbitmq: host: 192.168.200.129 # 你的虚拟机IP port: 5672 # 端口 virtual-host: / # 虚拟主机 username: admin # 用户名 password: 123456 # 密码
6、消费者接收消息
@Component public class SpringRabbitListener { // 利用RabbitListener来声明要监听的队列信息 // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。 // 可以看到方法体中接收的就是消息体的内容 @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } }
二、WorkQueues模型
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了
1、在控制台创建一个新的队列
命名为work.queue
2、生产者生产消息
/** * workQueue * 向队列中不停发送消息,模拟消息堆积。 */ @Test public void testWorkQueue() throws InterruptedException { // 队列名称 String queueName = "work.queue"; // 消息 String message = "hello, message_"; for (int i = 0; i < 50; i++) { // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息 rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
3、创建两个消费者接收消息
@RabbitListener(queues = "work.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】"); } @RabbitListener(queues = "work.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】"); }
结果
如果消费者睡眠时间不同
@RabbitListener(queues = "work.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】"); Thread.sleep(20); } @RabbitListener(queues = "work.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】"); Thread.sleep(200); }
- 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
- 消费者2 sleep了200毫秒,相当于每秒处理5个消息
消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
4、能者多劳充分利用每一个消费者的能力
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢。而最终总的执行耗时也大大提升。 正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
三、交换机
在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
四、Fanout交换机
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。 在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
1、 声明队列
创建两个队列fanout.queue1和fanout.queue2,绑定到交换机hmall.fanout
2、 创建交换机
3、 绑定交换机
4、示例
生产者
/** * Fanout交换机 */ @Test public void testFanoutExchange() { // 交换机名称 String exchangeName = "mq.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
消费者
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }
五、Diect交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
1、 声明队列
首先在控制台声明两个队列direct.queue1和direct.queue2
2、创建交换机
3、绑定交换机
4、示例
生产者:RoutingKey为red
/** * Direct交换机 */ @Test public void testSendDirectExchange() { // 交换机名称 String exchangeName = "mq.direct"; // 消息 String message = "hello direct red"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }
消费者
@RabbitListener(queues = "direct.queue1") public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "direct.queue2") public void listenDirectQueue2(String msg) { System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); }
RoutingKey为blue
/** * Direct交换机 */ @Test public void testSendDirectExchange() { // 交换机名称 String exchangeName = "mq.direct"; // 消息 String message = "hello direct blue"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "blue", message); }
六、Topic交换机
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。 只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
通配符规则:
- #:匹配一个或多个词
- *:匹配不多不少恰好1个词
举例:
- item.#:能够匹配item.spu.insert 或者 item.spu
- item.*:只能匹配item.spu
示例
publisher发送的消息使用的RoutingKey共有四种:
- china.news 代表有中国的新闻消息;
- china.weather 代表中国的天气消息;
- japan.news 则代表日本新闻
- japan.weather 代表日本的天气消息;
解释:
- topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
- china.news
- china.weather
- topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
- china.news
- japan.news
1、创建队列
2、创建交换机
3、绑定队列
4、示例
生产者:RoutingKey为china.news
/** * topicExchange */ @Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "mq.topic"; // 消息 String message = "hello topis china.news"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
消费者
@RabbitListener(queues = "topic.queue1") public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "topic.queue2") public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); }
RoutingKey为china.people
/** * topicExchange */ @Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "mq.topic"; // 消息 String message = "hello topis china.people"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.people", message); }
只有消费者1收到消息
7、、声明队列交换机
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
1、Queue:用于声明队列,可以用工厂类QueueBuilder构建
2、Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
3、Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
1、SpringAMQP提供的类声明
示例:创建Fanout交换机队列
@Configuration public class FanoutConfig { /** * 声明交换机 * @return Fanout类型交换机 */ @Bean public FanoutExchange fanoutExchange2(){ return new FanoutExchange("mq.fanout2"); } /** * 第1个队列 */ @Bean public Queue fanoutQueue3(){ return new Queue("fanout.queue3"); } /** * 绑定队列和交换机1 */ @Bean public Binding bindingQueue1(Queue fanoutQueue3, FanoutExchange fanoutExchange3){ return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange3); } /** * 第2个队列 */ @Bean public Queue fanoutQueue4(){ return new Queue("fanout.queue4"); } /** * 绑定队列和交换机2 */ @Bean public Binding bindingQueue2(){ return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange2()); } }
direct示例
@Configuration public class DirectConfig { /** * 声明交换机 * @return Direct类型交换机 */ @Bean public DirectExchange directExchange(){ return ExchangeBuilder.directExchange("mq.direct2").build(); } /** * 第1个队列 */ @Bean public Queue directQueue1(){ return new Queue("direct.queue1"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){ return BindingBuilder.bind(directQueue1).to(directExchange).with("red"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){ return BindingBuilder.bind(directQueue1).to(directExchange).with("blue"); } /** * 第2个队列 */ @Bean public Queue directQueue2(){ return new Queue("direct.queue2"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){ return BindingBuilder.bind(directQueue2).to(directExchange).with("red"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){ return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow"); } }
direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding
2、基于注解声明
/** * 注解声明交换机 * @param msg */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue3"),//队列名称 exchange = @Exchange(name = "mq.direct", //交换机名称 type = ExchangeTypes.DIRECT),//交换机类型 key = {"red", "blue"}//RoutingKey )) public void listenDirectQueue3(String msg){ System.out.println("消费者3接收到direct.queue3的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue4"), exchange = @Exchange(name = "mq.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue4(String msg){ System.out.println("消费者4接收到direct.queue4的消息:【" + msg + "】"); }
七、消息转换器
@Test public void testSendMap() throws InterruptedException { // 准备消息 Map
msg = new HashMap<>(); msg.put("name", "张三"); msg.put("age", 21); // 发送消息 rabbitTemplate.convertAndSend("object.queue", msg); } 当发送的数据为Objiet类型时会出现乱码现象,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化
配置JSON转换器
在publisher和consumer两个服务中都引入依赖:
com.fasterxml.jackson.dataformat jackson-dataformat-xml2.9.10 注意:如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
配置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可:
@Bean public MessageConverter messageConverter(){ // 1.定义消息转换器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; }
结果
消费者接收Object
我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:
@RabbitListener(queues = "object.queue") public void listenSimpleQueueMessage(Map
msg) throws InterruptedException { System.out.println("消费者接收到object.queue消息:【" + msg + "】"); }
- topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
还没有评论,来说两句吧...