知道时间轮算法吗?在Netty和Kafka中如何应用的?

大家好,我是yes 。
最近看 Kafka 看到了时间轮算法,记得以前看 Netty 也看到过这玩意,没太过关注 。今天就来看看时间轮到底是什么东西 。
为什么要用时间轮算法来实现延迟操作?
延时操作 JAVA 不是提供了 Timer 么?
还有 DelayQueue 配合线程池或者 ScheduledThreadPool 不香吗?
我们先来简单看看 Timer、DelayQueue 和 ScheduledThreadPool 的相关实现,看看它们是如何实现延时任务的,源码之下无秘密 。再来剖析下为何 Netty 和 Kafka 特意实现了时间轮来处理延迟任务 。
如果在手机上阅读其实纯看字也行,不用看代码,我都会先用文字描述清楚 。不过电脑上看效果更佳 。
TimerTimer 可以实现延时任务,也可以实现周期性任务 。我们先来看看 Timer 核心属性和构造器 。

知道时间轮算法吗?在Netty和Kafka中如何应用的?

文章插图
 
核心就是一个优先队列和封装的执行任务的线程,从这我们也可以看到一个 Timer 只有一个线程执行任务 。
再来看看如何实现延时和周期性任务的 。我先简单的概括一下,首先维持一个小顶堆,即最快需要执行的任务排在优先队列的第一个,根据堆的特性我们知道插入和删除的时间复杂度都是 O(logn) 。
然后 TimerThread 不断地拿排着的第一个任务的执行时间和当前时间做对比 。如果时间到了先看看这个任务是不是周期性执行的任务,如果是则修改当前任务时间为下次执行的时间,如果不是周期性任务则将任务从优先队列中移除 。最后执行任务 。如果时间还未到则调用 wait() 等待 。
再看下图,整理下流程 。
知道时间轮算法吗?在Netty和Kafka中如何应用的?

文章插图
 
流程知道了再对着看下代码,这块就差不多了 。看代码不爽的可以跳过代码部分,影响不大 。
先来看下 TaskQueue,就简单看下插入任务的过程,就是个普通的堆插入操作 。
知道时间轮算法吗?在Netty和Kafka中如何应用的?

文章插图
 
再来看看 TimerThread 的 run 操作 。
知道时间轮算法吗?在Netty和Kafka中如何应用的?

文章插图
 
小结一下可以看出 Timer 实际就是根据任务的执行时间维护了一个优先队列,并且起了一个线程不断地拉取任务执行 。
有什么弊端呢?
首先优先队列的插入和删除的时间复杂度是O(logn),当数据量大的时候,频繁的入堆出堆性能有待考虑 。
并且是单线程执行,那么如果一个任务执行的时间过久则会影响下一个任务的执行时间(当然你任务的run要是异步执行也行) 。
并且从代码可以看到对异常没有做什么处理,那么一个任务出错的时候会导致之后的任务都无法执行 。
ScheduledThreadPoolExecutor在说 ScheduledThreadPoolExecutor 之前我们再看下 Timer 的注释,注释可都是干货千万不要错过 。我做了点修改,突出了下重点 。
Java 5.0 introduced ScheduledThreadPoolExecutor, It is effectively a more versatile replacement for the Timer, it allows multiple service threads. Configuring with one thread makes it equivalent to Timer 。
简单翻译下:1.5 引入了 ScheduledThreadPoolExecutor,它是一个具有更多功能的 Timer 的替代品,允许多个服务线程 。如果设置一个服务线程和 Timer 没啥差别 。
从注释看出相对于 Timer ,可能就是单线程跑任务和多线程跑任务的区别 。我们来看下 。
知道时间轮算法吗?在Netty和Kafka中如何应用的?

文章插图
 
继承了 ThreadPoolExecutor,实现了 ScheduledExecutorService 。可以定性操作就是正常线程池差不多了 。区别就在于两点,一个是 ScheduledFutureTask ,一个是 DelayedWorkQueue 。
其实 DelayedWorkQueue 就是优先队列,也是利用数组实现的小顶堆 。而 ScheduledFutureTask 继承自 FutureTask 重写了 run 方法,实现了周期性任务的需求 。
知道时间轮算法吗?在Netty和Kafka中如何应用的?

文章插图
 
小结一下ScheduledThreadPoolExecutor 大致的流程和 Timer 差不多,也是维护一个优先队列,然后通过重写 task 的 run 方法来实现周期性任务,主要差别在于能多线程运行任务,不会单线程阻塞 。
并且 Java 线程池的设定是 task 出错会把错误吃了,无声无息的 。因此一个任务出错也不会影响之后的任务 。
DelayQueueJava 中还有个延迟队列 DelayQueue,加入延迟队列的元素都必须实现 Delayed 接口 。延迟队列内部是利用 PriorityQueue 实现的,所以还是利用优先队列!Delayed 接口继承了Comparable 因此优先队列是通过 delay 来排序的 。


推荐阅读