flink消费kafka的offset与checkpoint( 三 )

证明flink消费了kafka数据后 , 不会更新offset到kafka 。
停止作业:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 5fdd14f7fd3c93287635c9d61180d8a6Suspending job "5fdd14f7fd3c93287635c9d61180d8a6" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-5fdd14-53dfd9f8eccd[econ@dev-hadoop-node-c ~]$ 再次启动作业 , 但是 , 不使用上面生成的savepoint:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID 130568a2eeec96296237ed3e1f280f83[econ@dev-hadoop-node-c ~]$ 观察topic "test2" , 发现 , 同样的数据"a1"被生产进入:
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1证明:flink在没有使用savepoint的时候 , 消费kafka的offset还是从kafka自身获取 。
再仔细观察topic "test"的“消费组积压数量” , 注意在"20时10分05秒"还观察到积压数值1 , 但是在"20时10分08秒"就发现积压数值都是0.
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtest1330---test0330---test2561---2020年10月18日 星期日 20时10分05秒 CSTRdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtest1330---test0330---test2660---2020年10月18日 星期日 20时10分08秒 CSTRdeMacBook-Pro:kafka r$ 这是因为 , 在"20:10:06"完成了一次checkpoint , 把offset更新回kafka 。
flink消费kafka的offset与checkpoint文章插图
Flink Checkpoint History
下面接着测试flink使用savepoint的情况下 , 是否会重复消费kafka数据 。
使用"kafka-console-producer.sh"往topic "test"生成消息"a2":
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092>a1>a2>使用"kafka-console-consumer.sh"消费topic "test2"的消息:
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1a2停止作业:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 bb8b4ba7ddaad869c6469fab5e81d179Suspending job "bb8b4ba7ddaad869c6469fab5e81d179" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60[econ@dev-hadoop-node-c ~]$ 观察topic "test"的“消费组积压数量” , 发现LAG还是1:


推荐阅读