一、前言
RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤 。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂 。
目前 RocketMQ 只支持两个模式过滤器,一个是基于 TAG,另外一个是基于 SQL92 。其中 TAG 模式相对于比较简单;而另外一个就相当的复杂,实现方式跟 spring 表达式有点相似;同时也提供了一个配置项,来决定是否开启 SQL92;
- enablePropertyFilter 是否支持根据属性过滤,默认为 false,如果使用基于标的式 SQL92 模式过滤消息,则该参数必须设置为 true 。
另外关于类过滤的,很快就过期,官方不推荐使用该模式,所以这里不在这里解读 。
二、源码导读
1、消费者过滤器管理组件consumerFilterManager源码分析,其中布隆过滤器的数据结构是怎样的,怎么进行注册的;
2、过滤原理,通过解读ExpressionMessageFilter类来分析其运作原理;
三、过滤数据管理
BrokerController中有一个ConsumerFilterManager,就是用来管理消费者过滤器数据的;
文章插图
- 构造方法;
- 数据对象的结构;
- 消费组批量注册过滤数据对象;
- 根据消费组取消注册;
- 判断数据是否死亡;
这里就分析这几个核心方法吧,其余的方法其实也差不多;
1、构造方法
public class ConsumerFilterManager extends ConfigManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);// 定义了24小时常量private static final long MS_24_HOUR = 24 * 3600 * 1000;// topic -> filtersprivate ConcurrentMapfilterDataByTopic = new ConcurrentHashMap(256);private transient BrokerController brokerController;// 布隆过滤器,简单解释一下什么叫做布隆过滤器,bit位数组,写入数据先hash再就更正bit位里的01// 查询的时候,可以对查询数据计算hash再到一个位置找是01,如果是0肯定没出现过这条数据,如果是1,有可能出现过// 可以快速筛查你的数据要不然是肯定没出现过,要不然是可能出现过private transient BloomFilter bloomFilter;public ConsumerFilterManager() {// just for test,仅限于测试this.bloomFilter = BloomFilter.createByFn(20, 64);public ConsumerFilterManager(BrokerController brokerController) {this.brokerController = brokerController;this.BloomFilter = BloomFilter.createByFn( // 根据配置创建布隆过滤器brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(),brokerController.getBrokerConfig().getExpectConsumerNumUseFilter()// then set bit map length of store config.brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt(this.bloomFilter.getM()
2、数据对象的结构我们可以发现ConsumerFilterManager是继承至ConfigManager,如果看过我以前的文章就知道ConfigManager有一个抽象方法configFilePath,是用来标明文件持久化路径的;
我们直接找到子类的对应方法
@Overridepublic String configFilePath() {if (this.brokerController != null) {return BrokerPathConfigHelper.getConsumerFilterPath(this.brokerController.getMessageStoreConfig().getStorePathrootDir()return BrokerPathConfigHelper.getConsumerFilterPath("./unit_test");
public static String getConsumerFilterPath(final String rootDir) {return rootDir + File.separator + "config" + File.separator + "consumerFilter.json";
再找到对应的文件,只不过这个文件中数据为空
文章插图
但是没事,我在网上找到一个文件数据结构,文件存放的格式如下:
"filterDataByTopic":{"Topic":{"topic": String,"groupFilterData": {"consumerGroup":{"consumerGroup" : String,"topic": String,"expression": String,"expressionType": String,"bornTime": long,"deadTime": long,"bloomFilterData":{"bitPos": int[],"bitNum": int},"clientVersion": long},},
文章插图
从字面都可以大概猜出其用意;除了那个 BloomFilter 相关的字段属性;
ConsumerFilterManager 对象中的 bloomFilter 属性我们可以理解是一个工具方法;而 ConsumerFilterData 对象中的 bloomFilterData 属性是这个消费组中的数值数组,用来判断是否满足过滤条件的消息;
推荐阅读
- 7张图大总结:SQL 数据分析常用语句
- 道恩·强森|外媒分析《黑亚当》票房扑街的原因:巨石强森不是DC需要的救世主
- 为什么要做专利分析?专利分析的价值有哪些?
- 招聘|34岁0基础想转行数据分析师晚吗?
- 商品房买卖合同常见纠纷问题分析 房屋买卖合同纠纷
- 店霸 电霸:电商大数据分析软件好用吗?拼多多数据准确吗?
- 华东五校实力优势分析 华东五校
- 9高一物理期中考试总结分析 高一期中考试总结
- 层次分析法(什么是层次分析法?)
- |学会用数据分析汇报工作,升职加薪指日可待