Redis Stream 数据结构实现原理真的很强( 二 )

  • *cgroups,rax 指针,也指向一个 Radix Tree,记录当前 Stream 的所有 Consume Group,每个 Consume Group 的名称都是唯一标识,作为 Radix Tree 的 key,Consumer Group 实例作为 value 。
  • Consumer GroupConsumer Group 由 streamCG 结构体定义,每个 Stream 可以有多个 Consumer Group,一个消费组可以有多个消费者同时对组内消息进行消费 。
    /* Consumer group. */typedef struct streamCG {streamID last_id;long long entries_read;rax *pel;rax *consumers;} streamCG;
    • last_id,表示该消费组的消费者已经读取但还未 ACK 的最后一条消息 ID 。
    • *pel,是 pending entries list 简写,指向一个 Radix Tree 的指针,保存着 Consumer group 中所有消费者读取但还未 ACK 确认的消息,就是这玩意实现了 ACK 机制 。该树的 key 是消息 ID,value 关联一个 streamNACK 实例 。
    • *consumers,Radix Tree 指针,表示消费组中的所有消费者,key 是消费者名称,value 指向一个 streamConsumer 实例 。
    streamNACKstreamCG -> *pel 对应的 value 是一个 streamNACK 实例,用于抽象消费者已经读取,但是未 ACK 的消息 ID 相关信息 。
    /* Pending (yet not acknowledged) message in a consumer group. */typedef struct streamNACK {mstime_t delivery_time;uint64_t delivery_count;streamConsumer *consumer;} streamNACK;
    • delivery_time,该消息最后一次推送给 Consumer 的时间戳 。
    • delivery_count,消息被推送次数 。
    • *consumer,消息推送的 Consumer 客户端 。
    streamConsumerConsumer Group 中对 Consumer 的抽象 。
    /* A specific consumer in a consumer group.*/typedef struct streamConsumer {mstime_t seen_time;sds name;rax *pel;} streamConsumer;
    • seen_time,消费者最近一次被激活的时间戳 。
    • name,消费者名称 。
    • *pel,Radix Tree 指针,对于同一个消息而言,``streamCG -> pel与streamConsumer -> pel的streamNACK` 实例是同一个 。
    最后来一张图,便于你理解 。
    Redis Stream 数据结构实现原理真的很强

    文章插图
    肖材积:“Redis 你好,Stream 如何结合 Radix Tree 和 listpack 结构来存储消息?为什么不使用散列表来存储,消息 ID 作为散列表的 key,散列表的 value 存储消息键值对内容 。’”
    在回答之前,先插入几条消息到 Stream,让你对 Stream 消息的存储格式有个大体认知 。
    该命令的语法如下 。
    XADD key id field value [field value ...]Stream 中的每个消息可以包含不同数量的多个键值对,写入消息成功后,我会把消息的 ID 返回给客户端 。
    执行如下指令把用户购买书籍的下单消息存放到 hotlist:books队列,消息内容主要由 payerID、amount 和 orderID 。
    > XADD hotlist:books * payerID 1 amount 69.00 orderID 91679218539571-0> XADD hotlist:books * payerID 1 amount 36.00 orderID 151679218572182-0> XADD hotlist:books * payerID 2 amount 99.00 orderID 881679218588426-0> XADD hotlist:books * payerID 3 amount 68.00 orderID 801679218604492-0hotlist:books 是 Stream 的名称,后面的 “*” 表示让 Redis 为插入的消息自动生成一个唯一 ID,你也可以自定义 。
    消息 ID 由两部分组成 。
    • 当前毫秒内的时间戳 。
    • 顺序编号 。从 0 为起始值,用于区分同一时间内产生的多个命令 。
    肖材积:“如何理解 Stream 是一种只执行追加操作(Append only)的数据结构?”
    通过将元素 ID 与时间进行关联,并强制要求新元素的 ID 必须大于旧元素的 ID, Redis 从逻辑上将 Stream 变成了一种只执行追加操作(append only)的数据结构 。
    用户可以确信,新的消息和事件只会出现在已有消息和事件之后,就像现实世界里新事件总是发生在已有事件之后一样,一切都是有序进行的 。
    ?肖材积:“插入的消息 ID 大部分相同,比如这四条消息的 ID 都是 1679218 前缀 。另外,每条消息键值对的键通常都是一样的,比如这四条消息的键都是 payerID、amount 和 orderID 。使用散列表存储的话会很多冗余数据,你这么抠门,所以不使用散列表对不对?”


    推荐阅读