【【微服务全家桶】-实用篇-3-MQ
- 1 初识MQ
- 1.1 同步通讯和异步通信
- 1.2 同步调用的问题
- 1.3 异步调用
- 1.4 MQ
- 1.5 RabbitMQ快速入门
- 1.6 MQ常见消息模型
- 1.6.1 HelloWorld案例
- 2 SpringAMQP
- 2.1 初始SpringAMQP
- 2.2 入门案例
- 2.2.1 消息发送
- 2.2.2 消息接收-***@RabbitListener***
- 2.3 工作队列
- 2.3.1 工作队列案例
- 2.4 发布、订阅
- 2.4.1 Fanout广播
- 2.4.1.1 设置配置类
- 2.4.1.2 重新启动
- 2.4.1.3 发送/接收消息
- 2.4.2 Direct订阅
- 2.4.2.1 添加订阅者
- 2.4.2.2 重新启动
- 2.4.2.3 发送/接收消息
- 2.4.3 Topic订阅
- 2.4.3.1 添加订阅者
- 2.4.3.2 重新启动
- 2.4.3.3 发送/接收消息
- 2.5 消息转换器
- 2.5.1 发送者-修改Spring的消息对象处理
- 2.5.1.1在父工程中引入依赖
- 2.5.1.2 声明MessageConverter
- 2.5.1.3 重新启动
- 2.5.2 接收者-修改消息对象处理器
- 2.5.2.1 引入依赖
- 2.5.2.2 声明MessageConverter
- 2.5.2.3 添加接收者
- 2.5.2.4 重新发送
1 初识MQ
1.1 同步通讯和异步通信
1.2 同步调用的问题
1.3 异步调用
异步通信的优点:
-
耦合度低
-
吞吐量提升
-
故障隔离
-
流量削峰
异步通信的缺点:
-
依赖于Broker的可靠性、安全性、吞吐能力
-
架构复杂了,业务没有明显的流程线,不好追踪管理
通常一般选择同步通信,对时效性要求较高
1.4 MQ
MQ(MessageQueue),,中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
1.5 RabbitMQ快速入门
加载rabbitMQ
docker load -i mq.tar
安装MQ
docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
-e 用户名和密码
–hostname 配置主机名,集群部署用到
-p端口映射 15672管理平台端口,ui界面,5672消息通信端口
浏览器打开管理平台端口 http://192.168.204.129:15672
RabbitMQ中的几个概念:
-
channel:操作MQ的工具
-
exchange:路由消息到队列中
-
queue:缓存消息
-
virtual host:虚拟主机,是对queue、exchanges等资源的逻辑分组
1.6 MQ常见消息模型
1.6.1 HelloWorld案例
基本消息队列的消息发送流程:
- 建立connection
- 创建channel
- 利用channeli声明队列
- 利用channell向队列发送消息
生产者代码
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.204.129"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); } }
基本消息队列的消息接收流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
消费者代码
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.204.129"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }
相应的创建连接和channel以及queue
消费者消费完后,阅后即焚,queue中为空
2 SpringAMQP
2.1 初始SpringAMQP
2.2 入门案例
2.2.1 消息发送
案例要求
1.在父工程中引入spring-amqp依赖
org.springframework.boot spring-boot-starter-amqp 2.1 在publisher服务中编写application.yml,配置MQ连接信息
spring: rabbitmq: host: 192.168.204.129 port: 5672 virtual-host: / username: itcast password: 123321
2.2 在publisher服务中新建一个测试类,编写测试方法
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); System.out.println("发送消息成功:【" + message + "】"); } }
浏览器打开管理平台端口 http://192.168.204.129:15672
2.2.2 消息接收-@RabbitListener
1.在consumer服务中编写application.yml,添加MQ连接信息
spring: rabbitmq: host: 192.168.204.129 port: 5672 virtual-host: / username: itcast password: 123321
2.在consumer中编写一个类,编写消费逻辑
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void receiveMessage(String message) { System.out.println("接收到消息:【" + message + "】"); } }
@Component在Spring中注册成Bean,@RabbitListener中参数queues设置监听队列。
成功接收到,且rabbit阅后即焚
2.3 工作队列
有多个消费者,称之为工作队列。提高消息处理速率,避免消息堆积
2.3.1 工作队列案例
在SpringAmqpTest中添加为生产者添加新的测试方法,Thread.sleep(20)生产者每秒产生50条消息。
@Test public void test2SimpleQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, spring amqp!"; for(int count=0;count<=50;count++){ rabbitTemplate.convertAndSend(queueName, message+count); Thread.sleep(20); } }
在SpringRabbitListener中添加新的消费者
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenWorkqueue1(String message) throws InterruptedException { System.out.println("消费者1......接收到消息:【" + message + "】"+ LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkqueue2(String message) throws InterruptedException { System.err.println("消费者2......接收到消息:【" + message + "】"+ LocalTime.now()); Thread.sleep(200); } }
预计:消费者1每秒处理50条,消费者2每秒处理5条。
启动ConsumerApplication,发现
消费者1处理所有的偶数,消费者2处理所有的奇数,这是因为在AMQP中有个预分配的策略,默认平均分配。
如果要更改AMQP的预取的分配策略,需要修改消费者中application.yml文件,设置preFetch的值,控制预取消息的上限
spring: rabbitmq: host: 192.168.204.129 port: 5672 virtual-host: / username: itcast password: 123321 listener: simple: prefetch: 1
修改过后重新启动,显示预分配策略正常
2.4 发布、订阅
2.4.1 Fanout广播
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue
2.4.1.1 设置配置类
在消费者中创建config.FanoutConfig,设置配置类
@Configuration public class FanoutConfig { //itcast.fanout @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } //fanout.queue1 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } //fanout.queue2 @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } //绑定队列1到交换机 @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } //绑定队列2到交换机 @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
2.4.1.2 重新启动
发现RabbitMQ已经实现
2.4.1.3 发送/接收消息
在消费者中SpringRabbitListener中添加消费者
@RabbitListener(queues = "fanout.queue1") public void listenFanoutqueue1(String message) { System.out.println("消费者接收到Fanoutqueue1消息:【" + message + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutqueue2(String message) { System.out.println("消费者接收到Fanoutqueue2消息:【" + message + "】"); }
在生产者中SpringAmqpTest添加新的测试方法
@Test public void testSendFanoutExchange() throws InterruptedException { String exchangeName = "itcast.fanout"; String message = "hello, spring amqp!"; for(int count=0;count<=50;count++){ rabbitTemplate.convertAndSend(exchangeName, "", message+count); Thread.sleep(20); } }
测试结果
两个队列收到相同的消息
2.4.2 Direct订阅
2.4.2.1 添加订阅者
既可以通过设置配置类来完成消费者绑定,也可以直接通过注解绑定。
在SpringRabbitListener中添加消费者,利用注解完成
绑定队列和交换机***@QueueBinding***
队列***@Queue***
交换机***@Exchange***
@RabbitListener(bindings =@QueueBinding( value=@Queue(name="direct.queue1"), exchange=@Exchange(name="itcast.direct",type= ExchangeTypes.DIRECT), key={"red","blue"} )) public void listenDirectQueue1(String message){ System.out.println("消费者接收到DirectQueue1消息:【" + message + "】"); } @RabbitListener(bindings =@QueueBinding( value=@Queue(name="direct.queue2"), exchange=@Exchange(name="itcast.direct",type= ExchangeTypes.DIRECT), key={"red","yellow"} )) public void listenDirectQueue2(String message){ System.out.println("消费者接收到DirectQueue2消息:【" + message + "】"); }
2.4.2.2 重新启动
2.4.2.3 发送/接收消息
在生产者中SpringAmqpTest添加新的测试方法
@Test//blue是队列1,yellow是队列2,red是队列1和队列2 public void testSendDirectExchange() throws InterruptedException { String exchangeName = "itcast.direct"; String message = "hello, spring amqp!"; String routingKey=""; for(int count=0;count<=50;count++){ if(count%2==0){ routingKey="blue"; } else{ routingKey="yellow"; } if(count%10==0){ routingKey="red"; } rabbitTemplate.convertAndSend(exchangeName, routingKey, message+count); Thread.sleep(20); } }
2.4.3 Topic订阅
2.4.3.1 添加订阅者
既可以通过设置配置类来完成消费者绑定,也可以直接通过注解绑定。
在SpringRabbitListener中添加消费者,利用注解完成
@RabbitListener(bindings =@QueueBinding( value=@Queue(name="topic.queue1"), exchange=@Exchange(name="itcast.topic",type= ExchangeTypes.TOPIC), key={"china.#"} )) public void listenTopicQueue1(String message){ System.err.println("消费者接收到TopicQueue1消息:【" + message + "】"); } @RabbitListener(bindings =@QueueBinding( value=@Queue(name="topic.queue2"), exchange=@Exchange(name="itcast.topic",type= ExchangeTypes.TOPIC), key={"#.news"} )) public void listenTopicQueue2(String message){ System.err.println("消费者接收到TopicQueue2消息:【" + message + "】"); }
2.4.3.2 重新启动
2.4.3.3 发送/接收消息
在生产者中SpringAmqpTest添加新的测试方法
@Test public void testSendTopicExchange() throws InterruptedException { String exchangeName = "itcast.topic"; String message = "hello, spring amqp!"; String routingKey=""; for(int count=0;count<=50;count++){ if(count%2==0){ routingKey="china."+count; } else{ routingKey=count+".news"; } if(count%10==0){ routingKey="china.news"; } rabbitTemplate.convertAndSend(exchangeName, routingKey, message+count); Thread.sleep(20); } }
2.5 消息转换器
在FanoutConfig中添加新的Bean对象
@Bean public Queue objectQueue1(){ return new Queue("object.queue"); }
成功加入mq,为其添加测试方法
@Test public void testSendObjectQueue() throws InterruptedException { String queueName = "object.queue"; HashMap
msg = new HashMap<>(); msg.put("name", "张三"); msg.put("age", 18); rabbitTemplate.convertAndSend(queueName, msg); } 但是并不是能接受所有的object,只是序列化后的数据
2.5.1 发送者-修改Spring的消息对象处理
2.5.1.1在父工程中引入依赖
com.fasterxml.jackson.core jackson-databind 2.5.1.2 声明MessageConverter
在publisher的启动类中声明bean对象
@SpringBootApplication public class PublisherApplication { public static void main(String[] args) { SpringApplication.run(PublisherApplication.class); } @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
2.5.1.3 重新启动
2.5.2 接收者-修改消息对象处理器
2.5.2.1 引入依赖
父工程引入过了,跳过
2.5.2.2 声明MessageConverter
在consumer的启动类中声明bean对象
@SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
2.5.2.3 添加接收者
直接绑定队列
@RabbitListener(queues = "object.queue") public void listenObjectQueue1(Map
msg){ System.out.println("消费者接收到ObjectQueue1消息:【" + msg + "】"); } 2.5.2.4 重新发送
成功接收到JSON格式对象
-
-
-
还没有评论,来说两句吧...