一、开篇经过上次文章的铺垫,相信大家对 JAVA 的 NIO 有了一些感性的认识,也初步了解了它的 API 了,可以开始去阅读 Kafka Producer 端的发送消息的部分了 。
突然想感叹一下,阅读 Kafka 这个全世界著名的开源项目,多多少少会让人赏心悦目
二、发送消息的八个主流程先大致扫一眼,发送消息的八个主流程,然后再逐个击破 。
发送消息的主流程主要是在 Sender 方法里的,Sender 是一个后台线程,在构造 Producer 的时候,就已经被启动在后台运行了 。所以我们主要看它的 run 方法 。
run 方法是一个 while 循环,我们看里面的 run 方法 。(当前位置:Sender 类)
文章插图
步骤一:获取集群的元数据 。(当前位置:Sender 类)
文章插图
在上一篇文章可以知道,我们已经在 KafkaProducer 类的 doSend 方法中,完成了元数据的拉取,所以这里是可以获取到元数据的了 。
步骤二:判断哪些 partition 有消息可以发送 。(当前位置:Sender 类)
文章插图
步骤三:标识还没有拉取到元数据的 topic,这些 topic 需要再次拉取一次元数据 。(当前位置:Sender 类)
文章插图
这个是一些容错
步骤四:检查与要发送消息的主机的网络连接是否建立好了(当前类:Sender 类)
文章插图
步骤五:把发往同一台机器的不同批次的消息合并成一个请求
文章插图
步骤六:处理超时的批次
文章插图
步骤七:创建请求
文章插图
步骤八:真正的发送消息出去的网络请求,包括:发送请求,接收和处理响应,拉取元数据等
文章插图
三、消息可以发送出去的条件(1)首先我们来到这个 ready 方法里面(当前位置:RecordAccumulator)
文章插图
(2)来看这一行:
boolean exhausted = this.free.queued() > 0;
free 是指 BufferPool,queued 方法:文章插图
waiters 里面是 Condition,表示是否有等待释放内存的线程,如果有,那么就是内存不足的意思 。
也就是说,内存不足,exhausted 为 true,否则 为 false 。
(3)遍历所有的分区和批次
文章插图
拿出一个批次出来,下面开始判断是否可发送的条件:
(4)第一次发送为 false;下次重试时间到了,false;重试时间没到,true 。
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
batch.attempts :表示是否尝试过了batch.lastAttemptMs :表示分区的上次尝试时间,初始值为当前时间
retryBackOffMs :表示重试的时间间隔,默认为 100 ms
nowMs:表示当前时间
那么这句是什么意思?
- 如果消息是第一次发送,那么这个 backingOff 就是 false;
- 如果消息第一次发送失败,进入重试,并且还没到下次重试的时间,这个 backingOff 就是 true,如果到了重试的时间,那么 backingOff 就是 false 。
那么 batch.lastAttemptMs + retryBackoffMs > nowMs 为 true,即还没到下次重试时间 。
(5)计算出已经等待的时间
long waitedTimeMs = nowMs - batch.lastAttemptMs;
nowMs:表示当前时间batch.lastAttemptMs:上次重试时间
waitedTimeMs:已经等待的时间
(6)等待的时间
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 你的声音价值百万,教你六个利用声音赚钱的方法
- 头条新手的坑展现量,千次阅读量,原创不懂处处是坑我心酸历程1
- 头条视频如何转发给微信好友?正确的收藏和私信,老年人一学就会
- 喷墨打印机常见故障及解决方法,家里有打印机的你快学起来
- fic秋季展时间2021 fic秋季展时间
- 如何判定新买的笔记本是否是全新机器
- 17岁男孩离家出走怎么处理 17岁的女孩离家出走了怎么办
- 雪莲花干株多少克,雪莲花的价格多少
- 百合花盆栽的养殖方法,介绍下百合花的药用价值
- 氟轻松软膏的作用,复方醋酸氟轻松酊