【微服务全家桶】-实用篇-3-MQ

【微服务全家桶】-实用篇-3-MQ

码农世界 2024-05-22 前端 54 次浏览 0个评论

【【微服务全家桶】-实用篇-3-MQ

  • 1 初识MQ
    • 1.1 同步通讯和异步通信
    • 1.2 同步调用的问题
    • 1.3 异步调用
    • 1.4 MQ
    • 1.5 RabbitMQ快速入门
    • 1.6 MQ常见消息模型
      • 1.6.1 HelloWorld案例
      • 2 SpringAMQP
        • 2.1 初始SpringAMQP
        • 2.2 入门案例
          • 2.2.1 消息发送
          • 2.2.2 消息接收-***@RabbitListener***
          • 2.3 工作队列
            • 2.3.1 工作队列案例
            • 2.4 发布、订阅
              • 2.4.1 Fanout广播
                • 2.4.1.1 设置配置类
                • 2.4.1.2 重新启动
                • 2.4.1.3 发送/接收消息
                • 2.4.2 Direct订阅
                  • 2.4.2.1 添加订阅者
                  • 2.4.2.2 重新启动
                  • 2.4.2.3 发送/接收消息
                  • 2.4.3 Topic订阅
                    • 2.4.3.1 添加订阅者
                    • 2.4.3.2 重新启动
                    • 2.4.3.3 发送/接收消息
                    • 2.5 消息转换器
                      • 2.5.1 发送者-修改Spring的消息对象处理
                        • 2.5.1.1在父工程中引入依赖
                        • 2.5.1.2 声明MessageConverter
                          • 2.5.1.3 重新启动
                          • 2.5.2 接收者-修改消息对象处理器
                            • 2.5.2.1 引入依赖
                            • 2.5.2.2 声明MessageConverter
                            • 2.5.2.3 添加接收者
                            • 2.5.2.4 重新发送

                              1 初识MQ

                              1.1 同步通讯和异步通信

                              1.2 同步调用的问题

                              1.3 异步调用

                              异步通信的优点:

                              • 耦合度低

                              • 吞吐量提升

                              • 故障隔离

                              • 流量削峰

                                异步通信的缺点:

                                • 依赖于Broker的可靠性、安全性、吞吐能力

                                • 架构复杂了,业务没有明显的流程线,不好追踪管理

                                  通常一般选择同步通信,对时效性要求较高

                                  1.4 MQ

                                  MQ(MessageQueue),,中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

                                  1.5 RabbitMQ快速入门

                                  加载rabbitMQ

                                  docker load -i mq.tar
                                  

                                  安装MQ

                                  docker run \
                                   -e RABBITMQ_DEFAULT_USER=itcast \
                                   -e RABBITMQ_DEFAULT_PASS=123321 \
                                   --name mq \
                                   --hostname mq1 \
                                   -p 15672:15672 \
                                   -p 5672:5672 \
                                   -d \
                                   rabbitmq:3-management
                                  

                                  -e 用户名和密码

                                  –hostname 配置主机名,集群部署用到

                                  -p端口映射 15672管理平台端口,ui界面,5672消息通信端口

                                  浏览器打开管理平台端口 http://192.168.204.129:15672

                                  RabbitMQ中的几个概念:

                                  • channel:操作MQ的工具

                                  • exchange:路由消息到队列中

                                  • queue:缓存消息

                                  • virtual host:虚拟主机,是对queue、exchanges等资源的逻辑分组

                                    1.6 MQ常见消息模型

                                    1.6.1 HelloWorld案例

                                    基本消息队列的消息发送流程:

                                    1. 建立connection
                                    2. 创建channel
                                    3. 利用channeli声明队列
                                    4. 利用channell向队列发送消息

                                    生产者代码

                                    public class PublisherTest {
                                        @Test
                                        public void testSendMessage() throws IOException, TimeoutException {
                                            // 1.建立连接
                                            ConnectionFactory factory = new ConnectionFactory();
                                            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
                                            factory.setHost("192.168.204.129");
                                            factory.setPort(5672);
                                            factory.setVirtualHost("/");
                                            factory.setUsername("itcast");
                                            factory.setPassword("123321");
                                            // 1.2.建立连接
                                            Connection connection = factory.newConnection();
                                            // 2.创建通道Channel
                                            Channel channel = connection.createChannel();
                                            // 3.创建队列
                                            String queueName = "simple.queue";
                                            channel.queueDeclare(queueName, false, false, false, null);
                                            // 4.发送消息
                                            String message = "hello, rabbitmq!";
                                            channel.basicPublish("", queueName, null, message.getBytes());
                                            System.out.println("发送消息成功:【" + message + "】");
                                            // 5.关闭通道和连接
                                            channel.close();
                                            connection.close();
                                        }
                                    }
                                    

                                    基本消息队列的消息接收流程:

                                    1. 建立connection
                                    2. 创建channel
                                    3. 利用channel声明队列
                                    4. 定义consumer的消费行为handleDelivery()
                                    5. 利用channel将消费者与队列绑定

                                    消费者代码

                                    public class ConsumerTest {
                                        public static void main(String[] args) throws IOException, TimeoutException {
                                            // 1.建立连接
                                            ConnectionFactory factory = new ConnectionFactory();
                                            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
                                            factory.setHost("192.168.204.129");
                                            factory.setPort(5672);
                                            factory.setVirtualHost("/");
                                            factory.setUsername("itcast");
                                            factory.setPassword("123321");
                                            // 1.2.建立连接
                                            Connection connection = factory.newConnection();
                                            // 2.创建通道Channel
                                            Channel channel = connection.createChannel();
                                            // 3.创建队列
                                            String queueName = "simple.queue";
                                            channel.queueDeclare(queueName, false, false, false, null);
                                            // 4.订阅消息
                                            channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                                                @Override
                                                public void handleDelivery(String consumerTag, Envelope envelope,
                                                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                                                    // 5.处理消息
                                                    String message = new String(body);
                                                    System.out.println("接收到消息:【" + message + "】");
                                                }
                                            });
                                            System.out.println("等待接收消息。。。。");
                                        }
                                    }
                                    

                                    相应的创建连接和channel以及queue

                                    消费者消费完后,阅后即焚,queue中为空

                                    2 SpringAMQP

                                    2.1 初始SpringAMQP

                                    2.2 入门案例

                                    2.2.1 消息发送

                                    案例要求

                                    1.在父工程中引入spring-amqp依赖

                                            
                                            
                                                org.springframework.boot
                                                spring-boot-starter-amqp
                                            
                                    

                                    2.1 在publisher服务中编写application.yml,配置MQ连接信息

                                    spring:
                                      rabbitmq:
                                        host: 192.168.204.129
                                        port: 5672
                                        virtual-host: /
                                        username: itcast
                                        password: 123321
                                    

                                    2.2 在publisher服务中新建一个测试类,编写测试方法

                                    @RunWith(SpringRunner.class)
                                    @SpringBootTest
                                    public class SpringAmqpTest {
                                        @Autowired
                                        private RabbitTemplate rabbitTemplate;
                                        @Test
                                        public void testSimpleQueue() {
                                            String queueName = "simple.queue";
                                            String message = "hello, spring amqp!";
                                            rabbitTemplate.convertAndSend(queueName, message);
                                            System.out.println("发送消息成功:【" + message + "】");
                                        }
                                    }
                                    

                                    浏览器打开管理平台端口 http://192.168.204.129:15672

                                    2.2.2 消息接收-@RabbitListener

                                    1.在consumer服务中编写application.yml,添加MQ连接信息

                                    spring:
                                      rabbitmq:
                                        host: 192.168.204.129
                                        port: 5672
                                        virtual-host: /
                                        username: itcast
                                        password: 123321
                                    

                                    2.在consumer中编写一个类,编写消费逻辑

                                    @Component
                                    public class SpringRabbitListener {
                                        @RabbitListener(queues = "simple.queue")
                                        public void receiveMessage(String message) {
                                            System.out.println("接收到消息:【" + message + "】");
                                        }
                                    }
                                    

                                    @Component在Spring中注册成Bean,@RabbitListener中参数queues设置监听队列。

                                    成功接收到,且rabbit阅后即焚

                                    2.3 工作队列

                                    有多个消费者,称之为工作队列。提高消息处理速率,避免消息堆积

                                    2.3.1 工作队列案例

                                    在SpringAmqpTest中添加为生产者添加新的测试方法,Thread.sleep(20)生产者每秒产生50条消息。

                                    @Test
                                        public void test2SimpleQueue() throws InterruptedException {
                                            String queueName = "simple.queue";
                                            String message = "hello, spring amqp!";
                                            for(int count=0;count<=50;count++){
                                                rabbitTemplate.convertAndSend(queueName, message+count);
                                                Thread.sleep(20);
                                            }
                                        }
                                    

                                    在SpringRabbitListener中添加新的消费者

                                    @Component
                                    public class SpringRabbitListener {
                                        @RabbitListener(queues = "simple.queue")
                                        public void listenWorkqueue1(String message) throws InterruptedException {
                                            System.out.println("消费者1......接收到消息:【" + message + "】"+ LocalTime.now());
                                            Thread.sleep(20);
                                        }
                                        @RabbitListener(queues = "simple.queue")
                                        public void listenWorkqueue2(String message) throws InterruptedException {
                                            System.err.println("消费者2......接收到消息:【" + message + "】"+ LocalTime.now());
                                            Thread.sleep(200);
                                        }
                                    }
                                    

                                    预计:消费者1每秒处理50条,消费者2每秒处理5条。

                                    启动ConsumerApplication,发现

                                    消费者1处理所有的偶数,消费者2处理所有的奇数,这是因为在AMQP中有个预分配的策略,默认平均分配。

                                    如果要更改AMQP的预取的分配策略,需要修改消费者中application.yml文件,设置preFetch的值,控制预取消息的上限

                                    spring:
                                      rabbitmq:
                                        host: 192.168.204.129
                                        port: 5672
                                        virtual-host: /
                                        username: itcast
                                        password: 123321
                                        listener:
                                          simple:
                                            prefetch: 1
                                    

                                    修改过后重新启动,显示预分配策略正常

                                    2.4 发布、订阅

                                    2.4.1 Fanout广播

                                    Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue

                                    2.4.1.1 设置配置类

                                    在消费者中创建config.FanoutConfig,设置配置类

                                    @Configuration
                                    public class FanoutConfig {
                                        //itcast.fanout
                                        @Bean
                                        public FanoutExchange fanoutExchange(){
                                            return new FanoutExchange("itcast.fanout");
                                        }
                                        //fanout.queue1
                                        @Bean
                                        public Queue fanoutQueue1(){
                                            return new Queue("fanout.queue1");
                                        }
                                        //fanout.queue2
                                        @Bean
                                        public Queue fanoutQueue2(){
                                            return new Queue("fanout.queue2");
                                        }
                                        //绑定队列1到交换机
                                        @Bean
                                        public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
                                            return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
                                        }
                                        //绑定队列2到交换机
                                        @Bean
                                        public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
                                            return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
                                        }
                                    }
                                    
                                    2.4.1.2 重新启动

                                    发现RabbitMQ已经实现

                                    2.4.1.3 发送/接收消息

                                    在消费者中SpringRabbitListener中添加消费者

                                        @RabbitListener(queues = "fanout.queue1")
                                        public void listenFanoutqueue1(String message) {
                                            System.out.println("消费者接收到Fanoutqueue1消息:【" + message + "】");
                                        }
                                        @RabbitListener(queues = "fanout.queue2")
                                        public void listenFanoutqueue2(String message) {
                                            System.out.println("消费者接收到Fanoutqueue2消息:【" + message + "】");
                                        }
                                    

                                    在生产者中SpringAmqpTest添加新的测试方法

                                        @Test
                                        public void testSendFanoutExchange() throws InterruptedException {
                                               String exchangeName = "itcast.fanout";
                                                String message = "hello, spring amqp!";
                                                for(int count=0;count<=50;count++){
                                                    rabbitTemplate.convertAndSend(exchangeName, "", message+count);
                                                    Thread.sleep(20);
                                                }
                                        }
                                    

                                    测试结果

                                    两个队列收到相同的消息

                                    2.4.2 Direct订阅

                                    2.4.2.1 添加订阅者

                                    既可以通过设置配置类来完成消费者绑定,也可以直接通过注解绑定。

                                    在SpringRabbitListener中添加消费者,利用注解完成

                                    绑定队列和交换机***@QueueBinding***

                                    队列***@Queue***

                                    交换机***@Exchange***

                                        @RabbitListener(bindings =@QueueBinding(
                                                value=@Queue(name="direct.queue1"),
                                                exchange=@Exchange(name="itcast.direct",type= ExchangeTypes.DIRECT),
                                                key={"red","blue"}
                                        ))
                                        public void listenDirectQueue1(String message){
                                            System.out.println("消费者接收到DirectQueue1消息:【" + message + "】");
                                        }
                                        @RabbitListener(bindings =@QueueBinding(
                                                value=@Queue(name="direct.queue2"),
                                                exchange=@Exchange(name="itcast.direct",type= ExchangeTypes.DIRECT),
                                                key={"red","yellow"}
                                        ))
                                        public void listenDirectQueue2(String message){
                                            System.out.println("消费者接收到DirectQueue2消息:【" + message + "】");
                                        }
                                    
                                    2.4.2.2 重新启动
                                    2.4.2.3 发送/接收消息

                                    在生产者中SpringAmqpTest添加新的测试方法

                                        @Test//blue是队列1,yellow是队列2,red是队列1和队列2
                                        public void testSendDirectExchange() throws InterruptedException {
                                            String exchangeName = "itcast.direct";
                                            String message = "hello, spring amqp!";
                                            String routingKey="";
                                            for(int count=0;count<=50;count++){
                                                if(count%2==0){
                                                    routingKey="blue";
                                                }
                                                else{
                                                    routingKey="yellow";
                                                }
                                                if(count%10==0){
                                                    routingKey="red";
                                                }
                                                rabbitTemplate.convertAndSend(exchangeName, routingKey, message+count);
                                                Thread.sleep(20);
                                            }
                                        }
                                    

                                    2.4.3 Topic订阅

                                    2.4.3.1 添加订阅者

                                    既可以通过设置配置类来完成消费者绑定,也可以直接通过注解绑定。

                                    在SpringRabbitListener中添加消费者,利用注解完成

                                        @RabbitListener(bindings =@QueueBinding(
                                                value=@Queue(name="topic.queue1"),
                                                exchange=@Exchange(name="itcast.topic",type= ExchangeTypes.TOPIC),
                                                key={"china.#"}
                                        ))
                                        public void listenTopicQueue1(String message){
                                            System.err.println("消费者接收到TopicQueue1消息:【" + message + "】");
                                        }
                                        @RabbitListener(bindings =@QueueBinding(
                                                value=@Queue(name="topic.queue2"),
                                                exchange=@Exchange(name="itcast.topic",type= ExchangeTypes.TOPIC),
                                                key={"#.news"}
                                        ))
                                        public void listenTopicQueue2(String message){
                                            System.err.println("消费者接收到TopicQueue2消息:【" + message + "】");
                                        }
                                    
                                    2.4.3.2 重新启动
                                    2.4.3.3 发送/接收消息

                                    在生产者中SpringAmqpTest添加新的测试方法

                                        @Test
                                        public void testSendTopicExchange() throws InterruptedException {
                                            String exchangeName = "itcast.topic";
                                            String message = "hello, spring amqp!";
                                            String routingKey="";
                                            for(int count=0;count<=50;count++){
                                                if(count%2==0){
                                                    routingKey="china."+count;
                                                }
                                                else{
                                                    routingKey=count+".news";
                                                }
                                                if(count%10==0){
                                                    routingKey="china.news";
                                                }
                                                rabbitTemplate.convertAndSend(exchangeName, routingKey, message+count);
                                                Thread.sleep(20);
                                            }
                                        }
                                    

                                    2.5 消息转换器

                                    在FanoutConfig中添加新的Bean对象

                                        @Bean
                                        public Queue objectQueue1(){
                                            return new Queue("object.queue");
                                        }
                                    

                                    成功加入mq,为其添加测试方法

                                        @Test
                                        public void testSendObjectQueue() throws InterruptedException {
                                            String queueName = "object.queue";
                                            HashMap msg = new HashMap<>();
                                            msg.put("name", "张三");
                                            msg.put("age", 18);
                                            rabbitTemplate.convertAndSend(queueName, msg);
                                        }
                                    

                                    但是并不是能接受所有的object,只是序列化后的数据

                                    2.5.1 发送者-修改Spring的消息对象处理

                                    2.5.1.1在父工程中引入依赖
                                            
                                                com.fasterxml.jackson.core
                                                jackson-databind
                                            
                                    
                                    2.5.1.2 声明MessageConverter

                                    在publisher的启动类中声明bean对象

                                    @SpringBootApplication
                                    public class PublisherApplication {
                                        public static void main(String[] args) {
                                            SpringApplication.run(PublisherApplication.class);
                                        }
                                        @Bean
                                        public MessageConverter messageConverter(){
                                            return new Jackson2JsonMessageConverter();
                                        }
                                    }
                                    
                                    2.5.1.3 重新启动

                                    2.5.2 接收者-修改消息对象处理器

                                    2.5.2.1 引入依赖

                                    父工程引入过了,跳过

                                    2.5.2.2 声明MessageConverter

                                    在consumer的启动类中声明bean对象

                                    @SpringBootApplication
                                    public class ConsumerApplication {
                                        public static void main(String[] args) {
                                            SpringApplication.run(ConsumerApplication.class, args);
                                        }
                                        @Bean
                                        public MessageConverter messageConverter(){
                                            return new Jackson2JsonMessageConverter();
                                        }
                                    }
                                    
                                    2.5.2.3 添加接收者

                                    直接绑定队列

                                    @RabbitListener(queues = "object.queue")
                                    public void listenObjectQueue1(Map msg){
                                        System.out.println("消费者接收到ObjectQueue1消息:【" + msg + "】");
                                    }
                                    
                                    2.5.2.4 重新发送

                                    成功接收到JSON格式对象

转载请注明来自码农世界,本文标题:《【微服务全家桶】-实用篇-3-MQ》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,54人围观)参与讨论

还没有评论,来说两句吧...

Top