flink消费kafka的offset与checkpoint( 二 )
package com.econ.powercloud.jobsTest;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;import javax.annotation.Nullable;import java.util.Properties;public class TestKafkaOffsetCheckpointJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(1000 * 60);ParameterTool parameterTool = ParameterTool.fromArgs(args);String bootstrapServers = parameterTool.get("bootstrap.servers") == null ? 
"localhost:9092" : parameterTool.get("bootstrap.servers");Properties properties = new Properties();properties.setProperty("bootstrap.servers", bootstrapServers);properties.setProperty("group.id", "prod-econ-flink-TestKafkaOffsetCheckpointJob-local");properties.setProperty("transaction.timeout.ms", String.valueOf(1000 * 60 * 5));String topic = "test";FlinkKafkaConsumer stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);DataStreamSource stringDataStreamSource = env.addSource(stringFlinkKafkaConsumer);String producerTopic = "test2";FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(producerTopic, new KafkaSerializationSchema() {@Overridepublic ProducerRecord serialize(String element, @Nullable Long timestamp) {return new ProducerRecord<>(producerTopic, element.getBytes());}}, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);stringDataStreamSource.addSink(kafkaProducer);env.execute("TestKafkaOffsetCheckpointJob");}}
提交作业:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092
Job has been submitted with JobID 5fdd14f7fd3c93287635c9d61180d8a6
[econ@dev-hadoop-node-c ~]$
使用"kafka-console-producer.sh"往topic "test"生成消息"a1":
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
>a1
>
使用"kafka-console-consumer.sh"消费topic "test2"的消息:
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092
a1
证明作业逻辑本身没有问题 , 实现' 从topic "test"读取数据 , 然后写入topic "test2" ' 。
使用"kafka-consumer-groups.sh"观察消费组"prod-econ-flink-TestKafkaOffsetCheckpointJob-local"的积压数量 , 重点观察指标"LAG" , 可以看到LAG为1 :
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date;
Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   1          3               3               0    -            -     -
test   0          3               3               0    -            -     -
test   2          5               6               1    -            -     -
2020年10月18日 星期日 20时09分45秒 CST
RdeMacBook-Pro:kafka r$
推荐阅读
- 研究称iPhone 12国行128GB物料成本提高了21%:苹果加倍转嫁给消费者
- 用户|元旦当天苹果应用消费额超过5.4亿美元 刷新单日消费记录
- iPhone物料成本提高了21%:消费者买单
- 索尼欲将新款360 Reality Audio扬声器带入消费者家中
- 全真互联网,产业互联网和消费互联网的融合
- 消费者报告 | 美团充电宝电量不足也扣费,是质量问题还是系统缺陷?
- 中消协点名大数据网络杀熟 反对利用消费者个人数据画像
- 它“骗了”消费者多年,工厂早已停产,靠商标一年“捞金”12亿
- 莫让差评超长审核期侵害消费者权益
- 低欲望的后厂村:遍地985、211,高收入低消费,偏爱996