Flink DataSet 基本算子总结（二）

:表示第一张表 * 第二个Tuple2:表示第二张表 * Tuple3:多表join连接查询后的返回结果*/.with(new JoinFunction, Tuple2, Tuple3>() { public Tuple3 join(Tuple2 table1,Tuple2 table2) throws Exception {return new Tuple3(table1.f0,table1.f1,table2.f1); } }).print();4、笛卡尔积ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建第一张表:用户ID姓名ArrayList> data1 = new ArrayList>();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(2,"Mike"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创建第二张表:用户ID 所在的城市ArrayList> data2 = new ArrayList>();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(3,"广州"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查询:用户ID姓名所在的程序DataSet> table1 = env.fromCollection(data1);DataSet> table2 = env.fromCollection(data2);//生成笛卡尔积table1.cross(table2).print();5、First-NExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//这里的数据是:员工姓名、薪水、部门号DataSet> grade =env.fromElements(new Tuple3("Tom",1000,10),new Tuple3("Mary",1500,20),new Tuple3("Mike",1200,30),new Tuple3("Jerry",2000,10));//按照插入顺序取前三条记录grade.first(3).print();System.out.println("**********************");//先按照部门号排序 , 在按照薪水排序grade.sortPartition(2, Order.ASCENDING).sortPartition(1, Order.ASCENDING).print();System.out.println("**********************");//按照部门号分组 , 求每组的第一条记录grade.groupBy(2).first(1).print();6、外链接操作ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建第一张表:用户ID姓名ArrayList> data1 = new ArrayList>();data1.add(new Tuple2(1,"Tom"));data1.add(new Tuple2(3,"Mary"));data1.add(new Tuple2(4,"Jone"));//创建第二张表:用户ID 所在的城市ArrayList> data2 = new ArrayList>();data2.add(new Tuple2(1,"北京"));data2.add(new Tuple2(2,"上海"));data2.add(new Tuple2(4,"重庆"));//实现join的多表查询:用户ID姓名所在的程序DataSet> table1 = env.fromCollection(data1);DataSet> table2 = env.fromCollection(data2);//左外连接table1.leftOuterJoin(table2).where(0).equalTo(0).with(new JoinFunction, Tuple2, Tuple3>() {public Tuple3 join(Tuple2 table1,Tuple2 table2) throws Exception {// 
左外连接表示等号左边的信息会被包含if(table2 == null){return new Tuple3(table1.f0,table1.f1,null);}else{return new Tuple3(table1.f0,table1.f1,table2.f1);}} }).print();System.out.println("***********************************");//右外连接table1.rightOuterJoin(table2).where(0).equalTo(0).with(new JoinFunction, Tuple2, Tuple3>() {public Tuple3 join(Tuple2 table1,Tuple2 table2) throws Exception {//右外链接表示等号右边的表的信息会被包含if(table1 == null){return new Tuple3


推荐阅读