Golang实现一个基于本地存储的延时队列

一、什么是延迟队列

延迟队列是一种特殊的队列,其核心特点是队列中的消息或任务会被延迟一定时间后才能被消费或执行。这种队列广泛应用于需要延迟处理的场景,比如订单超时未支付自动取消、用户注册后一段时间未登录发送提醒等。

二、延迟队列的应用场景

  • 电商平台订单处理:超过一定时间未支付的订单自动取消。
  • 商品签收后自动确认:在一定时间未确认的情况下,系统自动确认收货。
  • 用户提醒:在平台注册但一定时间内未登录的用户,发送短信提醒。
  • 会议通知:预定会议后,在预定时间点前一定时间通知与会人员。

三、延迟队列和定时任务的区别

特性延迟队列定时任务
触发时间没有固定的开始时间,而是依赖于某个事件触发后,再延迟一段时间执行任务有明确的触发时间,通常是按照固定的时间周期执行。可以通过cron表达式来设定具体的执行时间
周期性无周期性,通常是一次性任务有周期性,可以设置为定期重复执行
任务数量通常处理单个任务一般会同时处理多个任务
实现方式消息队列、专门的延迟队列实现可以通过编程语言或框架提供的定时器功能实现,如Golang的Timer
应用场景订单超时自动取消、用户操作后的反馈提示数据备份、日志清理等定期执行的任务
数据库压力通常是内存队列操作,处理效率较高,不会直接给数据库带来压力查表会给数据库带来较大的查询压力,尤其是当业务数据量较大时
时效性可以更精确地控制任务的执行时间,适合对实效性要求较高的场景由于执行时间的不确定性,可能无法满足对实效性要求较高的系统需求
分布式支持更容易实现分布式支持,适合大规模和高可用性的需求在分布式环境下可能需要额外的协调机制来保证任务的一致性,比如分布式锁
可靠性可以实现更好的异常恢复机制,尤其是在使用持久化存储的情况下可能在系统异常恢复后丢失任务状态,需要额外的机制来保证任务的恢复

四、Golang实现延时队列

实现一个基于本地存储的延时队列,可以通过以下步骤进行:

  1. 定义任务结构:首先定义一个任务结构体,包含任务的执行时间和具体的任务内容。
  2. 使用优先队列:利用Go的container/heap接口实现一个优先队列,用于管理任务。优先队列可以根据任务的执行时间来排序任务。
  3. 存储任务:将任务存储在优先队列中,以便根据执行时间顺序执行。
  4. 定时检查和执行:通过一个循环,定时检查当前时间与队列中最早任务的执行时间。如果当前时间晚于或等于任务的执行时间,则执行该任务。
  5. 持久化存储:为了实现本地存储,可以将任务序列化后存储在本地文件或数据库中。在程序启动时,从存储中读取任务并重新构建优先队列。
  6. 处理程序重启:为了处理程序重启的情况,需要在任务执行后立即将任务状态更新到持久化存储中,以确保任务不会因为程序重启而丢失。
  7. 并发控制:如果需要支持并发,可以使用互斥锁(sync.Mutex)来保护对优先队列的访问。

代码如下:

package main

import (
    "container/heap"
    "fmt"
    "time"
)

// Task 定义了延时任务的结构
type Task struct {
    ExecuteTime time.Time
    Job         func
}

// PriorityQueue 定义了优先队列,用于存储延时任务
type PriorityQueue []*Task

// Len 返回队列中任务的数量
func (pq PriorityQueue) Len() int { return len(pq) }

// Less 比较两个任务的执行时间,以确定它们的优先级
func (pq PriorityQueue) Less(i, j int) bool {
    return pq[i].ExecuteTime.Before(pq[j].ExecuteTime)
}

// Swap 交换队列中两个任务的位置
func (pq PriorityQueue) Swap(i, j int) {
    pq[i], pq[j] = pq[j], pq[i]
}

// Push 向队列中添加新任务
func (pq *PriorityQueue) Push(x interface{}) {
    *pq = append(*pq, x.(*Task))
}

// Pop 从队列中移除并返回一个任务
func (pq *PriorityQueue) Pop() interface{} {
    old := *pq
    n := len(old)
    task := old[n-1]
    *pq = old[0 : n-1]
    return task
}

// main 函数中初始化优先队列并添加任务
func main() {
    var pq PriorityQueue
    heap.Init(&pq)

    // 添加任务到优先队列
    pq.Push(&Task{ExecuteTime: time.Now().Add(2 * time.Second), Job: func() { fmt.Println("Task 1 executed") }})
    pq.Push(&Task{ExecuteTime: time.Now().Add(1 * time.Second), Job: func() { fmt.Println("Task 2 executed") }})

    // 定时检查并执行任务
    for len(pq) > 0 {
        task := heap.Pop(&pq).(*Task)
        if time.Now().After(task.ExecuteTime) {
            task.Job()
        }
        time.Sleep(1 * time.Second) // 每秒检查一次
    }
}

我们创建了一个Task结构体和一个PriorityQueue类型,后者实现了heap.Interface接口。 在main函数中,我们初始化了优先队列,添加了一些任务,并每秒检查一次是否有任务需要执行。