Kafka-生产者(producer)发送信息流程详解

Kafka-生产者(producer)发送信息流程详解

码农世界 2024-05-22 前端 64 次浏览 0个评论

Kafka概述

在 Kafka 消息发送的过程中,涉及到了两个重要的线程:主线程(main thread)和发送线程(Sender thread)。

1.主线程(main thread):

  • 应用程序在主线程中创建 Kafka 生产者实例。
  • 这个生产者实例负责与 Kafka 集群通信,发送消息到指定的主题。
  • 主线程还会创建一个称为 RecordAccumulator 的缓冲区。
  • 这个缓冲区是 Kafka 生产者内部用来暂存待发送消息的地方。
  • 主线程将生产的消息写入 RecordAccumulator 中。

    2.RecordAccumulator 缓冲区:

    • RecordAccumulator 是 Kafka 生产者的一个重要组件,用于收集和管理待发送的消息记录(ProducerRecord)。
    • 主线程通过调用 Kafka 生产者的 send() 方法将消息记录发送给 RecordAccumulator。
    • RecordAccumulator 管理多个分区的消息队列,并根据配置的分区器(Partitioner)将消息分配到相应的分区队列中。

      3.发送线程(Sender thread)的作用:

      • 发送线程是 Kafka 生产者内部的一个后台线程,它负责从 RecordAccumulator缓冲区中拉取待发送的消息,并将这些消息批量发送到 Kafka Broker。
      • 发送线程会周期性地检查 RecordAccumulator 中是否有待发送的消息,如果有则获取这些消息并准备发送。
      • 发送线程的主要任务是通过网络与 Kafka Broker 进行通信,将消息推送到目标主题的分区中。

        4.消息发送的具体流程:

        (1)消息发送请求产生:

        • 应用程序创建 Kafka 生产者实例,并对发送的消息进行封装成 ProducerRecord 对象。
        • ProducerRecord 中包含了消息的主题、键、值等信息。

          (2)消息分区:

          • 如果消息没有指定分区,分区器(Partitioner)将为消息选择一个目标分区。
          • Partitioner 可以根据消息的键、消息内容等信息选择分区,以确保消息被均匀地分配到不同的分区中。

            (3)消息缓冲:

            • Kafka 生产者将消息发送到 RecordAccumulator(记录累加器)中缓冲,等待批量处理和发送。
            • RecordAccumulator 是用来批量处理和管理待发送消息的缓冲区,可以在内存中暂存一段时间的消息。

              (4)批量处理:

              • 根据配置的批处理大小和等待时间**,RecordAccumulator 中的消息可以被批量处理。**
              • 批量处理有利于提高性能和吞吐量,减少单独发送消息的开销。

                (5)消息序列化与压缩:

                • 在发送之前,消息会被序列化为字节数组。
                • 可选地,消息还可以被压缩以减少网络传输的数据量。

                  (6)请求到达发送器:

                  • 发送器(Sender)线程周期性地或根据条件触发,从 RecordAccumulator 中拉取待发送的消息。

                    (7)消息发送到 Broker:

                    • Sender 线程将消息批量发送到 Kafka Broker。
                    • 发送器与 Broker 建立连接,将消息发送到指定分区的 Leader 副本。

                      (8)消息持久化:

                      • 消息被 Leader 副本持久化到磁盘。
                      • Leader 副本将消息复制到 ISR(In-Sync Replicas)集合中的其他副本。

                        (9)消息确认:

                        • Broker 在成功持久化消息后会向生产者发送确认信息。
                        • 生产者可以配置不同的确认级别(acks)来控制消息的可靠性,例如等待 Leader 确认或等待所有 ISR 集合中的副本都确认。

                          (10)消息发送完成:

                          • 一旦收到确认,生产者可以选择提交下一批消息或处理其他逻辑。
                          • 在接收到确认之前,生产者可以选择等待重试或处理发送失败的情况。

                            通过以上步骤,Kafka 生产者实现了高效、可靠的消息传递机制,确保消息被安全地发送到 Kafka Broker,并最终持久化到磁盘以供消费者消费。


                            5.异步发送和确认机制:

                            • Kafka 生产者支持异步发送消息的方式,即主线程在发送消息后不必等待发送的结果即可继续执行其他操作。
                            • 生产者可以配置消息确认机制(acks),以确保消息是否成功发送到 Kafka Broker。确认机制可以是无需确认、Leader 确认或者 Leader 和 ISR 集合中的所有副本都确认。

                              6.错误处理:

                              • 在发送消息的过程中,如果发生网络故障、Broker 不可用等异常情况,发送线程会尝试重试发送消息,以确保消息的可靠性。
                              • Kafka 生产者提供了一些配置选项来控制重试次数、重试间隔等参数,以应对不同的故障情况。


                                Tips:想了解更多相关知识,可以移步我的主页哦~

转载请注明来自码农世界,本文标题:《Kafka-生产者(producer)发送信息流程详解》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,64人围观)参与讨论

还没有评论,来说两句吧...

Top