模式生成数据和主流程代码:public static void mAIn(String[] args) throws Exception {// 1、执行环境创建StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();// 2、读取Socket数据端口 。实际根据具体业务对接数据来源DataStreamSource<String> orderStream = environment.socketTextStream("localhost", 9527);// 3、数据读取个切割方式SingleOutputStreamOperator<OrderBO> resultDataStream = orderStream.flatMap(new CleanDataAnd2Order()) // 清洗和处理数据.keyBy(x -> x.getUserId()) // 分区.process(new AlarmLogic()); // 处理告警逻辑// 4、打印分析结果resultDataStream.print("告警===>");// 5、环境启动environment.execute("OrderAlarmApp");}
模拟数据:
模拟场景:某个用户1分钟内连续两次退款,第二次发出告警 。示例数据:1,aaa,100,1,user12,bbb,200,1,user23,ccc,300,2,user14,ddd,400,2,user15,ddd,400,2,user16,bbb,200,2,user27,bbb,400,2,user2
完整代码地址:https://Github.com/yclxiao/flink-blog/blob/7eb84d18aa71d8f2023d6158796de34d331b9b3f/src/main/JAVA/top/mangod/flinkblog/demo005/OrderAlarmApp.java#L43
推荐阅读
- 只需五分钟就能解决的Active Directory安全问题
- 一文带你了解Docker与Containerd的区别
- 关于接口测试,你了解多少?
- 如何从一个小细节了解一个人的内心世界?
- 秋刀鱼为什么那么便宜?秋刀鱼的害处是啥?先了解一下 秋刀鱼为什么那么便宜
- 贴对联的风俗与讲究 贴对联的风俗与讲究了解怎么写
- 了解编制,到底啥是备案制?
- 婚车租赁注意事项大全
- 减肥适合吃的7种主食,玉米排在第6,第一个建议常吃,不妨了解
- 了解饮料才能喝出健康