使用redis作为消息队列的用法

背景
最近项目有个需求需要动态更新规则,当时脑中想到的第一个方案是利用zk的监听机制,管理人员更新完规则将状态写入zk,集群中的机器监听zk的状态,当有状态变更后,集群中的机器开始拉取最新的配置 。但由于公司技术选型,没有专门搭建zk集群,因此也不可能为这一个小需求去搭建zk集群 。图为使用zk监听状态变化的流程 。
 

使用redis作为消息队列的用法

文章插图
 
 
最后只好退而求其次,想到了使用redis的队列来做规则的更新
消息队列
首先做简单的引入 。
  1. 队列(来自百度百科):是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表 。进行插入操作的端称为队尾,进行删除操作的端称为队头 。
  2. 消息队列(来自百度百科):是在消息的传输过程中保存消息的容器 。
从队列和消息队列的定义看来,看不出什么相似之处 。但我理解它们的作用是相似的,只是使用环境不同 。队列和消息队列 本质上都可以用于解决“生产者”和“消费者”问题,在二者这间建立桥梁,it中专业术语是对“生产者”和“消费者”进行解耦 。可以动态的通过调整“生产者”和“消费者”线程数或服务器实例数,在正常情况使消费和生产到达一个平衡;在高峰情况下(生产者大于消费者)可以保护消费者不被拖垮的同时,还可以对把积压的数据保存下来,消费者可以延迟消费这些数据进行处理 。
队列 一般指的是单个服务实例内部使用,比如,在JAVA中的一个jvm实例内部可以使用Queue的子类(Deque:双端队列,是Queue的子接口),比如:单线程情况下使用LinkedList(无界)、PriorityQueue(优先队列);多线程情况下可以阻塞队列ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue(延迟队列 无界)、PriorityBlockingQueue(优先 无界)、SynchronousQueue(没有容量的队列) 。可以看到java的api已经很强大了,可以根据自己的业务需求选择使用 。使用方法:生产者从一端放入消息,消费者从另一端取出消息进行处理,消息放到队列里(感觉是不是有点像“消息队列”的定义) 。
MQ主要是用来:
  1. 解耦应用、
  2. 异步化消息
  3. 流量削峰填谷
目前使用的较多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等 。
使用redis作为消息队列的用法

文章插图
 
另外上面提到的“有界”和“无界”,指的是队列的容量大小 。有界 指的是创建队列时必须指定队列的容量;无界 创建队列时无需指定队列的容量,容量大小取决于jvm实例分配的内存空间大小 。在海量业务场景里,我们期望队列的容量是无限的,但单个jvm实例 即便是使用“无界”队列 由于单个实例内存是有限的,最终无法容纳下海量的消息数据 。聪明的程序员就想 能不能使用一个第三方的队列来存储这些数据呢?当然是可以的,这就产生了“消息队列” 。
消息队列 一般是采用一个独立的集群专门用于消息存储,可以存储在内存里 也可以直接存储在磁盘中 。比如常见的:RabbitMQ、kafka、rocketMQ、ActiveMQ、zeromq等等,它们有不同的特性,以及采用了各种不同的实现,适用于各种场景的消息任务分发 。但他们本质作用跟上面讲的单实例环境中java“队列”没什么两样:在消息的传输过程中保存消息的容器 。只是这里转换到“分布式”环境中而已 。
使用redis作为消息队列的用法

文章插图
 
可以看到这里这里提到的“传统”消息队列,都是一个很重型的集群 。如果这个分布式环境中的消息数量有限,我们可以不必引入这种重型的mq框架 。比如:本次分享的主题 如何使用redis实现“消息队列” 。
redis 实现消息队列
redis有一个数据类型叫list(列表),它的每个子元素都是 string 类型的双向链表 。我们可以通过 push,pop 操作从链表的头部或者尾部添加删除元素 。这使得 list 既可以用作栈,也可以用作队列 。
假如,我们有一个队列系统,把一个个任务放到队列中,另一个进程就把队列中的任务取出来执行 。
放到队列我们使用LPUSH,也就是往双向链表的尾部填充一个元素,这一端也叫生产者,是产生内容的一端 。


推荐阅读