同理,指定消费者拦截器也是同样的方法,只是具体的实现类要实现org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,这里面也有两个核心方法 。
- onConsume:该方法在消息返回给 Consumer 程序之前调用 。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你 。
- onCommit:Consumer 在提交位移之后调用该方法 。通常你可以在该方法中做一些记账类的动作,比如打日志等 。
典型使用场景Kafka 拦截器都能用在哪些地方呢?其实,跟很多拦截器的用法相同,Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景 。
我以端到端系统性能检测和消息审计为例来展开介绍下 。
今天 Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,你很难从具体的消息维度去追踪集群间消息的流转路径 。同时,如何监控一条消息从生产到最后消费的端到端延时也是很多 Kafka 用户迫切需要解决的问题 。
从技术上来说,我们可以在客户端程序中增加这样的统计逻辑,但是对于那些将 Kafka 作为企业级基础架构的公司来说,在应用代码中编写统一的监控逻辑其实是很难的,毕竟这东西非常灵活,不太可能提前确定好所有的计算逻辑 。另外,将监控逻辑与主业务逻辑耦合也是软件工程中不提倡的做法 。
现在,通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够从具体的消息层面上去收集这些数据 。这就是 Kafka 拦截器的一个非常典型的使用场景 。
我们再来看看消息审计(message audit)的场景 。设想你的公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要涉及多租户以及消息审计的功能 。
作为私有云的 PaaS 提供方,你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费 。一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器 。
案例分享下面我以一个具体的案例来说明一下拦截器的使用 。在这个案例中,我们通过编写拦截器类来统计消息端到端处理的延时,非常实用,我建议你可以直接移植到你自己的生产环境中 。
我曾经给一个公司做 Kafka 培训,在培训过程中,那个公司的人提出了一个诉求 。他们的场景很简单,某个业务只有一个 Producer 和一个 Consumer,他们想知道该业务消息从被生产出来到最后被消费的平均总时长是多少,但是目前 Kafka 并没有提供这种端到端的延时统计 。
学习了拦截器之后,我们现在知道可以用拦截器来满足这个需求 。既然是要计算总延时,那么一定要有个公共的地方来保存它,并且这个公共的地方还是要让生产者和消费者程序都能访问的 。在这个例子中,我们假设数据被保存在 redis 中 。
Okay,这个需求显然要实现生产者拦截器,也要实现消费者拦截器 。我们先来实现前者:
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {private Jedis jedis; // 省略 Jedis 初始化@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {jedis.incr("totalSentMessage");return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}@Overridepublic void configure(Map<JAVA.lang.String, ?> configs) {}
上面的代码比较关键的是在发送消息前更新总的已发送消息数 。为了节省时间,我没有考虑发送失败的情况,因为发送失败可能导致总发送数不准确 。不过好在处理思路是相同的,你可以有针对性地调整下代码逻辑 。下面是消费者端的拦截器实现,代码如下:
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {private Jedis jedis; // 省略 Jedis 初始化@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {long lantency = 0L;for (ConsumerRecord<String, String> record : records) {lantency += (System.currentTimeMillis() - record.timestamp());}jedis.incrBy("totalLatency", lantency);long totalLatency = Long.parseLong(jedis.get("totalLatency"));long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 大宋佳|《狂飙》结局:看似很完美,但是还有3个大坑没有填,让人意难平
- 橄榄油虽好,但是如何挑选 冷压初榨橄榄油
- 小度wifi驱动官方下载 小度wifi手机能连上,但是不能上网的解决方法?
- Linux 9 自动化部署 Kafka 集群
- 腾讯qq黄钻抽奖网址;刚开通年费黄钻,为什么不能参加黄钻抽奖,说我还不是黄钻呢,但是黄钻功能都能用?
- xp打印后台程序服务没有运行;已启动Print Spooler,但是添加网络打印机时还是提示“本地后台打印程序服务没有运行”?
- 姐姐|姐姐们很好,但是《乘风破浪》不行
- 奥斯卡|你是我的荣耀:乔晶晶也是恋爱脑,但是为何却收获了无数赞呢?
- 养一匹高头大马 高头大马
- 虽然票房扑街但是意义深远 一念天堂票房