今天我们花了一些时间讨论 Kafka 提供的冷门功能:拦截器 。如之前所说,拦截器的出场率极低,以至于我从未看到过国内大厂实际应用 Kafka 拦截器的报道 。但冷门不代表没用 。事实上,我们可以利用拦截器满足实际的需求,比如端到端系统性能检测、消息审计等 。??既然是不常见,那就说明在实际场景中并没有太高的出场率,但它们依然是很高级很实用的 。下面就有请今天的主角登场:Kafka 拦截器 。
什么是拦截器?如果你用过 Spring Interceptor 或是 Apache Flume,那么应该不会对拦截器这个概念感到陌生,其基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链 。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑 。下面这张图展示了 Spring MVC 拦截器的工作原理:
文章插图
图片来源:https://o7planning.org/en/11229/spring-mvc-interceptors-tutorial
拦截器 1 和拦截器 2 分别在请求发送之前、发送之后以及完成之后三个地方插入了对应的处理逻辑 。而 Flume 中的拦截器也是同理,它们插入的逻辑可以是修改待发送的消息,也可以是创建新的消息,甚至是丢弃消息 。这些功能都是以配置拦截器类的方式动态插入到应用程序中的,故可以快速地切换不同的拦截器而不影响主程序逻辑 。
【Kafka不常见但是很高级的功能: Kafka 拦截器】Kafka 拦截器借鉴了这样的设计思路 。你可以在消息处理的前后多个时点动态植入不同的处理逻辑,比如在消息发送前或者在消息被消费后 。
作为一个非常小众的功能,Kafka 拦截器自 0.10.0.0 版本被引入后并未得到太多的实际应用,我也从未在任何 Kafka 技术峰会上看到有公司分享其使用拦截器的成功案例 。但即便如此,在自己的 Kafka 工具箱中放入这么一个有用的东西依然是值得的 。今天我们就让它来发挥威力,展示一些非常酷炫的功能 。
Kafka 拦截器Kafka 拦截器分为生产者拦截器和消费者拦截器 。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑 。值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑 。
举个例子,假设你想在生产消息前执行两个“前置动作”:第一个是为消息增加一个头信息,封装发送该消息的时间,第二个是更新发送消息数字段,那么当你将这两个拦截器串联在一起统一指定给 Producer 后,Producer 会按顺序执行上面的动作,然后再发送消息 。
当前 Kafka 拦截器的设置方法是通过参数配置完成的 。生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类 。拿上面的例子来说,假设第一个拦截器的完整类路径是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个类是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定拦截器:
Properties props = new Properties();List<String> interceptors = new ArrayList<>();interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器 1interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器 2props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);……
现在问题来了,我们应该怎么编写 AddTimeStampInterceptor 和 UpdateCounterInterceptor 类呢?其实很简单,这两个类以及你自己编写的所有 Producer 端拦截器实现类都要继承org.apache.kafka.clients.producer.ProducerInterceptor 接口 。该接口是 Kafka 提供的,里面有两个核心的方法 。- onSend:该方法会在消息发送之前被调用 。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会 。
- onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用 。还记得我在上一期中提到的发送回调通知 callback 吗?onAcknowledgement 的调用要早于 callback 的调用 。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦 。还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降 。
推荐阅读
- 大宋佳|《狂飙》结局:看似很完美,但是还有3个大坑没有填,让人意难平
- 橄榄油虽好,但是如何挑选 冷压初榨橄榄油
- 小度wifi驱动官方下载 小度wifi手机能连上,但是不能上网的解决方法?
- Linux 9 自动化部署 Kafka 集群
- 腾讯qq黄钻抽奖网址;刚开通年费黄钻,为什么不能参加黄钻抽奖,说我还不是黄钻呢,但是黄钻功能都能用?
- xp打印后台程序服务没有运行;已启动Print Spooler,但是添加网络打印机时还是提示“本地后台打印程序服务没有运行”?
- 姐姐|姐姐们很好,但是《乘风破浪》不行
- 奥斯卡|你是我的荣耀:乔晶晶也是恋爱脑,但是为何却收获了无数赞呢?
- 养一匹高头大马 高头大马
- 虽然票房扑街但是意义深远 一念天堂票房