retryBackoffMs :表示重试的时间间隔,默认是 100 ms
lingerMs:这个值默认是 0,即来一条发送一条 。所以在生产上,一定要配置这个值,充分利用 batch 来缓存批次,避免过多和服务器的通信 。
如果是第一次发送,backingOff 为 false,那么 timeToWaitMs 为 lingerMs 。
(7)还需要等待多久
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
timeToWaitMs:一共需要等待的时间
waitedTimeMs:已经等待的时间
timeLeftMs:还需要等待的时间
(8)是否有批次满了
boolean full = deque.size() > 1 || batch.records.isFull();
如果队列里的批次数量大于 1,则表示已经有批次已经满了 。
如果批次数量为 1,但是这个批次的消息已经满了
(9)是否超时,即已经等待的时长,是否大于一共需要等待的时长
boolean expired = waitedTimeMs >= timeToWaitMs;
(10)最后是发送条件,下面的五个条件是或的关系,任意一个满足,都可以发送
boolean sendable = full || expired || exhausted || closed || flushInProgress();
- 如果批次已经满了
- 等待的时间到了
- 内存满了
- 客户端关闭,但仍然有消息没发送
则把当前消息所在的分区的 Leader Partition 对应的主机,加到 readyNodes 数据结构中来
if (sendable && !backingOff) {readyNodes.add(leader);}
至此,已经找到了需要发送消息的主机,那么接下来就是建立到这台主机的连接 。四、Kafka Producer 对于 Java NIO 的封装到建立网络连接的时候,看到这段代码:
文章插图
可以看到具体的实现是在 NetwordClient 里面
文章插图
文章插图
第一个条件就是发送消息不能是在更新元数据的时候;
第二个条件点进去:
文章插图
发现这边有个核心的对象,selector,它是 NetworkClient 里的一个属性 。(NetworkClient 是 Kafka 网络连接的一个很重要的对象!):
文章插图
我们再点进去,找它的实现类,Selector:
文章插图
可以看到有两个核心属性,第一个 nIOSelector 就是对于 Java 的 Nio 的封装 。
第二个是一个 Map,Map 的 key 是 broker 的编号,value 是 KafkaChannel,KafkaChannel 可以理解为是 SocketChannel 。
好,然后再继续看一下 KafkaChannel:
文章插图
文章插图
最终,如下图所示:
文章插图
五、检查并建立网络连接我们从第四步的代码开始看:
文章插图
第一个条件,表示是否建立好了连接,如果建立好了,会在 nodeState 的结构中缓存起来的 。
文章插图
第二个条件:通道是否准备好了:
文章插图
第三个条件:
文章插图
max.in.flight.requests.per.connection
这个参数,是在初始化 NetworkClient 对象的时候,传递进来的,默认值是 5.表示最多默认有多少次请求没有得到服务端的响应 。
这里第三个条件,就是说,是否小于 5 个请求发送出去了,没有得到响应 。
但现在我们是第一次判断与主机的网络是否连接好,网络肯定是没有建立好的,所以这个方法会返回 false 。
文章插图
推荐阅读
- 你的声音价值百万,教你六个利用声音赚钱的方法
- 头条新手的坑展现量,千次阅读量,原创不懂处处是坑我心酸历程1
- 头条视频如何转发给微信好友?正确的收藏和私信,老年人一学就会
- 喷墨打印机常见故障及解决方法,家里有打印机的你快学起来
- fic秋季展时间2021 fic秋季展时间
- 如何判定新买的笔记本是否是全新机器
- 17岁男孩离家出走怎么处理 17岁的女孩离家出走了怎么办
- 雪莲花干株多少克,雪莲花的价格多少
- 百合花盆栽的养殖方法,介绍下百合花的药用价值
- 氟轻松软膏的作用,复方醋酸氟轻松酊