构建分布式任务调度系统(一):时间轮
时间轮(Timing Wheel)是一种高效的数据结构,用于管理定时任务或事件。在分布式任务调度系统等场景中经常可以看到时间轮的实现,以实现对大量定时任务的高效调度。
一、为什么用时间轮
相比传统的队列形式的调度器来说,时间轮能够批量高效的管理各种延时任务、周期任务、通知任务等等。
启动定时器 | 终止定时器 | 周期清算 | n 的含义 | |
---|---|---|---|---|
无序列表定时器 | O(1) | O(1) | O(n) | 定时器数量 |
有序列表定时器 | O(n) | O(n) | O(1) | 定时器数量 |
定时器树 | O(log n) | O(log n) | O(1) | 定时器数量 |
简单的时间轮 | O(1) | O(1) | O(1) | 同一时间槽定时器数量 |
有序定时器列表的散列轮 | O(1),最坏为 O(n) | O(1) | O(1) | 同一时间槽定时器数量 |
无序定时器列表的哈希轮 | O(1) | O(1) | O(1),最坏为 O(n) | 同一时间槽定时器数量 |
分级时间轮 | O(m) = O(1) | O(1) | O(1) ,最坏为 O(n) | m 为层级数,n为定时器数量 |
上面的表格对比了传统的简单调度器与时间轮,时间轮的效率更高。
二、时间轮的实现方式
时间轮有几种不同的实现方式,每种方式都有其特定的应用场景和性能特点。
时间轮类型 | 特点 | 适用场景 |
---|---|---|
简单时间轮 | 固定大小的数组,指针循环移动 | 定时任务数量较少,时间间隔较短的场景 |
有序时间轮 | 数组元素按到期时间排序 | 需要快速定位到期任务,但添加或删除任务可能需要重新排序 |
无序时间轮 | 数组元素不排序,到期时间作为任务的一部分存储 | 添加和删除任务时不需要重新排序,但查找到期任务可能需要遍历 |
分层时间轮 | 多个时间粒度不同的时间轮组成,每个时间轮负责不同时间范围的任务 | 任务到期时间跨度较大的场景 |
散列时间轮 | 使用哈希表存储定时任务,减少同一时间槽内任务的冲突 | 可以快速定位到期任务,但哈希表的维护可能增加复杂性 |
最小堆时间轮 | 使用最小堆存储定时任务,堆顶是最早到期的任务 | 需要快速访问最早到期任务的场景 |
红黑树时间轮 | 使用红黑树存储定时任务,有序性允许快速查找、添加和删除操作 | 需要频繁动态调整任务的场景 |
延迟队列时间轮 | 结合时间轮和延迟队列,使用时间轮管理延迟队列中的任务 | 需要处理大量短生命周期定时任务的场景 |
三、时间轮的处理流程
3.1 简单时间轮(Simple Timing Wheel)
简单时间轮通常包含一个固定大小的数组,每个数组元素代表一个时间槽,用于存储到期时间落在该时间槽的任务。随着时间的推移,一个指针会循环移动到下一个时间槽,并执行该时间槽中的所有任务。
时间槽关联的任务列表可以是双向链表(见下图),也可以是双向循环链表。当双向循环链表的当前指针指向的时间槽到达时,可以从头开始执行链表中的所有任务,直到再次到达尾部,这样就完成了一个时间槽的任务执行,因而使用双向循环链表可以简化到期任务的执行过程。
简单时间轮的处理流程分析如下:
- 初始化时间轮:
- 时间轮由10个时间槽组成,每个时间槽代表1毫秒,因此整个时间轮走完一圈需要10毫秒。
- 设置当前时间指针:
- 初始化时,
currentTime
指针指向时间槽0。
- 添加任务:
- 当一个定时为1毫秒的任务需要被添加到时间轮时,首先计算它应该被放置在哪个时间槽。
- 由于当前时间是0毫秒,任务需要在1毫秒后执行,因此任务被添加到时间槽1的
TimerTaskList
中。
- 推进时间轮:
- 随着时间的推移,时间轮的
currentTime
指针会顺时针移动到下一个时间槽。 - 每经过1毫秒,
currentTime
指针移动到下一个时间槽,并检查该时间槽的TimerTaskList
是否有任务需要执行。
- 执行到期任务:
- 当
currentTime
指针移动到时间槽1时,它会执行时间槽1中的所有任务(在这个例子中是task1
)。 - 执行完任务后,这些任务可以从
TimerTaskList
中移除,或者如果它们是周期性任务,可以重新调度。
- 处理周期性任务:
- 如果任务是周期性的,它们需要在执行后重新添加到时间轮中。
- 例如,如果
task1
是周期性任务,它将在执行后重新计算下一次执行的时间槽,并被添加到相应的TimerTaskList
中。
- 任务移除和重新调度:
- 如果任务不再需要执行,可以从
TimerTaskList
中移除。 - 对于需要重新调度的任务,根据其周期性重新计算时间槽,并添加到新的时间槽中。
- 时间轮循环:
- 当
currentTime
指针完成一圈后,它将回到时间槽0,继续循环。
- 处理长周期任务:
- 如果任务的周期超过了时间轮的一圈,它就无法被简单时间轮直接处理,需要使用分层时间轮或其他机制来处理。
3.2 分层时间轮(Hierarchical Timing Wheel)
分层时间轮通过多个时间轮层级来处理不同时间粒度的任务。每个层级的时间轮负责不同范围的时间调度。
以下是对这个分层时间轮处理流程的分析:
- 第一层(秒轮):
span
表示时间轮覆盖的时间范围,这里是60秒。wheelSize
是时间轮的槽数,这里是60,对应秒针的60秒。interval
是每个时间槽代表的时间,这里是1秒(span/wheelSize
)。
当前时间指针(currentTime)指向当前时间槽。
- 第二层(分轮):
span
为60分钟,即3600秒。wheelSize
为60,对应分钟的60分钟。interval
为60秒(span/wheelSize
),即每个时间槽代表1分钟。
- 第三层(时轮):
span
为24小时,即86400秒。wheelSize
为24,对应小时的24小时。interval
为3600秒(span/wheelSize
),即每个时间槽代表1小时。
- 任务调度:
- 当一个任务需要被调度时,首先确定它的时间粒度(秒、分、小时)。
- 任务被添加到最底层的时间轮中,即第一层的秒轮。
- 当任务的时间到达时,如果它是一个周期性任务,并且周期超过了当前时间轮的
span
,它将被提升到上一层时间轮。
- 时间轮推进:
- 每过一个
interval
,当前时间指针会移动到下一个时间槽。 - 当第一层时间轮的指针走完一圈(60秒),如果任务的周期还没有结束,它将被移动到第二层的分轮。
- 同样,当第二层时间轮的指针走完一圈(60分钟),如果任务的周期还没有结束,它将被移动到第三层的时轮。
- 任务执行:
- 当任务在某个时间轮的时间槽中到期时,它将被执行。
- 如果任务是周期性的,它将根据剩余周期重新计算并添加到相应的时间轮层级中。
- 任务移除:
- 如果任务不再需要执行,它将从时间轮中移除。
分层时间轮的设计允许系统有效地处理不同时间粒度的任务,并且可以处理长周期任务,而不需要在单个时间轮中分配大量的时间槽。通过将任务在不同层级之间移动,可以优化内存使用并保持时间轮的效率。
四、时间轮算法的实现
// 时间轮结构体
type TimingWheel struct {
slots []chan Task // 时间槽,每个槽是一个通道,用于存储到期时间相同的任务
current int // 当前时间槽的索引,随着时间的流逝,这个索引会循环增加
interval time.Duration // 时间槽的时间间隔
timer *time.Ticker // 定时器,用于触发时间轮的转动
taskQueue chan Task // 任务队列,用于存储待添加到时间轮的任务
}
// 时间轮的初始化
func NewTimingWheel(slots int, interval time.Duration) *TimingWheel {
// 创建一个具有指定数量时间槽和时间间隔的TimingWheel实例。
tw := &TimingWheel{
slots: make([]chan Task, slots),
current: 0,
interval: interval,
taskQueue: make(chan Task, 100),
}
// 初始化每个时间槽为一个缓冲通道,用于存储任务。
for i := range tw.slots {
tw.slots[i] = make(chan Task, 10)
}
// 创建一个定时器,每隔interval时间触发一次。
tw.timer = time.NewTicker(interval)
// 启动一个后台goroutine来运行时间轮。
go tw.run()
return tw
}
// 添加任务到时间轮
func (tw *TimingWheel) AddTask(task Task, delay time.Duration) {
// 根据任务的延迟时间计算出应该放置的任务槽位。
slotIndex := (int(delay / tw.interval) + tw.current) % len(tw.slots)
// 将任务发送到对应的时间槽通道中。
tw.slots[slotIndex] <- task
}
// 时间轮的运行逻辑
// 定时器触发时,时间轮向前移动一个时间槽。
// 启动一个goroutine来处理当前时间槽中的所有任务。
// 任务被执行后,从时间槽中移除。
func (tw *TimingWheel) run() {
for range tw.timer.C {
tw.current = (tw.current + 1) % len(tw.slots)
go func(slot int) {
for task := range tw.slots[slot] {
task.Run()
}
}(tw.current)
}
}