构建分布式任务调度系统(一):时间轮

时间轮(Timing Wheel)是一种高效的数据结构,用于管理定时任务或事件。在分布式任务调度系统等场景中经常可以看到时间轮的实现,以实现对大量定时任务的高效调度。

一、为什么用时间轮

相比传统的队列形式的调度器来说,时间轮能够批量高效的管理各种延时任务、周期任务、通知任务等等。

启动定时器终止定时器周期清算n 的含义
无序列表定时器O(1)O(1)O(n)定时器数量
有序列表定时器O(n)O(n)O(1)定时器数量
定时器树O(log n)O(log n)O(1)定时器数量
简单的时间轮O(1)O(1)O(1)同一时间槽定时器数量
有序定时器列表的散列轮O(1),最坏为 O(n)O(1)O(1)同一时间槽定时器数量
无序定时器列表的哈希轮O(1)O(1)O(1),最坏为 O(n)同一时间槽定时器数量
分级时间轮O(m) = O(1)O(1)O(1) ,最坏为 O(n)m 为层级数,n为定时器数量

上面的表格对比了传统的简单调度器与时间轮,时间轮的效率更高。

二、时间轮的实现方式

时间轮有几种不同的实现方式,每种方式都有其特定的应用场景和性能特点。

时间轮类型特点适用场景
简单时间轮固定大小的数组,指针循环移动定时任务数量较少,时间间隔较短的场景
有序时间轮数组元素按到期时间排序需要快速定位到期任务,但添加或删除任务可能需要重新排序
无序时间轮数组元素不排序,到期时间作为任务的一部分存储添加和删除任务时不需要重新排序,但查找到期任务可能需要遍历
分层时间轮多个时间粒度不同的时间轮组成,每个时间轮负责不同时间范围的任务任务到期时间跨度较大的场景
散列时间轮使用哈希表存储定时任务,减少同一时间槽内任务的冲突可以快速定位到期任务,但哈希表的维护可能增加复杂性
最小堆时间轮使用最小堆存储定时任务,堆顶是最早到期的任务需要快速访问最早到期任务的场景
红黑树时间轮使用红黑树存储定时任务,有序性允许快速查找、添加和删除操作需要频繁动态调整任务的场景
延迟队列时间轮结合时间轮和延迟队列,使用时间轮管理延迟队列中的任务需要处理大量短生命周期定时任务的场景

三、时间轮的处理流程

3.1 简单时间轮(Simple Timing Wheel)

简单时间轮通常包含一个固定大小的数组,每个数组元素代表一个时间槽,用于存储到期时间落在该时间槽的任务。随着时间的推移,一个指针会循环移动到下一个时间槽,并执行该时间槽中的所有任务。

时间槽关联的任务列表可以是双向链表(见下图),也可以是双向循环链表。当双向循环链表的当前指针指向的时间槽到达时,可以从头开始执行链表中的所有任务,直到再次到达尾部,这样就完成了一个时间槽的任务执行,因而使用双向循环链表可以简化到期任务的执行过程。

简单时间轮的处理流程分析如下:

  1. 初始化时间轮
  • 时间轮由10个时间槽组成,每个时间槽代表1毫秒,因此整个时间轮走完一圈需要10毫秒。
  1. 设置当前时间指针
  • 初始化时,currentTime 指针指向时间槽0。
  1. 添加任务
  • 当一个定时为1毫秒的任务需要被添加到时间轮时,首先计算它应该被放置在哪个时间槽。
  • 由于当前时间是0毫秒,任务需要在1毫秒后执行,因此任务被添加到时间槽1的TimerTaskList中。
  1. 推进时间轮
  • 随着时间的推移,时间轮的currentTime指针会顺时针移动到下一个时间槽。
  • 每经过1毫秒,currentTime指针移动到下一个时间槽,并检查该时间槽的TimerTaskList是否有任务需要执行。
  1. 执行到期任务
  • currentTime指针移动到时间槽1时,它会执行时间槽1中的所有任务(在这个例子中是task1)。
  • 执行完任务后,这些任务可以从TimerTaskList中移除,或者如果它们是周期性任务,可以重新调度。
  1. 处理周期性任务
  • 如果任务是周期性的,它们需要在执行后重新添加到时间轮中。
  • 例如,如果task1是周期性任务,它将在执行后重新计算下一次执行的时间槽,并被添加到相应的TimerTaskList中。
  1. 任务移除和重新调度
  • 如果任务不再需要执行,可以从TimerTaskList中移除。
  • 对于需要重新调度的任务,根据其周期性重新计算时间槽,并添加到新的时间槽中。
  1. 时间轮循环
  • currentTime指针完成一圈后,它将回到时间槽0,继续循环。
  1. 处理长周期任务
  • 如果任务的周期超过了时间轮的一圈,它就无法被简单时间轮直接处理,需要使用分层时间轮或其他机制来处理。

