消息队列是个啥?干啥用的?
消息队列是一种用于在应用程序之间传输数据的软件模式。它允许不同的应用程序通过在共享队列中存储和检索消息来实现异步通信。
消息队列通常包括三个主要组件:消息生产者、消息队列和消息消费者。消息生产者负责将消息发送到消息队列,而消息消费者则从消息队列中接收消息并进行处理。消息队列则充当了这两者之间的缓冲区。
消息队列通常用于以下情况:
解耦:消息队列可以帮助解耦应用程序之间的依赖关系,因为消息生产者和消息消费者可以独立地运行,而不必知道对方的具体实现。
异步通信:通过消息队列,应用程序可以异步地通信,这意味着一个应用程序可以发送消息并继续进行其他操作,而不必等待接收方对消息作出响应。
扩展性:由于消息队列提供了一个缓冲区,它可以帮助应对高流量的情况,并支持横向扩展。
可靠性:消息队列可以确保消息在传递过程中不会丢失,并支持消息重试和失败处理。
常见的消息队列包括 RabbitMQ、Kafka、ActiveMQ以及我们今天要着重讨论的Redis等。
本文仅讨论使用Redis进行消息队列操作的三种方式,暂不讨论和其他老牌消息队列服务的对比和优劣。
使用List实现一个简单的消息队列
此处利用Redis List可以通过push和pop的方式进行写入和读取的特性;而且数据是有序的,且使用lpush写入的数据默认条件下没有过期时间,可以保证数据在没有被pop读取之前不会丢失。
定义消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | type ActionMessage struct { MessageID string `json:"message_id"` MessageType string `json:"message_type"` } // 实现 encoding.BinaryMarshaler 接口 func (m *ActionMessage) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, m) } func (m *ActionMessage) MarshalBinary() (data []byte, err error) { return json.Marshal(m) } // Process 处理消息 func (message *ActionMessage) Process() { fmt.Println("消息ID:", message.MessageID, "消息类型:", message.MessageType) } |
读取消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | func readActionMessage(key string) { var messageItem ActionMessage err := redisClient.RPop(context.Background(), key).Scan(&messageItem) if err != nil { // 出现错误,等待500毫秒继续读取 time.Sleep(time.Millisecond * 500) fmt.Println("sleep") readActionMessage(key) return } // 执行消息处理操作 messageItem.Process() // 继续读取消息 readActionMessage(key) } |
发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | func testMessageQueueWithList() { // 消息队列key queueKey := "ActionMessageQueue" go func() { // 新开一个 goroutine 模拟消息队列接收端 readActionMessage(queueKey) }() // 模拟发送端 sendActionMessage(queueKey) } func sendActionMessage(key string) { for i := 0; i < 100; i++ { messageItem := ActionMessage{ MessageID: fmt.Sprintf("%d", i+1), MessageType: fmt.Sprintf("%d", i%5), } _, err := redisClient.LPush(context.Background(), key, &messageItem).Result() if err != nil { fmt.Println(err.Error()) } time.Sleep(time.Millisecond * 100) } } |
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | 消息ID: 1 消息类型: 0 sleep 消息ID: 2 消息类型: 1 ………………………… sleep 消息ID: 96 消息类型: 0 消息ID: 97 消息类型: 1 消息ID: 98 消息类型: 2 消息ID: 99 消息类型: 3 消息ID: 100 消息类型: 4 sleep sleep sleep sleep sleep sleep sleep sleep |
至此,我们已经实现了一个简单的消息队列,并运行成功。但由于不是专业的消息队列,还是存在一些问题:
1. 消息的消费端必须采用轮训的模式,轮训间隔过长会导致消息处理不及时,过短的话又会增加Redis服务的压力。
2. 由于Redis本身是一个内存数据库,虽然也有aof和rdb两种机制进行持久化方式,但在Redis宕机后始终还是会有部分数据丢失的风险。
3. 消息只要被消费一次后就会从队列中消失,一旦发生bug,想要回溯数据进行重新消费会变得非常困难。
4. 没有消息消费状态记录,不知道事件是否真的处理成功了。
因此,这种实现方式只能作为一些轻量,低可靠度要求的服务使用,且实现并不优雅。
Pub/Sub作为消息队列使用
上面我们用List实现了一个简单的消息队列,但是存在很多的问题,扩展性和可靠性都比较差。而且由于本质是轮训读取,消息并不实时。
pub/sub很好的解决了这个问题:
实时性:pub/sub 可以实现实时消息传递,消息被发布到频道时,订阅者可以立即接收到这些消息。
Redis 的 pub/sub(发布/订阅)功能允许多个客户端通过消息代理来传递消息。
发布者(Publisher)将消息发送到指定的频道(Channel),订阅者(Subscriber)则通过订阅这些频道来接收这些消息。
golang Pub/Sub 实现消息队列
此处我们用宠物门店的订单相关信息来举例,订单的创建,用户取消,门店取消,修改了订单相关的信息,服务完成等等状态发生在不同的业务场景或接口,同时会产生不同的后续处理(并且一定程度上并不需要同步处理)。
比较原始的方法可以把这些处理都写在事件发生的地方,但这样会造成业务代码非常冗杂,难以维护。久而久之就会变成屎山。
Pub/Sub实现的消息队列能够比较优雅的解决这个问题。
比如上面的场景,仅需在业务时间发生的时候往消息队列发送一个修改了宠物信息的消息即可,比如门店经过称重修改了订单对应的宠物信息,此时需要修改服务价格,告知客户和员工这个订单进行了信息修改。
推送服务在接收到这个消息的时候执行推送逻辑,告知客户宠物信息进行了修改、告知员工订单发生了信息修改。
价格计算服务在接收到这个消息的时候执行计算逻辑,更新到新的服务价格。
定义订单状态变化相关消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | package models import ( "fmt" ) // 订单相关订阅消息模型 type OrderPushMessage struct { MessageType OrderPushMessageType `json:"message_type"` OrderID string `json:"order_id"` UserID string `json:"user_id"` StoreID string `json:"store_id"` } type OrderPushMessageType string const ( OrderPushMessageTypeWillExpireSoon OrderPushMessageType = "1" OrderPushMessageTypeCancelByStore OrderPushMessageType = "2" OrderPushMessageTypeEditByStore OrderPushMessageType = "3" OrderPushMessageTypeServiceCompleted OrderPushMessageType = "4" OrderPushMessageTypeCancelByUser OrderPushMessageType = "5" OrderPushMessageTypeCreateNewReservation OrderPushMessageType = "6" OrderPushMessageTypeCancelBySystem OrderPushMessageType = "7" OrderPushMessageTypeServiceStarted OrderPushMessageType = "8" OrderPushMessageTypeNeedStartPickUp OrderPushMessageType = "9" OrderPushMessageTypePickUpExpiried OrderPushMessageType = "10" OrderPushMessageTypePetInfoEdited OrderPushMessageType = "11" OrderPushMessageTypeChangeReservationStaff OrderPushMessageType = "12" OrderPushMessageTypeNextWashPush OrderPushMessageType = "13" ) // Process 处理消息 func (message *OrderPushMessage) Process() { action := "" switch message.MessageType { case OrderPushMessageTypeCancelByStore: action = "发送取消原因给客户" case OrderPushMessageTypeCancelByUser: action = "发送取消通知给门店" case OrderPushMessageTypeServiceCompleted: action = "服务完成,通知客户来接走宠物" default: action = "其他处理方式" + string(message.MessageType) } fmt.Println( "处理订单\n订单ID:", message.OrderID, "门店ID:", message.StoreID, "用户ID:", message.UserID, "处理方式:", action, ) } |
接收端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | package smessage import ( "context" "encoding/json" "fmt" "redis_demo/models" "github.com/beego/beego/v2/server/web" "github.com/redis/go-redis/v9" ) // 订单相关消息推送通道 const OrderPushPubSubChannel = "OrderPushPubSubChannel" func InitPubSubClient() { dbHost, _ := web.AppConfig.String("aliyun.redis.endpoint") dbPort, _ := web.AppConfig.String("aliyun.redis.port") dbPassword, _ := web.AppConfig.String("aliyun.redis.password") dbnum, _ := web.AppConfig.Int("aliyun.redis.dbnum") redisClient := redis.NewClient(&redis.Options{ Addr: dbHost + ":" + dbPort, Password: dbPassword, DB: dbnum, }) sub := redisClient.Subscribe( context.Background(), OrderPushPubSubChannel, ) // 用Channel来接收消息 ch := sub.Channel() // 处理消息 for msg := range ch { switch msg.Channel { case OrderPushPubSubChannel: var messageModel models.OrderPushMessage err := json.Unmarshal([]byte(msg.Payload), &messageModel) if err != nil { fmt.Println(err.Error()) } else { go messageModel.Process() } default: fmt.Println("未知消息通道:", msg.Payload) } } } |
发送端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | package pmessage import ( "context" "encoding/json" "sync" "github.com/beego/beego/v2/server/web" "github.com/redis/go-redis/v9" ) var redisClient *redis.Client var redisClientOnce = sync.Once{} // 订单相关消息推送通道 const OrderPushPubSubChannel = "OrderPushPubSubChannel" // InitPubSubClient 初始化redis客户端 func InitPubSubClient() *redis.Client { redisClientOnce.Do(func() { dbHost, _ := web.AppConfig.String("aliyun.redis.endpoint") dbPort, _ := web.AppConfig.String("aliyun.redis.port") dbPassword, _ := web.AppConfig.String("aliyun.redis.password") dbnum, _ := web.AppConfig.Int("aliyun.redis.dbnum") redisClient = redis.NewClient(&redis.Options{ Addr: dbHost + ":" + dbPort, Password: dbPassword, DB: dbnum, }) }) return redisClient } // PublishRedisSubscribeMessage 发送redis订阅消息 func PublishRedisSubscribeMessage(channelName string, message interface{}) error { bytes, err := json.Marshal(message) if err != nil { return err } _, err = redisClient.Publish(context.Background(), channelName, string(bytes)).Result() if err != nil { return err } return nil } |
初始化和调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | func SendTestMessage() { for i := 1; i < 101; i++ { err := pmessage.PublishRedisSubscribeMessage(pmessage.OrderPushPubSubChannel, models.OrderPushMessage{ MessageType: models.OrderPushMessageType(fmt.Sprintf("%d", i%13)), OrderID: strconv.Itoa(i), UserID: strconv.Itoa(i + 9527), StoreID: "1000010001", }) if nil != err { log.Println(err.Error()) } } } func init() { go pmessage.InitPubSubClient() go smessage.InitPubSubClient() } |
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 | 处理订单 订单ID: 1 门店ID: 1000010001 用户ID: 9528 处理方式: 其他处理方式1 处理订单 订单ID: 2 门店ID: 1000010001 用户ID: 9529 处理方式: 发送取消原因给客户 处理订单 订单ID: 3 门店ID: 1000010001 用户ID: 9530 处理方式: 其他处理方式3 处理订单 订单ID: 4 门店ID: 1000010001 用户ID: 9531 处理方式: 服务完成,通知客户来接走宠物 处理订单 订单ID: 5 门店ID: 1000010001 用户ID: 9532 处理方式: 发送取消通知给门店 处理订单 订单ID: 6 门店ID: 1000010001 用户ID: 9533 处理方式: 其他处理方式6 …………………… |
至此,我们已经用Redis pub/sub实现了简单的消息队列,需要注意的是:
1. 我们可以通过不同的channel来区分不同的消息发送方和接收方,例如订单消息可以定义一个channel,用户相关的消息可以定义另外一个channel,接收方根据需要进行订阅即可。
2. pub/sub机制下接收方可以有N个,N>=1; 如果没有定义接收方,消息将直接丢弃,如果接收方因为网络或其它原因掉线,消息也会随之丢失,所以可靠性略差。
3. 消息在内存中存储,跟List一样同样存在消息消费后即丢失,无法回溯和重复处理的问题。
4. 在分布式系统中,由于消息没有消费状态,如果有多个接收端执行相同逻辑,可能会导致消息所需要的处理逻辑重复执行。
综上,pub/sub虽然很好的实现了业务解耦,发送和订阅解耦,不同channel的互不干扰,但可靠性和持久存储方面存在一些问题。
Stream作为消息队列使用
从上面的代码和运行结果可知,无论使用List还是pub/sub实现的消息队列都或多或少的存在一些问题,Redis5.0以后专门推出了stream数据结构用于消息队列的场景。
它提供了一个高性能、低延迟的消息队列实现。相比于传统的 pub/sub 模式,Redis Stream 具有以下优势:
消息可持久化:Redis Stream 将消息保存在内存中,同时也可以选择将消息保存到磁盘上的持久化文件中,从而保证消息的可靠性。
多消费者:Redis Stream 支持多个消费者对同一个消息队列进行消费,每个消费者可以独立消费消息。
消息确认机制:Redis Stream 支持消息确认机制,当消费者成功消费了一条消息后,可以向 Redis 服务器发送确认信息,从而保证消息不会被重复消费。
消息追溯:Redis Stream 提供了消息追溯功能,即消费者可以根据消息的 ID 进行随时随地的查找和消费消息。
消息分组:Redis Stream 支持对消息进行分组,从而使不同分组的消费者可以独立消费消息,提高了消息消费的并发性。
Redis Stream底层数据结构
Redis Stream 使用的数据结构 radix tree(基数树),也被称为 trie(字典树),是一种常用的树形数据结构。
Radix tree 是一种特殊的多叉树,它将相同的前缀字符串合并成一个共享节点,从而减少了存储空间和查询时间。
图解Radix Tree(截取自Redis源码注释)
假设Radix Tree树中包含以下几个字符串 “foo”, “foobar” 和 “footer”;如果node节点代表rax树中的一个key,就写在 [] 里面,反之写在 () 里面。
然而,这里有个非常常见的优化版本,即将连续拥有单个子节点的节点“压缩”到节点本身作为一个字符字符串,每个字符代表下一级子节点,并且只提供到代表最后一个字符节点的节点的链接。因此,上面的结构被转换为:
不过,这种树形图实现上却是有点麻烦,比如 当字符串 “first” 要插入上面的 Radix Tree 中,这里就涉及到节点切割了,因为此时 “foo” 就不再是一个公共前缀了,最终树形图如下:
源码分析
树结构rax定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | #define RAX_NODE_MAX_SIZE ((1<<29)-1) typedef struct raxNode { uint32_t iskey:1; /* 节点是否包含key */ uint32_t isnull:1; /* 节点的值是否为NULL(没有存储值) */ uint32_t iscompr:1; /* 节点是否被压缩 */ uint32_t size:29; /* 子节点大小,或压缩字符长度 */ unsigned char data[]; /* 节点的实际存储数据 */ } raxNode; typedef struct rax { raxNode *head; /* 保存消息的 Radix Tree */ uint64_t numele; /* 元素/key个数 */ uint64_t numnodes; /* raxNode个数 */ } rax; |
stream数据结构定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | typedef struct streamID { uint64_t ms; /* Unix time in milliseconds. */ uint64_t seq; /* Sequence number. */ } streamID; typedef struct stream { rax *rax; /* The radix tree holding the stream. */ uint64_t length; /* Current number of elements inside this stream. */ streamID last_id; /* Zero if there are yet no items. */ streamID first_id; /* The first non-tombstone entry, zero if empty. */ streamID max_deleted_entry_id; /* The maximal ID that was deleted. */ uint64_t entries_added; /* All time count of elements added. */ rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ } stream; /* Consumer group. */ typedef struct streamCG { streamID last_id; /* Last delivered (not acknowledged) ID for this group. Consumers that will just ask for more messages will served with IDs > than this. */ long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no XGROUP SETID, ...), this is the total number of group reads. In the real world, the reasoning behind this value is detailed at the top comment of streamEstimateDistanceFromFirstEverEntry(). */ rax *pel; /* Pending entries list. This is a radix tree that has every message delivered to consumers (without the NOACK option) that was yet not acknowledged as processed. The key of the radix tree is the ID as a 64 bit big endian number, while the associated value is a streamNACK structure.*/ rax *consumers; /* A radix tree representing the consumers by name and their associated representation in the form of streamConsumer structures. */ } streamCG; |
Redis stream 的实现依赖于 rax 结构以及 listpack 结构。每个消息流都包含一个 rax 结构,以消息ID 为 key、listpack结构为 value 存储在 rax 结构中。每个消息的具体信息存储在这个 listpack 中。
Stream 保存的消息数据,按照 key-value 形式来看的话,消息 ID 就相当于 key,而消息内容相当于是 value。
也就是,Stream 会使用 Radix Tree 来保存消息 ID,然后将消息内容保存在 listpack 中,并作为消息 ID 的 value,用 raxNode 的 value 指针指向对应的 listpack。
Redis Stream实际使用
接下来我们模拟门店订单消息使用stream进行消息队列实践。
消息结构体使用跟上面一致的结构。
发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | // NewRedisClient 初始化redis客户端 func NewRedisClient() *redis.Client { dbHost, _ := web.AppConfig.String("aliyun.redis.endpoint") dbPort, _ := web.AppConfig.String("aliyun.redis.port") dbPassword, _ := web.AppConfig.String("aliyun.redis.password") dbnum, _ := web.AppConfig.Int("aliyun.redis.dbnum") redisClient := redis.NewClient(&redis.Options{ Addr: dbHost + ":" + dbPort, Password: dbPassword, DB: dbnum, }) return redisClient } func PublishStreamMessage(streamName, messageBodyType string, messageBody interface{}, client *redis.Client) error { // 将消息内容解析为JSON备用 messageBodyBytes, err := json.Marshal(messageBody) if err != nil { return err } messageID, err := client.XAdd(context.Background(), &redis.XAddArgs{ Stream: streamName, // 标记stream的key NoMkStream: false, // 默认false在流不存在的时候自动创建,如果不想自动创建可改为true MaxLen: 500_000, // 队列最大长度,默认不限制,根据业务设置一个最大长度可节省内存 MinID: "", // ID起始值,不设置则按照默认规则自增 Approx: false, // 在达到MaxLen后是否使用近似值,默认否,如果为true则可能容忍超过maxlen的长度 Limit: 0, // 也是限制队列最大长度,跟maxlen的区别在于必须是maxlen使用模糊限制的时候生效,一般用不着 ID: "*", // *由Redis生成消息ID,自行指定ID的时候需要处理自增 Values: map[string]interface{}{ "MessageBody": messageBodyBytes, "MessageBodyType": messageBodyType, }, // key1 value1 key2 value2 ... 形式的消息内容,这里将要发送的消息体和类型分别组装 }).Result() fmt.Println("New message id =", messageID) return err } |
xadd命令可直接往队列中加入我们想要发送的消息,如注释中所示,我们可以通过设置最大长度来控制队列长度,以达到节约内存的目的。
消息的内容是key1 value1 key2 value2 …的形式,需要在业务代码中对具体消息体进行格式化,这一点略微麻烦。
模拟往队列中发1亿条消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | func SendStreamMessage(streamName, messageBodyType string) { client := NewRedisClient() for i := 1; i < 100000001; i++ { message := models.OrderPushMessage{ MessageType: models.OrderPushMessageType(fmt.Sprintf("%d", i%13)), OrderID: strconv.Itoa(i), UserID: strconv.Itoa(i + 9527), StoreID: "1000010001", } err := PublishStreamMessage(streamName, messageBodyType, message, client) if err != nil { log.Println(err.Error()) } } } |
普通消费和标记
我们先试用正常的直接读取消息,标记消费的模式进行消息处理,这里我们开三个协程模拟三个分布式客户端正在从消息队列接收消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | // SimulatedNormalReception模拟普通接收 func SimulatedNormalReception(streamName string) { go func() { rdb := NewRedisClient() for { streams, err := rdb.XRead(context.Background(), &redis.XReadArgs{ Streams: []string{streamName, "$"}, // $ 表示从最新消息位置开始消费,也可以自行指定消息ID,从指定消息ID开始 Count: 1, // 每次读取条数 Block: time.Second * 5, // 阻塞时长,如果为0则不阻塞 }).Result() if err != nil { log.Println(err.Error()) continue } for _, stream := range streams { for _, message := range stream.Messages { fmt.Println("处理消息:", message) _, err = rdb.XAck(context.Background(), streamName, "", message.ID).Result() // 标记消息已被消费 if err != nil { log.Println(err.Error()) continue } } } } }() go func() { rdb := NewRedisClient() for { streams, err := rdb.XRead(context.Background(), &redis.XReadArgs{ Streams: []string{streamName, "$"}, Count: 1, Block: time.Second * 5, }).Result() if err != nil { log.Println(err.Error()) continue } for _, stream := range streams { for _, message := range stream.Messages { fmt.Println("处理消息:", message) _, err = rdb.XAck(context.Background(), streamName, "", message.ID).Result() if err != nil { log.Println(err.Error()) continue } } } } }() go func() { rdb := NewRedisClient() for { streams, err := rdb.XRead(context.Background(), &redis.XReadArgs{ Streams: []string{streamName, "$"}, Count: 1, Block: time.Second * 5, }).Result() if err != nil { log.Println(err.Error()) continue } for _, stream := range streams { for _, message := range stream.Messages { fmt.Println("处理消息:", message) _, err = rdb.XAck(context.Background(), streamName, "", message.ID).Result() if err != nil { log.Println(err.Error()) continue } } } } }() } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | ...... 处理消息: {1683711350250-0 map[MessageBody:{"message_type":"4","order_id":"84842","user_id":"94369","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683711350250-0 处理消息: {1683711350250-0 map[MessageBody:{"message_type":"4","order_id":"84842","user_id":"94369","store_id":"1000010001"} MessageBodyType:OrderMessage]} 处理消息: {1683711350250-0 map[MessageBody:{"message_type":"4","order_id":"84842","user_id":"94369","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683711350250-1 处理消息: {1683711350250-2 map[MessageBody:{"message_type":"6","order_id":"84844","user_id":"94371","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683711350250-2 处理消息: {1683711350250-3 map[MessageBody:{"message_type":"7","order_id":"84845","user_id":"94372","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683711350250-3 处理消息: {1683711350250-3 map[MessageBody:{"message_type":"7","order_id":"84845","user_id":"94372","store_id":"1000010001"} MessageBodyType:OrderMessage]} 处理消息: {1683711350251-0 map[MessageBody:{"message_type":"8","order_id":"84846","user_id":"94373","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683711350251-0 ^CNew message id = 1683711350252-0 ...... |
从运行结果可以看到,我们模拟的三个分布式客户端虽然都各自实现了标记消息被消费的逻辑,但消息仍然被三个端分别消费并进行了逻辑处理。
消息的重复处理可能会导致业务逻辑的错误,那么有没有办法解决呢?
答案当然是有的,我们可以通过分布式锁,将消息处理的过程锁住,保证每个消息仅处理一次,但好像成本略高啊?
其实Redis提供了优雅的解决方案,使用消费者分组来解决这个问题,同一个分组内,仅一个消费者能消费这条消息,且不会引入分布式锁这么重量级的解决方案。
使用消费者组进行消费
接下来使用消费者分组对上面的逻辑进行改造。
创建消费组:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | func CreateConsumerGroup(streamName, groupName string, consumers []string) { rdb := NewRedisClient() // 这里为了方便测试,先销毁消费组 _, err := rdb.XGroupDestroy(context.Background(), streamName, groupName).Result() if err != nil { log.Println(err.Error()) return } // 创建消费组,$参数表示从最后开始消费 _, err = rdb.XGroupCreate(context.Background(), streamName, groupName, "$").Result() if err != nil { log.Println(err.Error()) return } // 创建消费者, 这一步非常必要,Redis规定需要先在组内创建消费者,才能使用这个消费者接收消息 for _, consumer := range consumers { _, err = rdb.XGroupCreateConsumer(context.Background(), streamName, groupName, consumer).Result() if err != nil { log.Println(err.Error()) return } } } |
使用消费组接收消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | // SimulatedGroupAndConsumerReception 模拟使用消费端分组进行接收 func SimulatedGroupAndConsumerReception(streamName, groupName string) { consumerA := "ConsumerA" consumerB := "ConsumerB" consumerC := "ConsumerC" CreateConsumerGroup(streamName, groupName, []string{consumerA, consumerB, consumerC}) go func() { rdb := NewRedisClient() for { streams, err := rdb.XReadGroup(context.Background(), &redis.XReadGroupArgs{ Group: groupName, Consumer: consumerA, // id为>,表示着消费者只希望接收从未传递给任何其他消费者的消息 // id为0或其他,表示可获取已读但未确认的消息,请注意,在这种情况下,BLOCK和NOACK都将被忽略。 Streams: []string{streamName, ">"}, Count: 1, Block: time.Second * 5, NoAck: true, // 这里为true,表示自动标记消费,无需再调用标记消费的xack命令 }).Result() if err != nil { log.Println(err.Error()) continue } for _, stream := range streams { for _, message := range stream.Messages { fmt.Println("ConsumerA 处理消息:", message) } } } }() go func() { rdb := NewRedisClient() for { streams, err := rdb.XReadGroup(context.Background(), &redis.XReadGroupArgs{ Group: groupName, Consumer: consumerB, Streams: []string{streamName, ">"}, Count: 1, Block: time.Second * 5, NoAck: true, }).Result() if err != nil { log.Println(err.Error()) continue } for _, stream := range streams { for _, message := range stream.Messages { fmt.Println("ConsumerB 处理消息:", message) } } } }() go func() { rdb := NewRedisClient() for { streams, err := rdb.XReadGroup(context.Background(), &redis.XReadGroupArgs{ Group: groupName, Consumer: consumerB, Streams: []string{streamName, ">"}, Count: 1, Block: time.Second * 5, NoAck: true, }).Result() if err != nil { log.Println(err.Error()) continue } for _, stream := range streams { for _, message := range stream.Messages { fmt.Println("ConsumerC 处理消息:", message) } } } }() } |
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | New message id = 1683713149147-1 ConsumerC 处理消息: {1683713149147-1 map[MessageBody:{"message_type":"2","order_id":"21569","user_id":"31096","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683713149147-2 ConsumerB 处理消息: {1683713149147-2 map[MessageBody:{"message_type":"3","order_id":"21570","user_id":"31097","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683713149147-3 ConsumerA 处理消息: {1683713149147-3 map[MessageBody:{"message_type":"4","order_id":"21571","user_id":"31098","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683713149147-4 ConsumerC 处理消息: {1683713149147-4 map[MessageBody:{"message_type":"5","order_id":"21572","user_id":"31099","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683713149147-5 ConsumerB 处理消息: {1683713149147-5 map[MessageBody:{"message_type":"6","order_id":"21573","user_id":"31100","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683713149147-6 ConsumerA 处理消息: {1683713149147-6 map[MessageBody:{"message_type":"7","order_id":"21574","user_id":"31101","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683713149147-7 ConsumerC 处理消息: {1683713149147-7 map[MessageBody:{"message_type":"8","order_id":"21575","user_id":"31102","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683713149147-8 ConsumerB 处理消息: {1683713149147-8 map[MessageBody:{"message_type":"9","order_id":"21576","user_id":"31103","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683713149147-9 ConsumerA 处理消息: {1683713149147-9 map[MessageBody:{"message_type":"10","order_id":"21577","user_id":"31104","store_id":"1000010001"} MessageBodyType:OrderMessage]} New message id = 1683713149147-10 ConsumerC 处理消息: {1683713149147-10 map[MessageBody:{"message_type":"11","order_id":"21578","user_id":"31105","store_id":"1000010001"} MessageBodyType:OrderMessage]} |
从运行结果来看,每条消息均只被消费了一次,而且几乎是均匀的分不到了不同的消费端,也就意味着这对分布式系统非常友好,可以很好的解决分布式系统下消息到达负载的问题。
常见命令用途说明
消息队列相关命令:
XADD – 添加消息到末尾
XTRIM – 对流进行修剪,限制长度
XDEL – 删除消息
XLEN – 获取流包含的元素数量,即消息长度
XRANGE – 获取消息列表,会自动过滤已经删除的消息
XREVRANGE – 反向获取消息列表,ID 从大到小
XREAD – 以阻塞或非阻塞方式获取消息列表
XACK – 将消息标记为”已处理”
消费者组相关命令:
XGROUP CREATE – 创建消费者组
XREADGROUP GROUP – 读取消费者组中的消息
XGROUP SETID – 为消费者组设置新的最后递送消息ID
XGROUP DELCONSUMER – 删除消费者
XGROUP DESTROY – 删除消费者组
XPENDING – 显示待处理消息的相关信息
XCLAIM – 转移消息的归属权
XINFO – 查看流和消费者组的相关信息;
XINFO GROUPS – 打印消费者组的信息;
XINFO STREAM – 打印流信息
总结
1. 使用List实现的消息队列由于需要轮训,可能造成很多的资源浪费或消息的到达不及时,且数据保存在内存中,用后即销毁,无法回溯。
2. 使用pub/sub实现的消息队列支持多端接收,已经可以很好的满足业务解耦,到达速度也非常快;但同样存在读取后即销毁,无法回溯的问题。
3. Redis stream实现的消息队列基本比较好的解决的上面的问题,而且结合消费组以后对分布式系统非常友好;对比kafka等老牌重量级中间件速度还更快一些,唯一的缺点可能是内存会导致成本更高了。
程序猿老龚(龚杰洪)原创,版权所有,转载请注明出处.