Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

一、开篇经过上次文章的铺垫,相信大家对 JAVA 的 NIO 有了一些感性的认识,也初步了解了它的 API 了,可以开始去阅读 Kafka Producer 端的发送消息的部分了 。
突然想感叹一下,阅读 Kafka 这个全世界著名的开源项目,多多少少会让人赏心悦目
二、发送消息的八个主流程先大致扫一眼,发送消息的八个主流程,然后再逐个击破 。
发送消息的主流程主要是在 Sender 方法里的,Sender 是一个后台线程,在构造 Producer 的时候,就已经被启动在后台运行了 。所以我们主要看它的 run 方法 。
run 方法是一个 while 循环,我们看里面的 run 方法 。(当前位置:Sender 类)

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
步骤一:获取集群的元数据 。(当前位置:Sender 类)
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
在上一篇文章可以知道,我们已经在 KafkaProducer 类的 doSend 方法中,完成了元数据的拉取,所以这里是可以获取到元数据的了 。
步骤二:判断哪些 partition 有消息可以发送 。(当前位置:Sender 类)
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
步骤三:标识还没有拉取到元数据的 topic,这些 topic 需要再次拉取一次元数据 。(当前位置:Sender 类)
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
这个是一些容错
步骤四:检查与要发送消息的主机的网络连接是否建立好了(当前类:Sender 类)
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
步骤五:把发往同一台机器的不同批次的消息合并成一个请求
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
步骤六:处理超时的批次
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
步骤七:创建请求
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
步骤八:真正的发送消息出去的网络请求,包括:发送请求,接收和处理响应,拉取元数据等
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
三、消息可以发送出去的条件(1)首先我们来到这个 ready 方法里面(当前位置:RecordAccumulator)
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
(2)来看这一行:
boolean exhausted = this.free.queued() > 0;free 是指 BufferPool,queued 方法:
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
waiters 里面是 Condition,表示是否有等待释放内存的线程,如果有,那么就是内存不足的意思 。
也就是说,内存不足,exhausted 为 true,否则 为 false 。
(3)遍历所有的分区和批次
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
拿出一个批次出来,下面开始判断是否可发送的条件:
(4)第一次发送为 false;下次重试时间到了,false;重试时间没到,true 。
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;batch.attempts :表示是否尝试过了
batch.lastAttemptMs :表示分区的上次尝试时间,初始值为当前时间
retryBackOffMs :表示重试的时间间隔,默认为 100 ms
nowMs:表示当前时间
那么这句是什么意思?
  • 如果消息是第一次发送,那么这个 backingOff 就是 false;
  • 如果消息第一次发送失败,进入重试,并且还没到下次重试的时间,这个 backingOff 就是 true,如果到了重试的时间,那么 backingOff 就是 false 。
这句话可能不好理解,可以假设,上次重试时间点是 10:00:00.000,重试的时间间隔是 100ms,下次重试时间是 10:00:00.100,而当前时间是 10:00:00.020,即还没到下次重试的时间 。
那么 batch.lastAttemptMs + retryBackoffMs > nowMs 为 true,即还没到下次重试时间 。
(5)计算出已经等待的时间
long waitedTimeMs = nowMs - batch.lastAttemptMs;nowMs:表示当前时间
batch.lastAttemptMs:上次重试时间
waitedTimeMs:已经等待的时间
(6)等待的时间
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;


推荐阅读