其次,pubsub中消费者获取消息是一个推送模型,这意味着Redis会按消息生产的速度给所有的消费者推送消息,不管消费者处理能力如何,如果消费者应用处理能力不足,消息就会在Redis的client buf中堆积,当堆积数据超过一个阈值后会断开这条连接,这意味着这些消息全部丢失了,在也找不回来了 。如果同时有多个消费者的client buf堆积数据但又还没达到断开连接的阈值,那么Redis服务端的内存会膨胀,进程可能因为oom而被杀掉,这导致了整个服务中断 。
3 总结
优势
- 消息具备广播能力
- psubscribe能按字符串通配符匹配,给予了业务逻辑的灵活性
- 能订阅特定key或特定命令的系统消息
不足
- Redis异常、客户端断连都会导致消息丢失
- 消息缺乏堆积能力,不能削峰填谷 。推送的方式缺乏背压机制,没有考虑消费者处理能力,推送的消息超过消费者处理能力后可能导致消息丢失或服务异常
三 Redis 5.0 stream消息丢失、消息服务不稳定的问题严重限制了pubsub的应用场景,所以Redis需要重新设计一套机制,来解决这些问题,这就有了后来的stream结构 。
1 stream特性
一个稳定的消息服务需要具备几个要点,要保证消息不会丢失,至少被消费一次,要具备削峰填谷的能力,来匹配生产者和消费者吞吐的差异 。在2018年Redis 5.0加入了stream结构,这次考虑了list、pubsub在应用场景下的缺陷,对标kafka的模型重新设计全内存消息队列结构,从这时开始Redis消息队列功能算是能和主流消息队列产品pk一把了 。
stream的改进分为多个方面
成本:
- 存储message数据使用了listpack结构,这是一个紧凑型的数据结构,不同于list的双向链表每个节点都要额外占用2个指针的存储空间,这使得小msg情况下stream的空间利用率更高 。
功能:
- stream引入了消费者组的概念,一个消费者组内可以有多个消费者,同一个组内的消费者共享一个消息位点(last_delivered_id),这使得消费者能够水平的扩容,可以在一个组内加入多个消费者来线性的提升吞吐,对于一个消费者组,每条msg只会被其中一个消费者获取和处理,这是pubsub的广播模型不具备的 。
- 不同消费者组之前是相互隔离的,他们各自维护自己的位点,这使得一条msg能被多个不同的消费者组重复消费,做到了消息广播的能力 。
- stream中消费者采用拉取的方式,并能设置timeout在没有消息时阻塞,通过这种长轮询机制保证了消息的实时性,而且消费速率是和消费者自身吞吐相匹配 。
消息不丢失:
- stream的数据会存储在aof和rdb文件中,这使Redis重启后能够恢复stream的数据 。而pubsub的数据是瞬时的,Redis重启意味着消息全部丢失 。
- stream中每个消费者组会存储一个last_delivered_id来标识已经读取到的位点,客户端连接断开后重连还是能从该位点继续读取,消息不会丢失 。
- stream引入了ack机制保证消息至少被处理一次 。考虑一种场景,如果消费者应用已经读取了消息,但还没来得及处理应用就宕机了,对于这种已经读取但没有ack的消息,stream会标示这条消息的状态为pending,等客户端重连后通过xpending命令可以重新读取到pengind状态的消息,继续处理 。如果这个应用永久宕机了,那么该消费者组内的其他消费者应用也能读取到这条消息,并通过xclaim命令将它归属到自己下面继续处理 。
#基于stream完成消息的生产和消费,并确保异常状态下消息至少被消费一次#创建mystream,并且创建一个consumergroup为mygroupXGROUP CREATE mystream mygroup $ MKSTREAMOK#写入一条消息,由redis自动生成消息id,消息的内容是一个kv数组,这里包含field1 value1 field2 value2XADD mystream * field1 value1 field2 value2"1645517760385-0"#消费者组mygroup中的消费者consumer1从mystream读取一条消息,>表示读取一条该消费者组从未读取过的消息XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >1) 1) "mystream"2) 1) 1) "1645517760385-0"2) 1) "field1"2) "value1"3) "field2"4) "value2"#消费完成后ack确认消息xack mystream mygroup 1645517760385-0(integer) 1#如果消费者应用在ack前异常宕机,恢复后重新获取未处理的消息id 。XPENDING mystream mygroup - + 10 1) 1) "1645517760385-0"2) "consumer1"3) (integer) 3053564) (integer) 1#如果consumer1永远宕机,其他消费者可以把pending状态的消息移动到自己名下后继续消费 #将消息id 1645517760385-0移动到consumer2下 XCLAIM mystream mygroup consumer2 0 1645517760385-0 1) 1) "1645517760385-0"2) 1) "field1"2) "value1"3) "field2"4) "value2"
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 毅力号火星车返回 美国毅力号火星车最新消息
- 使用Docker搭建Redis-cluster环境
- Redis服务安全加固的说明
- Redis缓存知识问题
- Linux下php安装Redis扩展的方法
- Redis从入门到精通,至少要看看这篇
- 当 Redis 发生高延迟时,到底发生了什么?
- Redis 是如何进行主从复制的?
- 乌市地铁4号线最新消息 乌鲁木齐地铁1号线停运原因
- 苹果|消息称iOS 16开测苹果旗舰新品:三个屏加持、至少2万块