Flink的DataSet基本算子总结( 二 )
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建第一张表:用户ID姓名ArrayList> data1 = new ArrayList>();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(2,"Mike"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创建第二张表:用户ID 所在的城市ArrayList> data2 = new ArrayList>();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(3,"广州"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查询:用户ID姓名所在的程序DataSet> table1 = env.fromCollection(data1);DataSet> table2 = env.fromCollection(data2);//生成笛卡尔积table1.cross(table2).print();
5、First-NExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//这里的数据是:员工姓名、薪水、部门号DataSet> grade =env.fromElements(new Tuple3("Tom",1000,10),new Tuple3("Mary",1500,20),new Tuple3("Mike",1200,30),new Tuple3("Jerry",2000,10));//按照插入顺序取前三条记录grade.first(3).print();System.out.println("**********************");//先按照部门号排序 , 在按照薪水排序grade.sortPartition(2, Order.ASCENDING).sortPartition(1, Order.ASCENDING).print();System.out.println("**********************");//按照部门号分组 , 求每组的第一条记录grade.groupBy(2).first(1).print();
6、外链接操作ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建第一张表:用户ID姓名ArrayList> data1 = new ArrayList>();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创建第二张表:用户ID 所在的城市ArrayList> data2 = new ArrayList>();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查询:用户ID姓名所在的程序DataSet> table1 = env.fromCollection(data1);DataSet> table2 = env.fromCollection(data2);//左外连接table1.leftOuterJoin(table2).where(0).equalTo(0).with(new JoinFunction, Tuple2
推荐阅读
- 荣耀V40正式得到确认!参数配置也基本确定!售价或将是惊喜
- 九千元买的手机,最基本的功能都不好用,男子:还能干什么?
- FlinkSQL 动态加载 UDF 实现思路
- 国内运营商基本停止4G投资 但依然可用
- 苹果手机信号差问题基本确定,英特尔如释重负,背了整整4年的锅
- iPhone SE Plus基本确认,配备A14、支持5G
- 万字干货还原美团Flink实时数仓建设
- 超越华为小米,这个国产手机在美国卖出新高度,只是国人基本不用
- 网易云音乐基于Flink实时数仓实践
- flink消费kafka的offset与checkpoint