TimingWheel 时间轮算法是如何实现的?

前言
我在 2. SOFAJRaft源码分析—JRaft的定时任务调度器是怎么做的? 这篇文章里已经讲解过时间轮算法在JRaft中是怎么应用的,但是我感觉我并没有讲解清楚这个东西,导致看了这篇文章依然和没看是一样的,所以我打算重新说透时间轮算法 。
时间轮的应用并非 JRaft 独有,其应用场景还有很多,在 Netty、Akka、Quartz、ZooKeeper 、Kafka等组件中都存在时间轮的踪影 。
我们下面讲解的时间轮的实现以JRaft中的为例子进行讲解,因为JRaft这部分的代码是参考Netty的,所以大家也可以去Netty中去寻找源码实现 。
时间轮用来解决什么问题?如果一个系统中存在着大量的调度任务,而大量的调度任务如果每一个都使用自己的调度器来管理任务的生命周期的话,浪费cpu的资源并且很低效 。
时间轮是一种高效来利用线程资源来进行批量化调度的一种调度模型 。把大批量的调度任务全部都绑定到同一个的调度器上面,使用这一个调度器来进行所有任务的管理(manager),触发(trigger)以及运行(runnable) 。能够高效的管理各种延时任务,周期任务,通知任务等等 。
不过,时间轮调度器的时间精度可能不是很高,对于精度要求特别高的调度任务可能不太适合 。因为时间轮算法的精度取决于,时间段“指针”单元的最小粒度大小,比如时间轮的格子是一秒跳一次,那么调度精度小于一秒的任务就无法被时间轮所调度 。
时间轮结构

TimingWheel 时间轮算法是如何实现的?

文章插图
 
如图,JRaft中时间轮(HashedWheelTimer)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(HashedWheelBucket),HashedWheelBucket是一个环形的双向链表,链表中的每一项表示的都是定时任务项(HashedWheelTimeout),其中封装了真正的定时任务(TimerTask) 。
时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickDuration) 。时间轮的时间格个数是固定的,可用 wheel.length 来表示 。
时间轮还有一个表盘指针(tick),用来表示时间轮当前指针跳动的次数,可以用tickDuration * (tick + 1)来表示下一次到期的任务,需要处理此时间格所对应的 HashedWheelBucket 中的所有任务 。
时间轮运行逻辑时间轮在启动的时候会记录一下当前启动的时间赋值给startTime 。时间轮在添加任务的时候首先会计算延迟时间(deadline),比如一个任务的延迟时间为24ms,那么会将当前的时间(currentTime)+24ms-时间轮启动时的时间(startTime) 。然后将任务封装成HashedWheelTimeout加入到timeouts队列中,作为缓存 。
时间轮在运行的时候会将timeouts中缓存的HashedWheelTimeout任务取10万个出来进行遍历 。
然后需要计算出几个参数值:
  1. HashedWheelTimeout的总共延迟的次数:将每个任务的延迟时间(deadline)/tickDuration 计算出tick需要总共跳动的次数;
  2. 计算时间轮round次数:根据计算的需要走的(总次数- 当前tick数量)/ 时间格个数(wheel.length) 。比如tickDuration为1ms,时间格个数为20个,那么时间轮走一圈需要20ms,那么添加进一个延时为24ms的数据,如果当前的tick为0,那么计算出的轮数为1,指针没运行一圈就会将round取出来减一,所以需要转动到第二轮之后才可以将轮数round减为0之后才会运行
  3. 计算出该任务需要放置到时间轮(wheel)的槽位,然后加入到槽位链表最后
将timeouts中的数据放置到时间轮wheel中之后,计算出当前时针走到的槽位的位置,并取出槽位中的链表数据,将deadline和当前的时间做对比,运行过期的数据 。
源码分析构造器public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,long maxPendingTimeouts) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}//unit = MILLISECONDSif (unit == null) {throw new NullPointerException("unit");}if (tickDuration <= 0) {throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);}if (ticksPerWheel <= 0) {throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);}// Normalize ticksPerWheel to power of two and initialize the wheel.// 创建一个HashedWheelBucket数组// 创建时间轮基本的数据结构,一个数组 。长度为不小于ticksPerWheel的最小2的n次方wheel = createWheel(ticksPerWheel);// 这是一个标示符,用来快速计算任务应该呆的格子 。// 我们知道,给定一个deadline的定时任务,其应该呆的格子=deadline%wheel.length.但是%操作是个相对耗时的操作,所以使用一种变通的位运算代替:// 因为一圈的长度为2的n次方,mask = 2^n-1后低位将全部是1,然后deadline&mast == deadline%wheel.length// JAVA中的HashMap在进行hash之后,进行index的hash寻址寻址的算法也是和这个一样的mask = wheel.length - 1;// Convert tickDuration to nanos.//tickDuration传入是1的话,这里会转换成1000000this.tickDuration = unit.toNanos(tickDuration);// Prevent overflow.// 校验是否存在溢出 。即指针转动的时间间隔不能太长而导致tickDuration*wheel.length>Long.MAX_VALUEif (this.tickDuration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE/ wheel.length));}//将worker包装成threadworkerThread = threadFactory.newThread(worker);//maxPendingTimeouts = -1this.maxPendingTimeouts = maxPendingTimeouts;//如果HashedWheelTimer实例太多,那么就会打印一个error日志if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT&& warnedTooManyInstances.compareAndSet(false, true)) {reportTooManyInstances();}}


推荐阅读