/* Consumer group. */typedef struct streamCG {streamID last_id;long long entries_read;rax *pel;rax *consumers;} streamCG;
- last_id,表示该消费组的消费者已经读取但还未 ACK 的最后一条消息 ID 。
- *pel,是 pending entries list 简写,指向一个 Radix Tree 的指针,保存着 Consumer group 中所有消费者读取但还未 ACK 确认的消息,就是这玩意实现了 ACK 机制 。该树的 key 是消息 ID,value 关联一个 streamNACK 实例 。
- *consumers,Radix Tree 指针,表示消费组中的所有消费者,key 是消费者名称,value 指向一个 streamConsumer 实例 。
/* Pending (yet not acknowledged) message in a consumer group. */typedef struct streamNACK {mstime_t delivery_time;uint64_t delivery_count;streamConsumer *consumer;} streamNACK;
- delivery_time,该消息最后一次推送给 Consumer 的时间戳 。
- delivery_count,消息被推送次数 。
- *consumer,消息推送的 Consumer 客户端 。
/* A specific consumer in a consumer group.*/typedef struct streamConsumer {mstime_t seen_time;sds name;rax *pel;} streamConsumer;
- seen_time,消费者最近一次被激活的时间戳 。
- name,消费者名称 。
- *pel,Radix Tree 指针,对于同一个消息而言,``streamCG -> pel与streamConsumer -> pel的streamNACK` 实例是同一个 。
文章插图
肖材积:“Redis 你好,Stream 如何结合 Radix Tree 和 listpack 结构来存储消息?为什么不使用散列表来存储,消息 ID 作为散列表的 key,散列表的 value 存储消息键值对内容 。’”在回答之前,先插入几条消息到 Stream,让你对 Stream 消息的存储格式有个大体认知 。
该命令的语法如下 。
XADD key id field value [field value ...]
Stream 中的每个消息可以包含不同数量的多个键值对,写入消息成功后,我会把消息的 ID 返回给客户端 。执行如下指令把用户购买书籍的下单消息存放到 hotlist:books队列,消息内容主要由 payerID、amount 和 orderID 。
> XADD hotlist:books * payerID 1 amount 69.00 orderID 91679218539571-0> XADD hotlist:books * payerID 1 amount 36.00 orderID 151679218572182-0> XADD hotlist:books * payerID 2 amount 99.00 orderID 881679218588426-0> XADD hotlist:books * payerID 3 amount 68.00 orderID 801679218604492-0
hotlist:books 是 Stream 的名称,后面的 “*” 表示让 Redis 为插入的消息自动生成一个唯一 ID,你也可以自定义 。消息 ID 由两部分组成 。
- 当前毫秒内的时间戳 。
- 顺序编号 。从 0 为起始值,用于区分同一时间内产生的多个命令 。
肖材积:“如何理解 Stream 是一种只执行追加操作(Append only)的数据结构?”通过将元素 ID 与时间进行关联,并强制要求新元素的 ID 必须大于旧元素的 ID, Redis 从逻辑上将 Stream 变成了一种只执行追加操作(append only)的数据结构 。
用户可以确信,新的消息和事件只会出现在已有消息和事件之后,就像现实世界里新事件总是发生在已有事件之后一样,一切都是有序进行的 。
?肖材积:“插入的消息 ID 大部分相同,比如这四条消息的 ID 都是 1679218 前缀 。另外,每条消息键值对的键通常都是一样的,比如这四条消息的键都是 payerID、amount 和 orderID 。使用散列表存储的话会很多冗余数据,你这么抠门,所以不使用散列表对不对?”
推荐阅读
- 五步让你掌握Python数据结构
- 面试为啥都问Redis缓存?赶紧补一下
- 一台服务器上部署 Redis 伪集群
- 为什么创建 Redis 集群时会自动错开主从节点?
- Java集合框架解析:选择正确数据结构提升性能
- 王者荣耀的段位排行榜是通过Redis实现的?
- Redis断连该如何抢救?
- 一文搞懂Redis架构演化之路
- Redis 为什么这么快?
- Redisson锁机制源码分析