用Golang实现一个消息队列
Go的channel
是并发编程中的一种同步通信机制,天然支持并发请求,因此可以用来实现消息队列。这里实现一个能够处理并发请求的消息队列。
一、设计消息队列的基本结构
定义一个Message
结构体来存储消息的ID和负载,一个Queue
结构体来管理消息队列。
type Message struct {
ID string
Payload interface{}
}
type Queue struct {
messages []Message
mutex sync.Mutex
cond *sync.Cond
}
func NewQueue() *Queue {
q := &Queue{}
q.cond = sync.NewCond(&q.mutex)
return q
}
二、实现生产者和消费者
生产者负责生成消息并将其添加到队列中,消费者从队列中读取消息并进行处理。为了确保线程安全和条件等待,可以使用sync.Mutex
和sync.Cond
。
func (q *Queue) Produce(msg Message) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.messages = append(q.messages, msg)
q.cond.Signal()
}
func (q *Queue) Consume() Message {
q.mutex.Lock()
defer q.mutex.Unlock()
for len(q.messages) == 0 {
q.cond.Wait()
}
msg := q.messages[0]
q.messages = q.messages[1:]
return msg
}
三、并发处理
使用goroutine
可以实现并发生产和消费,用来模拟生产者和消费者的行为。
func main() {
queue := NewQueue()
// 生产者Goroutine
go func() {
for i := 0; i < 10; i++ {
msg := Message{ID: fmt.Sprintf("msg-%d", i), Payload: fmt.Sprintf("Data %d", i)}
queue.Produce(msg)
fmt.Printf("Produced: %v\n", msg)
time.Sleep(time.Second)
}
}()
// 消费者Goroutine
go func() {
for {
msg := queue.Consume()
fmt.Printf("Consumed: %v\n", msg)
time.Sleep(time.Second * 2)
}
}()
// 防止主程序退出
select {}
}
四、持久化存储
将消息存储在磁盘或数据库中,防止系统崩溃导致数据丢失。
- 将消息写入到一个日志文件中,这样即使程序重启,消息也不会丢失。
- 在数据库中创建一个表来存储消息,并在消息处理失败时将消息插入到这个表中。