五分钟了解Flink状态管理( 二 )

模式生成数据和主流程代码:public static void mAIn(String[] args) throws Exception {// 1、执行环境创建StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();// 2、读取Socket数据端口 。实际根据具体业务对接数据来源DataStreamSource<String> orderStream = environment.socketTextStream("localhost", 9527);// 3、数据读取个切割方式SingleOutputStreamOperator<OrderBO> resultDataStream = orderStream.flatMap(new CleanDataAnd2Order()) // 清洗和处理数据.keyBy(x -> x.getUserId()) // 分区.process(new AlarmLogic()); // 处理告警逻辑// 4、打印分析结果resultDataStream.print("告警===>");// 5、环境启动environment.execute("OrderAlarmApp");}模拟数据:
模拟场景:某个用户1分钟内连续两次退款,第二次发出告警 。示例数据:1,aaa,100,1,user12,bbb,200,1,user23,ccc,300,2,user14,ddd,400,2,user15,ddd,400,2,user16,bbb,200,2,user27,bbb,400,2,user2完整代码地址:https://Github.com/yclxiao/flink-blog/blob/7eb84d18aa71d8f2023d6158796de34d331b9b3f/src/main/JAVA/top/mangod/flinkblog/demo005/OrderAlarmApp.java#L43




推荐阅读