文章插图
4.2 元数据定义映射与维护
文章插图
4.3 互不干扰的高性能消息推送RabbitMQ采用推模式进行消息消费 , 虽然RocketMQ也支持消息推送消费 , 但是因为AMQP协议中通过prefetch参数限制了客户端缓存消息数量以保证不会因缓存太多消息导致客户端内存异常 , 因此在消息网关实现消息推送时也需要满足AMQP协议的语义 。
同时每个消息网关都需要数千甚至数万的queue的消息推送 , 每个queue消息消费速率存在差异 , 并且每个队列可能随时有消息需要推送到客户端进行消费 , 要保证不同queue之间的推送互不干扰且及时 。
为了实现高效的、互不干扰的消息推送 , 有以下策略:
- 每个queue采用独立的线程 , 保证互不干扰和时效性 , 缺点是无法支撑海量queue的消息推送 。
- 基于信号量、阻塞队列等 , 在感知到有可推送消息和可消费服务端时按需进行消息的推送 , 这样可使用少量的线程即可完成高效的消息推送 。
文章插图
一个消息消费过程:客户端在启动连接到消息网关后 , 在消息网关中会构建RocketMQ推送消费客户端实例 , 并且注入自定义的ConsumeMessageService实例 , 同时使用一个信号量保存客户端允许推送的消息数量 。
当消息从集群侧推送到消息网关时 , 将消息按照推送的批次封装为一个任务保存在ConsumeMessageService实例的BlockingQueue中 , 同时推送线程会轮询所有的ConsumeMessageService实例 , 如果发现本地缓存有待消费的消息并且有可消费消息的业务客户端 , 将任务提交到线程池中完成消息的推送 。
为了保证不会因为少量消费速率特别高的queue导致其它queue的消息推送时效性降低 , 会限制每一个ConsumeMessageService只允许推送一定数量的消息即转到推送其它queue的消息 , 以此即可保证所有queue的消息推送的互不干扰和时效性 。
在客户端消费ack/uack后再次通过信号量通知下一次推送 , 这样也保证了使用少量的线程资源即可完成海量消息的推送需求 。
4.4 消费启停与消费限流能力实现基于消息网关 , 可以在消息推送逻辑中增加消费启停和消费限流逻辑 。
消费启停可以帮助业务快速实现消费的暂停或是部分异常节点停止消息消费 。
消费限流可以帮助业务控制消息消费速率 , 避免对底层依赖产生太大压力 。
4.5 平台架构
文章插图
- 最终形成了以上的平台架构 。新建设了一个AMQP-proxy消息网关服务实现AMQP消息转换到RocketMQ , 支持业务的消息生产消费 。
- 建设了mq-meta服务维护集群的元数据信息 。
- 通过mq-controller控制集群的主从切换 , 实现集群的高可用 , 同时增加了集群监控 , 负载均衡模块保障集群的高可用 。
文章插图
原生RabbitMQ集群业务压测性能
推荐阅读
- 鸡蛋壳可以用来养花 鸡蛋壳属于什么垃圾
- 宜宾|90后女博士当选副市长,“弃医从政”让人佩服,评论却有不同声音
- 教您这样处理,从此老鼠去无踪 家里有老鼠怎么办
- 如何才能快速自学日语? 怎么学日语
- 股市迭创新高,从三个角度看大盘指数 如何看大盘
- 寒性体质如何调理
- 肉末豆腐羹宝宝
- 金立手机口碑怎么样 金立手机怎么样
- 什么是初等数学? 微积分是什么
- 从哪里能查询到养老保险缴了多长时间 怎么查养老保险交了多少年