3.2 分层时间轮(Hierarchical Timing Wheel)

分层时间轮通过多个时间轮层级来处理不同时间粒度的任务。每个层级的时间轮负责不同范围的时间调度。

以下是对这个分层时间轮处理流程的分析:

  1. 第一层(秒轮)
  • span 表示时间轮覆盖的时间范围,这里是60秒。
  • wheelSize 是时间轮的槽数,这里是60,对应秒针的60秒。
  • interval 是每个时间槽代表的时间,这里是1秒(span/wheelSize)。

当前时间指针(currentTime)指向当前时间槽。

  1. 第二层(分轮)
  • span 为60分钟,即3600秒。
  • wheelSize 为60,对应分钟的60分钟。
  • interval 为60秒(span/wheelSize),即每个时间槽代表1分钟。
  1. 第三层(时轮)
  • span 为24小时,即86400秒。
  • wheelSize 为24,对应小时的24小时。
  • interval 为3600秒(span/wheelSize),即每个时间槽代表1小时。
  1. 任务调度
  • 当一个任务需要被调度时,首先确定它的时间粒度(秒、分、小时)。
  • 任务被添加到最底层的时间轮中,即第一层的秒轮。
  • 当任务的时间到达时,如果它是一个周期性任务,并且周期超过了当前时间轮的span,它将被提升到上一层时间轮。
  1. 时间轮推进
  • 每过一个interval,当前时间指针会移动到下一个时间槽。
  • 当第一层时间轮的指针走完一圈(60秒),如果任务的周期还没有结束,它将被移动到第二层的分轮。
  • 同样,当第二层时间轮的指针走完一圈(60分钟),如果任务的周期还没有结束,它将被移动到第三层的时轮。
  1. 任务执行
  • 当任务在某个时间轮的时间槽中到期时,它将被执行。
  • 如果任务是周期性的,它将根据剩余周期重新计算并添加到相应的时间轮层级中。
  1. 任务移除
  • 如果任务不再需要执行,它将从时间轮中移除。

分层时间轮的设计允许系统有效地处理不同时间粒度的任务,并且可以处理长周期任务,而不需要在单个时间轮中分配大量的时间槽。通过将任务在不同层级之间移动,可以优化内存使用并保持时间轮的效率。

四、时间轮算法的实现

// 时间轮结构体
type TimingWheel struct {
    slots        []chan Task // 时间槽,每个槽是一个通道,用于存储到期时间相同的任务
    current      int          // 当前时间槽的索引,随着时间的流逝,这个索引会循环增加
    interval     time.Duration // 时间槽的时间间隔
    timer        *time.Ticker  // 定时器,用于触发时间轮的转动
    taskQueue    chan Task     // 任务队列,用于存储待添加到时间轮的任务
}

// 时间轮的初始化
func NewTimingWheel(slots int, interval time.Duration) *TimingWheel {
    // 创建一个具有指定数量时间槽和时间间隔的TimingWheel实例。
    tw := &TimingWheel{
        slots:       make([]chan Task, slots),
        current:     0,
        interval:    interval,
        taskQueue:   make(chan Task, 100),
    }
    // 初始化每个时间槽为一个缓冲通道,用于存储任务。
    for i := range tw.slots {
        tw.slots[i] = make(chan Task, 10)
    }
    // 创建一个定时器,每隔interval时间触发一次。
    tw.timer = time.NewTicker(interval)
    // 启动一个后台goroutine来运行时间轮。
    go tw.run()
    
    return tw
}

// 添加任务到时间轮
func (tw *TimingWheel) AddTask(task Task, delay time.Duration) {
    // 根据任务的延迟时间计算出应该放置的任务槽位。
    slotIndex := (int(delay / tw.interval) + tw.current) % len(tw.slots)
    // 将任务发送到对应的时间槽通道中。
    tw.slots[slotIndex] <- task
}

// 时间轮的运行逻辑
// 定时器触发时,时间轮向前移动一个时间槽。
// 启动一个goroutine来处理当前时间槽中的所有任务。
// 任务被执行后,从时间槽中移除。
func (tw *TimingWheel) run() {
    for range tw.timer.C {
        tw.current = (tw.current + 1) % len(tw.slots)
        go func(slot int) {
            for task := range tw.slots[slot] {
                task.Run()
            }
        }(tw.current)
    }
}

附:参考链接:

  1. 时间轮详解
  2. 时间轮(TimingWheel)高性能定时任务原理解密
  3. 时间轮 (史上最全)
  4. 一种高效的定时器算法实现(简单时间轮)