用Golang实现一个消息队列

Go的channel是并发编程中的一种同步通信机制,天然支持并发请求,因此可以用来实现消息队列。这里实现一个能够处理并发请求的消息队列。

一、设计消息队列的基本结构

定义一个Message结构体来存储消息的ID和负载,一个Queue结构体来管理消息队列。

type Message struct {
    ID      string
    Payload interface{}
}

type Queue struct {
    messages []Message
    mutex    sync.Mutex
    cond     *sync.Cond
}

func NewQueue() *Queue {
    q := &Queue{}
    q.cond = sync.NewCond(&q.mutex)
    return q
}

二、实现生产者和消费者

生产者负责生成消息并将其添加到队列中,消费者从队列中读取消息并进行处理。为了确保线程安全和条件等待,可以使用sync.Mutexsync.Cond

func (q *Queue) Produce(msg Message) {
    q.mutex.Lock()
    defer q.mutex.Unlock()
    q.messages = append(q.messages, msg)
    q.cond.Signal()
}

func (q *Queue) Consume() Message {
    q.mutex.Lock()
    defer q.mutex.Unlock()
    for len(q.messages) == 0 {
        q.cond.Wait()
    }
    msg := q.messages[0]
    q.messages = q.messages[1:]
    return msg
}

三、并发处理

使用goroutine可以实现并发生产和消费,用来模拟生产者和消费者的行为。

func main() {
    queue := NewQueue()
    // 生产者Goroutine
    go func() {
        for i := 0; i < 10; i++ {
            msg := Message{ID: fmt.Sprintf("msg-%d", i), Payload: fmt.Sprintf("Data %d", i)}
            queue.Produce(msg)
            fmt.Printf("Produced: %v\n", msg)
            time.Sleep(time.Second)
        }
    }()
    // 消费者Goroutine
    go func() {
        for {
            msg := queue.Consume()
            fmt.Printf("Consumed: %v\n", msg)
            time.Sleep(time.Second * 2)
        }
    }()
    // 防止主程序退出
    select {}
}

四、持久化存储

将消息存储在磁盘或数据库中,防止系统崩溃导致数据丢失。

  • 将消息写入到一个日志文件中,这样即使程序重启,消息也不会丢失。
  • 在数据库中创建一个表来存储消息,并在消息处理失败时将消息插入到这个表中。