Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑( 二 )

retryBackoffMs :表示重试的时间间隔,默认是 100 ms
lingerMs:这个值默认是 0,即来一条发送一条 。所以在生产上,一定要配置这个值,充分利用 batch 来缓存批次,避免过多和服务器的通信 。
如果是第一次发送,backingOff 为 false,那么 timeToWaitMs 为 lingerMs 。
(7)还需要等待多久
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);timeToWaitMs:一共需要等待的时间
waitedTimeMs:已经等待的时间
timeLeftMs:还需要等待的时间
(8)是否有批次满了
boolean full = deque.size() > 1 || batch.records.isFull();如果队列里的批次数量大于 1,则表示已经有批次已经满了 。
如果批次数量为 1,但是这个批次的消息已经满了
(9)是否超时,即已经等待的时长,是否大于一共需要等待的时长
boolean expired = waitedTimeMs >= timeToWaitMs;(10)最后是发送条件,下面的五个条件是或的关系,任意一个满足,都可以发送
boolean sendable = full || expired || exhausted || closed || flushInProgress();

  • 如果批次已经满了
  • 等待的时间到了
  • 内存满了
  • 客户端关闭,但仍然有消息没发送
(11)如果达到了发送消息的条件,并且重试的时间到了(或者是第一次发送)
则把当前消息所在的分区的 Leader Partition 对应的主机,加到 readyNodes 数据结构中来
if (sendable && !backingOff) {readyNodes.add(leader);}至此,已经找到了需要发送消息的主机,那么接下来就是建立到这台主机的连接 。
四、Kafka Producer 对于 Java NIO 的封装到建立网络连接的时候,看到这段代码:
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
可以看到具体的实现是在 NetwordClient 里面
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
第一个条件就是发送消息不能是在更新元数据的时候;
第二个条件点进去:
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
发现这边有个核心的对象,selector,它是 NetworkClient 里的一个属性 。(NetworkClient 是 Kafka 网络连接的一个很重要的对象!):
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
我们再点进去,找它的实现类,Selector:
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
可以看到有两个核心属性,第一个 nIOSelector 就是对于 Java 的 Nio 的封装 。
第二个是一个 Map,Map 的 key 是 broker 的编号,value 是 KafkaChannel,KafkaChannel 可以理解为是 SocketChannel 。
好,然后再继续看一下 KafkaChannel:
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 

Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
最终,如下图所示:
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
五、检查并建立网络连接我们从第四步的代码开始看:
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
第一个条件,表示是否建立好了连接,如果建立好了,会在 nodeState 的结构中缓存起来的 。
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
第二个条件:通道是否准备好了:
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
第三个条件:
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 
max.in.flight.requests.per.connection这个参数,是在初始化 NetworkClient 对象的时候,传递进来的,默认值是 5.
表示最多默认有多少次请求没有得到服务端的响应 。
这里第三个条件,就是说,是否小于 5 个请求发送出去了,没有得到响应 。
但现在我们是第一次判断与主机的网络是否连接好,网络肯定是没有建立好的,所以这个方法会返回 false 。
Kafka 的网络通信设计,竟然只用 20 行就实现了粘包拆包逻辑

文章插图
 


推荐阅读