Redis高级数据结构Stream和HyperLogLog

队列与Stream

Redis高级数据结构Stream和HyperLogLog

文章插图
 
redis stream结构如上图所示
消息链表,每个消息都有一个唯一的 ID 和对应的内容 。消息是持久化的,redis 重启后,内容还在 。
Stream唯一名称,它就是 Redis 的 key,在我们首次使用xadd指令追加消息时自动创建 。
消费组,一个stream支持多个 last_delivered_id,表示当前消费组已经消费到哪条消息了 。
每个消费者组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化last_delivered_id变量 。
同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动 。每个消费者有一个组内唯一名称 。
pending_ids,它记录了当前消费者已经被客户端读取,但是还没有 ack的消息 。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少 。这个 pending_ids 变量在 Redis 官方被称之为PEL,也就是Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理 。
消息ID 的形式是timestampInMillis-sequence,例如1527846880572-5,它表示当前的消息在毫米时间戳1527846880572时产生,并且是该毫秒内产生的第 5 条消息 。
消息内容就是键值对,形如 hash 结构的键值对,这没什么特别之处 。
常用命令
版本:redis-6.2.8
生产端 xadd 追加消息 xdel 删除消息,这里的删除仅仅是设置了标志位,不会实际删除消息 。XDEL streamtest 1672574363910-0 xrange 获取消息列表,会自动过滤已经删除的消息 xlen 消息长度 del 删除 Stream
Redis高级数据结构Stream和HyperLogLog

文章插图
 
streamtest 表示当前这个队列的名字,也就是我们一般意义上Redis中的key,
* 号表示服务器自动生成 ID,后面顺序跟着,是我们存入当前streamtest 这个队列的消息,采用的也是 key/value的存储形式
返回值1672574363910-0 则是生成的消息 ID,由两部分组成:时间戳-序号 。时间戳时毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型 。序号是在这个毫秒时间点内的消息序号 。它也是个64位整型 。建议使用Redis的方案生成消息ID,因为这种时间戳+序号的单调递增的ID方案,几乎可以满足全部的需求,但ID是支持自定义的 。
为了保证消息是有序的,因此Redis生成的ID是单调递增有序的 。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latest_generated_id属性,用于记录最后一个消息的ID 。若发现当前时间戳退后(小于latest_generated_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质 。
消费端 单消费者
Redis 设计了一个单独的消费指令xread,可以将 Stream 当成普通的消息队列 (list) 来使用 。使用 xread 时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表 (list) 。
XREAD count 1 streams streamtest 0-0 count 1 //读取1条消息 streams 关键字 0-0 从头开始 xread count 2 streams streamtest 1672574316404-0 //消费1672574316404-0(不包括)后面的两条消息 XREAD count 1 streams streamtest $ //默认返回nil,从尾部读取最新的一条消息 XREAD block 0 count 1 streams streamtest $ //block 阻塞读取消息,直到有消息写入
Redis高级数据结构Stream和HyperLogLog

文章插图
 
一般来说客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID 。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息 。
消费者组
创建消费者组
XGROUP create streamtest cgroup1 0-0 //从头开始消费 XGROUP create streamtest cgroup2 $ //从尾部开始消费,只接收新消息,其他消息忽略


推荐阅读