go-zero 如何应对海量定时/延迟任务?

一个系统中存在着大量的调度任务 , 同时调度任务存在时间的滞后性 , 而大量的调度任务如果每一个都使用自己的调度器来管理任务的生命周期的话 , 浪费cpu的资源而且很低效 。
本文来介绍 go-zero 中 延迟操作 , 它可能让开发者调度多个任务时 , 只需关注具体的业务执行函数和执行时间「立即或者延迟」 。 而 延迟操作 , 通常可以采用两个方案:

  1. Timer:定时器维护一个优先队列 , 到时间点执行 , 然后把需要执行的 task 存储在 map 中
  2. collection 中的 timingWheel, 维护一个存放任务组的数组 , 每一个槽都维护一个存储task的双向链表 。 开始执行时 , 计时器每隔指定时间执行一个槽里面的tasks 。
方案2把维护task从 优先队列 O(nlog(n)) 降到 双向链表 O(1) , 而执行task也只要轮询一个时间点的tasks O(N) , 不需要像优先队列 , 放入和删除元素 O(nlog(n))
我们先看看 go-zero 中自己对 timingWheel 的使用 :
cache 中的 timingWheel首先我们先来在 collectioncache 中关于 timingWheel 的使用:
timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {key, ok := k.(string)if !ok {return}cache.Del(key)})if err != nil {return nil, err}cache.timingWheel = timingWheel这是 cache 初始化中也同时初始化 timingWheel 做key的过期处理 , 参数依次代表:
  • interval:时间划分刻度
  • numSlots:时间槽
  • execute:时间点执行函数
cache 中执行函数则是 删除过期key , 而这个过期则由 timingWheel 来控制推进时间 。
接下来 , 就通过 cachetimingWheel 的使用来认识 。
初始化// 真正做初始化func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (*TimingWheel, error) {tw :=e != nil; {task := e.Value.(*timingEntry)// 标记删除 , 在 scan 中做真正的删除 「删除map的data」if task.removed {next := e.Next()l.Remove(e)tw.timers.Del(task.key)e = nextcontinue} else if task.circle > 0 {// 当前执行点已经过期 , 但是同时不在第一层 , 所以当前层即然已经完成了 , 就会降到下一层// 但是并没有修改 postask.circle--e = e.Next()continue} else if task.diff > 0 {// 因为之前已经标注了diff , 需要再进入队列next := e.Next()l.Remove(e)pos := (tw.tickedPos + task.diff) % tw.numSlotstw.slots[pos].PushBack(task)tw.setTimerPosition(pos, task)task.diff = 0e = nextcontinue}// 以上的情况都是不能执行的情况 , 能够执行的会被加入tasks中tasks = append(tasks, timingTask{key:task.key,value: task.value,})next := e.Next()l.Remove(e)tw.timers.Del(task.key)e = next}// for range tasks , 然后把每个 task->execute 执行即可tw.runTasks(tasks)}具体的分支情况在注释中说明了 , 在看的时候可以和前面的 moveTask() 结合起来 , 其中 circle 下降 , diff 的计算是关联两个函数的重点 。
至于 diff 计算就涉及到 pos, circle 的计算:
// interval: 4min, d: 60min, numSlots: 16, tickedPos = 15// step = 15, pos = 14, circle = 0func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {steps := int(d / tw.interval)pos = (tw.tickedPos + steps) % tw.numSlotscircle = (steps - 1) / tw.numSlotsreturn}


推荐阅读