Table of Contents
- 1. Consuming from a specified offset
- 1.1 Create the consumer listener
- 1.2 application.yml configuration
- 1.3 Use Java code to create the topic my_topic1 with 3 partitions and 3 replicas per partition
- 1.4 Create a producer and send messages
- 1.4.1 Data in partition 0
- 1.5 Create the Spring Boot application class
- 1.6 Suppress Kafka DEBUG logging with logback.xml
- 1.7 Add the spring-kafka dependency
- 1.8 Consumer console output
1. Consuming from a specified offset

1.1 Create the consumer listener
```java
package com.atguigu.spring.kafka.consumer.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaPartitionListener {

    // Once an initial offset is specified here, every restart re-consumes from that position,
    // so this is normally only used while debugging or troubleshooting.
    @KafkaListener(
            topicPartitions = {
                    @TopicPartition(topic = "my_topic1", partitionOffsets = {
                            @PartitionOffset(partition = "0", initialOffset = "2")
                    })
            },
            groupId = "my_group1")
    public void onMessage1(ConsumerRecord<String, String> record) {
        System.out.println("my_group1消费者1获取到消息:topic = " + record.topic()
                + ",partition:" + record.partition()
                + ",offset = " + record.offset()
                + ",key = " + record.key()
                + ",value = " + record.value());
    }
}
```
1.2 application.yml configuration

```yaml
server:
  port: 8120

# v1
spring:
  kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # read-committed: only read messages from committed transactions, which avoids dirty reads
      # (read-uncommitted would let the consumer see messages from producer transactions that are not yet committed)
      isolation-level: read-committed
      # whether the consumer acks automatically: with true, Kafka commits the consumer offset after records are fetched
      enable-auto-commit: true
      # how often offsets are committed in batch when auto-commit is enabled (ms)
      auto-commit-interval: 1000
      # where the consumer starts the first time it reads the topic: earliest | latest | none
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```
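These Spring Boot properties are ultimately translated into plain Kafka consumer settings. Purely as a point of reference (this class is not part of the original post; the name RawConsumerConfigSketch is made up), the same configuration expressed directly against the Kafka client API looks roughly like this:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RawConsumerConfigSketch {

    public static KafkaConsumer<String, String> buildConsumer() {
        Properties props = new Properties();
        // same brokers as spring.kafka.bootstrap-servers
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group1");
        // isolation-level: read-committed
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // enable-auto-commit / auto-commit-interval
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // auto-offset-reset: earliest
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new KafkaConsumer<>(props);
    }
}
```

In the Spring Boot setup used in this post none of this is written by hand; the auto-configured consumer factory builds the equivalent settings from application.yml.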
1.3 Use Java code to create the topic my_topic1 with 3 partitions and 3 replicas per partition

```java
package com.atguigu.spring.kafka.consumer.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class MyKafkaConfig {

    @Bean
    public NewTopic springTestPartitionTopic() {
        return TopicBuilder.name("my_topic1")   // topic name
                .partitions(3)                  // number of partitions
                .replicas(3)                    // number of replicas per partition
                .build();
    }
}
```
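If you want to confirm that the broker really created 3 partitions with 3 replicas each, the topic can be described programmatically. The following is only a sketch (the class name DescribeTopicSketch is invented; it assumes the same broker addresses as application.yml) using the Kafka AdminClient:

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTopicSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097");

        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, TopicDescription> topics =
                    admin.describeTopics(Set.of("my_topic1")).allTopicNames().get();
            TopicDescription description = topics.get("my_topic1");
            // print each partition together with its replica list
            description.partitions().forEach(p ->
                    System.out.println("partition " + p.partition() + " replicas " + p.replicas()));
        }
    }
}
```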
1.4 Create a producer and send messages

```java
package com.atguigu.spring.kafka.consumer;

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class SpringKafkaConsumerApplicationTests {

    @Resource
    KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void contextLoads() {
        for (int i = 0; i < 10; i++) {
            // i % 3 spreads the ten records across partitions 0, 1 and 2
            kafkaTemplate.send("my_topic1", i % 3, "", "指定offset消费" + i);
        }
    }
}
```
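The loop above returns as soon as the ten send() calls are queued. Because Boot 3.0.5 pulls in spring-kafka 3.x, where KafkaTemplate.send() returns a CompletableFuture of SendResult, a variant of the same test (a sketch only; the method name sendAndWait is made up and is meant to be added to the test class above) can block until each record is acknowledged and print where it landed:

```java
@Test
void sendAndWait() throws Exception {
    for (int i = 0; i < 10; i++) {
        // send() returns a CompletableFuture in spring-kafka 3.x; get() waits for the broker ack
        var result = kafkaTemplate.send("my_topic1", i % 3, "", "指定offset消费" + i).get();
        System.out.println("sent to partition " + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset());
    }
}
```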
1.4.1 Data in partition 0

```json
[
  [
    { "partition": 0, "offset": 0, "msg": "指定offset消费0", "timespan": 1717660785962, "date": "2024-06-06 07:59:45" },
    { "partition": 0, "offset": 1, "msg": "指定offset消费3", "timespan": 1717660785974, "date": "2024-06-06 07:59:45" },
    { "partition": 0, "offset": 2, "msg": "指定offset消费6", "timespan": 1717660785975, "date": "2024-06-06 07:59:45" },
    { "partition": 0, "offset": 3, "msg": "指定offset消费9", "timespan": 1717660785975, "date": "2024-06-06 07:59:45" }
  ]
]
```
1.5 Create the Spring Boot application class

```java
package com.atguigu.spring.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Generated by https://start.springboot.io
// High-quality Chinese docs for the spring/boot/data/security/cloud frameworks => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaConsumerApplication.class, args);
    }
}
```
1.6 Suppress Kafka DEBUG logging with logback.xml
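The post does not show the file's contents at this point. A minimal sketch of what such a logback.xml can look like (all of it is an assumption, not the author's original file) simply raises the Kafka client packages above DEBUG:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%thread] %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- silence the very chatty Kafka client DEBUG output -->
    <logger name="org.apache.kafka" level="info"/>
    <logger name="org.springframework.kafka" level="info"/>

    <root level="info">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
```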
1.7 Add the spring-kafka dependency

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
    </parent>

    <groupId>com.atguigu</groupId>
    <artifactId>spring-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-consumer</name>
    <description>spring-kafka-consumer</description>

    <properties>
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```

1.8 Consumer console output:
```
  ... Spring Boot startup banner ...
 :: Spring Boot ::                (v3.0.5)

my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9
```
The listener was told to start at offset 2 of partition 0, so only the records at offsets 2 and 3 (指定offset消费6 and 指定offset消费9) are delivered. If SpringKafkaConsumerApplication is now restarted, the consumer reads those same records again (duplicate consumption), because the initial offset given in @PartitionOffset is re-applied on every startup:
```
  ... Spring Boot startup banner ...
 :: Spring Boot ::                (v3.0.5)

my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9
```
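The duplicates appear because @PartitionOffset performs the seek every time the listener container starts. Once the debugging session is over and the normal resume-from-committed-offset behaviour is wanted again, the listener can simply go back to a plain subscription. A minimal sketch replacing the listener method from section 1.1 (the method name onMessage2 is made up for illustration):

```java
// No partitionOffsets attribute: after a restart the group resumes from its last
// committed offset (auto-committed every 1000 ms per application.yml), and
// auto-offset-reset=earliest only applies when no committed offset exists yet.
@KafkaListener(topics = "my_topic1", groupId = "my_group1")
public void onMessage2(ConsumerRecord<String, String> record) {
    System.out.println("resumed at partition " + record.partition()
            + ", offset " + record.offset() + ", value = " + record.value());
}
```

For more controlled one-off repositioning, Spring Kafka also provides the ConsumerSeekAware callback interface, which lets the listener decide at partition-assignment time whether to seek at all.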