Redis高级数据结构Stream和HyperLogLog( 二 )


XINFO stream streamtest 查看消息队列信息

Redis高级数据结构Stream和HyperLogLog

文章插图
 
XINFO groups streamtest 查看消息者组情况
Redis高级数据结构Stream和HyperLogLog

文章插图
 
消费消息
有了消费组,自然还需要消费者,Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID 。它同 xread 一样,也可以阻塞等待新消息 。读到新消息后,对应的消息 ID 就会进入消费者的PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除 。
XREADGROUP group cgroup1 c1 count 1 streams streamtest > //cgroup1 指定消费者组 //c1 指定消费者 //count 1 消费数量 // > 从当前消费者组的last_delivered_id(不包括)开始读 //阻塞读取,直到有消息写入,并返回阻塞时间 XREADGROUP group cgroup1 c1 block 0 count 1 streams streamtest > XINFO groups streamtest //消费者组状态 XINFO consumers streamtest cgroup1 //消费者组cgroup1内的消费者状态 XACK streamtest cgroup1 1672624113938-0 //确认消息 XPENDING streamtest cgroup1 //返回cgroup1内消费者未处理完的消息
消费者组状态
Redis高级数据结构Stream和HyperLogLog

文章插图
 
更多的Redis的Stream命令请大家参考Redis官方文档:
Redis队列几种实现的总结
基于List的 LPUSH+BRPOP 的实现
足够简单,消费消息延迟几乎为零,但是需要处理空闲连接的问题 。
如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常,所以在编写客户端消费者的时候要小心,如果捕获到异常需要重试 。
其他缺点包括:
做消费者确认ACK麻烦,不能保证消费者消费消息后是否成功处理的问题(宕机或处理异常等),通常需要维护一个Pending列表,保证消息处理确认;不能做广播模式,如pub/sub,消息发布/订阅模型;不能重复消费,一旦消费就会被删除;不支持分组消费 。
基于Sorted-Set的实现
多用来实现延迟队列,当然也可以实现有序的普通的消息队列,但是消费者无法阻塞的获取消息,只能轮询,不允许重复消息 。
PUB/SUB,订阅/发布模式
优点:
典型的广播模式,一个消息可以发布到多个消费者;多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息;消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息 。
缺点:
消息一旦发布,不能接收 。换句话就是发布时若客户端不在线,则消息丢失,不能寻回;不能保证每个消费者接收的时间是一致的;若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失 。通常发生在消息的生产远大于消费速度时;可见,Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务 。
基于Stream类型的实现
基本上已经有了一个消息中间件的雏形,可以考虑在生产过程中使用,当然真正要在生产中应用,要做的事情还很多,比如消息队列的管理和监控就需要花大力气去实现,而专业消息队列都已经自带或者存在着很好的第三方方案和插件 。不保证消息不丢失 。
消息队列问题
从我们上面对Stream的使用表明,Stream已经具备了一个消息队列的基本要素,生产者API、消费者API,消息Broker,消息的确认机制等等,所以在使用消息中间件中产生的问题,这里一样也会遇到 。
Stream 消息太多怎么办?
要是消息积累太多,Stream 的链表岂不是很长,内容会不会爆掉?xdel 指令又不会删除消息,它只是给消息做了个标志位 。
Redis 自然考虑到了这一点,所以它提供了一个定长 Stream 功能 。在 xadd 的指令提供一个定长长度 maxlen,就可以将老的消息干掉,确保最多不超过指定长度 。
消息如果忘记 ACK 会怎样?
Stream 在每个消费者结构中保存了正在处理中的消息 ID 列表 PEL,如果消费者收到了消息处理完了但是没有回复 ack,就会导致 PEL 列表不断增长,如果有很多消费组的话,那么这个 PEL 占用的内存就会放大 。所以消息要尽可能的快速消费并确认 。


推荐阅读