kafka-消费者-指定offset消费(SpringBoot整合Kafka)

kafka-消费者-指定offset消费(SpringBoot整合Kafka)

码农世界 2024-06-19 后端 75 次浏览 0个评论

文章目录

  • 1、指定offset消费
    • 1.1、创建消费者监听器‘
    • 1.2、application.yml配置
    • 1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本
    • 1.4、创建生产者发送消息
      • 1.4.1、分区0中的数据
      • 1.5、创建SpringBoot启动类
      • 1.6、屏蔽 kafka debug 日志 logback.xml
      • 1.7、引入spring-kafka依赖
      • 1.8、消费者控制台:

        1、指定offset消费

        1.1、创建消费者监听器‘

        package com.atguigu.spring.kafka.consumer.listener;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.springframework.kafka.annotation.KafkaListener;
        import org.springframework.kafka.annotation.PartitionOffset;
        import org.springframework.kafka.annotation.TopicPartition;
        import org.springframework.stereotype.Component;
        @Component
        public class MyKafkaPartitionListener {
            //初始化偏移量指定后,每次重启都会从该位置消费一轮,所以一般是调式解决问题时才使用
            @KafkaListener(
                    topicPartitions = {
                            @TopicPartition(topic = "my_topic1"
                            ,partitionOffsets = {
                                    @PartitionOffset(partition = "0",initialOffset = "2")
                            })}
                    , groupId = "my_group1")
            public void onMessage1(ConsumerRecord record) {
                System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()
                        +",partition:"+record.partition()
                        +",offset = "+record.offset()
                        +",key = "+record.key()
                        +",value = "+record.value());
            }
        }
        

        1.2、application.yml配置

        server:
          port: 8120
        # v1
        spring:
          Kafka:
            bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
            consumer:
              # read-committed读事务已提交的消息 解决脏读问题
              isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
              # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
              enable-auto-commit: true 
              # 消费者提交ack时多长时间批量提交一次
              auto-commit-interval: 1000
              # 消费者第一次消费主题消息时从哪个位置开始
              auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
              key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        

        1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本

        package com.atguigu.spring.kafka.consumer.config;
        import org.apache.kafka.clients.admin.NewTopic;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.kafka.config.TopicBuilder;
        @Configuration
        public class MyKafkaConfig {
            @Bean
            public NewTopic springTestPartitionTopic() {
                return TopicBuilder.name("my_topic1") //主题名称
                        .partitions(3) //分区数量
                        .replicas(3) //副本数量
                        .build();
            }
        }
        

        kafka-消费者-指定offset消费(SpringBoot整合Kafka)

        kafka-消费者-指定offset消费(SpringBoot整合Kafka)

        1.4、创建生产者发送消息

        package com.atguigu.spring.kafka.consumer;
        import jakarta.annotation.Resource;
        import org.junit.jupiter.api.Test;
        import org.springframework.boot.test.context.SpringBootTest;
        import org.springframework.kafka.core.KafkaTemplate;
        @SpringBootTest
        class SpringKafkaConsumerApplicationTests {
            @Resource
            KafkaTemplate kafkaTemplate;
            @Test
            void contextLoads() {
                for (int i = 0; i < 10; i++) {
                    kafkaTemplate.send("my_topic1",i%3,"", "指定分区消费"+i);
                }
            }
        }
        

        kafka-消费者-指定offset消费(SpringBoot整合Kafka)

        kafka-消费者-指定offset消费(SpringBoot整合Kafka)

        1.4.1、分区0中的数据

        [
          [
            {
              "partition": 0,
              "offset": 0,
              "msg": "指定offset消费0",
              "timespan": 1717660785962,
              "date": "2024-06-06 07:59:45"
            },
            {
              "partition": 0,
              "offset": 1,
              "msg": "指定offset消费3",
              "timespan": 1717660785974,
              "date": "2024-06-06 07:59:45"
            },
            {
              "partition": 0,
              "offset": 2,
              "msg": "指定offset消费6",
              "timespan": 1717660785975,
              "date": "2024-06-06 07:59:45"
            },
            {
              "partition": 0,
              "offset": 3,
              "msg": "指定offset消费9",
              "timespan": 1717660785975,
              "date": "2024-06-06 07:59:45"
            }
          ]
        ]
        

        1.5、创建SpringBoot启动类

        package com.atguigu.spring.kafka.consumer;
        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;
        // Generated by https://start.springboot.io
        // 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
        @SpringBootApplication
        public class SpringKafkaConsumerApplication {
            public static void main(String[] args) {
                SpringApplication.run(SpringKafkaConsumerApplication.class, args);
            }
        }
        

        1.6、屏蔽 kafka debug 日志 logback.xml

              
            
            
        
        

        1.7、引入spring-kafka依赖

        
        
            4.0.0
            
                org.springframework.boot
                spring-boot-starter-parent
                3.0.5
                 
            
            
            
            com.atguigu
            spring-kafka-consumer
            0.0.1-SNAPSHOT
            spring-kafka-consumer
            spring-kafka-consumer
            
                17
            
            
                
                    org.springframework.boot
                    spring-boot-starter
                
                
                    org.springframework.boot
                    spring-boot-starter-test
                    test
                
                
                    org.springframework.boot
                    spring-boot-starter-web
                
                
                    org.springframework.kafka
                    spring-kafka
                
            
            
                
                    
                        org.springframework.boot
                        spring-boot-maven-plugin
                    
                
            
        
        

        1.8、消费者控制台:

          .   ____          _            __ _ _
         /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
        ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
         \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
          '  |____| .__|_| |_|_| |_\__, | / / / /
         =========|_|==============|___/=/_/_/_/
         :: Spring Boot ::                (v3.0.5)
        my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
        my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9
        

        此时如果重新启动 SpringKafkaConsumerApplication 消费者还是会消费数据,重复消费

          .   ____          _            __ _ _
         /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
        ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
         \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
          '  |____| .__|_| |_|_| |_\__, | / / / /
         =========|_|==============|___/=/_/_/_/
         :: Spring Boot ::                (v3.0.5)
        my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
        my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9
        

转载请注明来自码农世界,本文标题:《kafka-消费者-指定offset消费(SpringBoot整合Kafka)》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,75人围观)参与讨论

还没有评论,来说两句吧...

Top