你好,我是码哥,一个拥抱硬核技术和对象,面向人民币编程的男人,设置星标不迷路 。
我在【redis 使用 List 实现消息队列的利与弊】说过使用 List 实现消息队列有很多局限性 。
- 没有 ACK 机制 。
- 没有类似 Kafka 的 ConsumerGroup 消费组概念 。
- 消息堆积 。
- List 是线性结构,查询指定数据需要遍历整个列表 。
同时提供了消息的持久化和主从复制机制,客户端可以访问任何时刻的数据,并且能记住每一个客户端的访问位置,从而保证消息不丢失 。
以下几个是 Stream 类型的主要特性 。
- 使用 Radix Tree 和 listpack 结构来存储消息 。
- 消息 ID 序列化生成 。
- 借鉴 Kafka Consume Group 的概念,多个消费者划分到不同的 Consume Group 中,消费同一个 Streams,同一个 Consume Group 的多个消费者可以一起并行但不重复消费,提升消费能力 。
- 支持多播(多对多),阻塞和非阻塞读取 。
- ACK 确认机制,保证了消息至少被消费一次 。
- 可设置消息保存上限阈值,我会把历史消息丢弃,防止内存占用过大 。
适合系统消息量不大,容忍数据丢失,使用 Redis Stream 作为消息队列就能享受高性能快速读写消息的优势 。
2、修炼心法每个 Stream 都有一个唯一的名称,作为 Stream 在 Redis 的 key,在首次使用
xadd
指令添加消息的时候会自动创建 。可以看到 Stream 在一个 Redix Tree 树上,树上存储的是消息 ID,每个消息 ID 对应的消息通过一个指针指向 listpack 。
Stream 流就像是一个仅追加内容的消息链表,把消息一个个串起来,每个消息都有一个唯一的 ID 和消息内容,消息内容则由多个 field/value 键值对组成 。底层使用 Radix Tree 和 listpack 数据结构存储数据 。
为了便于理解,我画了一张图,并对 Radix Tree 的存储数据做了下变形,使用列表来体现 Stream 中消息的逻辑有序性 。
文章插图
这张图涉及很多概念,但是你不要慌 。我一步步拆开说,最后你再回头看就懂了 。
先带你屡下全局思路 。
- Consumer Group:消费组,每个消费组可以有一个或者多个消费者,消费者之间是竞争关系 。不同消费组的消费者之间无任何关系 。
- *pel,全称是 Pending Entries List,记录了当前被客户端读取但是还没有 ack(Acknowledge character 确认字符)的消息 。如果客户端没有 ack,这个变量的消息 ID 会越来越多 。这是一个核心数据结构,用来确保客户端至少消费消息一次 。
typedef struct stream {rax *rax;uint64_t length;streamID last_id;streamID first_id;streamID max_deleted_entry_id;uint64_t entries_added;rax *cgroups;} stream;typedef struct streamID {uint64_t ms;uint64_t seq;} streamID;
- *rax,是一个 rax 的指针,指向一个 Radix Tree,key 存储消息 ID,value 实际上指向一个 listpack 数据结构,存储了多条消息,每条消息的 ID 都大于等于 这个 key 的消息 ID 。
- length,该 Stream 的消息条数 。
- streamID结构体,消息 ID 抽象,一共占 128 位,内部维护了毫秒时间戳(字段 ms);一个毫秒内的自增序号(字段 seq),用于区分同一毫秒内插入多条消息 。
- last_id,当前 Stream 最后一条消息的 ID 。
- first_id,当前 Stream 第一条消息的 ID 。
- max_deleted_entry_id,当前 Stream 被删除的最大的消息 ID 。
- entries_added,总共有多少条消息添加到 Stream 中,entries_added = 已删除消息条数 + 未删除消息条数 。
推荐阅读
- 五步让你掌握Python数据结构
- 面试为啥都问Redis缓存?赶紧补一下
- 一台服务器上部署 Redis 伪集群
- 为什么创建 Redis 集群时会自动错开主从节点?
- Java集合框架解析:选择正确数据结构提升性能
- 王者荣耀的段位排行榜是通过Redis实现的?
- Redis断连该如何抢救?
- 一文搞懂Redis架构演化之路
- Redis 为什么这么快?
- Redisson锁机制源码分析