80后架构师教你学ApacheBeam,一个开源统一分布式数据处理编程库( 五 )

使用DirectRunner运行,输出文件名称类似于
grouppedResults-00000-of-00002、
grouppedResults-00001-of-00002等等 。

  • 连接(Join)
最后,我们通过实现一个Join的例子,其中,用户的基本信息包含ID和名称,对应文件格式如下所示:
35451605324179 Jack
35236905298306 Jim
35236905519469 John
35237005022314 Linda
另一个文件是用户使用手机的部分信息,文件格式如下所示:
35451605324179 3G 中国移动
35236905298306 2G 中国电信
35236905519469 4G 中国移动
我们希望通过Join操作后,能够知道用户使用的什么网络(用户名+网络),使用Apache Beam实现,具体实现代码如下所示:
package org.shirdrn.beam.examples; import org.apache.beam.runners.direct.DirectRunner;import org.apache.beam.sdk.Pipeline;import org.apache.beam.sdk.io.TextIO;import org.apache.beam.sdk.options.PipelineOptions;import org.apache.beam.sdk.options.PipelineOptionsFactory;import org.apache.beam.sdk.transforms.DoFn;import org.apache.beam.sdk.transforms.MapElements;import org.apache.beam.sdk.transforms.ParDo;import org.apache.beam.sdk.transforms.SimpleFunction;import org.apache.beam.sdk.transforms.join.CoGbkResult;import org.apache.beam.sdk.transforms.join.CoGroupByKey;import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;import org.apache.beam.sdk.values.KV;import org.apache.beam.sdk.values.PCollection;import org.apache.beam.sdk.values.TupleTag; public class JoinExample {     @SuppressWarnings("serial")    public static void main(String[] args) {         PipelineOptions options = PipelineOptionsFactory.create();        options.setRunner(DirectRunner.class);  // 显式指定PipelineRunner:DirectRunner(Local模式)         Pipeline pipeline = Pipeline.create(options);         // create ID info collection        final PCollection<KV<String, String>> idInfoCollection = pipeline                .apply(TextIO.Read.from("/tmp/dataset/MY_ID_INFO_FILE.txt"))                .apply("CreateUserIdInfoPairs", MapElements.via(                        new SimpleFunction<String, KV<String, String>>() {                     @Override                    public KV<String, String> apply(String input) {                        // line format example: 35451605324179    Jack                        String[] values = input.split("t");                        return KV.of(values[0], values[1]);                    }                 }));         // create operation collection        final PCollection<KV<String, String>> opCollection = pipeline                .apply(TextIO.Read.from("/tmp/dataset/MY_ID_OP_INFO_FILE.txt"))                .apply("CreateIdOperationPairs", MapElements.via(                        new SimpleFunction<String, KV<String, String>>() {                     @Override                    public KV<String, String> apply(String input) {                        // line format example: 35237005342309    3G    CMCC                        String[] values = input.split("t");                        return KV.of(values[0], values[1]);                    }                 }));         final TupleTag<String> idInfoTag = new TupleTag<String>();        final TupleTag<String> opInfoTag = new TupleTag<String>();         final PCollection<KV<String, CoGbkResult>> cogrouppedCollection = KeyedPCollectionTuple                .of(idInfoTag, idInfoCollection)                .and(opInfoTag, opCollection)                .apply(CoGroupByKey.<String>create());         final PCollection<KV<String, String>> finalResultCollection = cogrouppedCollection                .apply("CreateJoinedIdInfoPairs", ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {                 @ProcessElement                public void processElement(ProcessContext c) {                    KV<String, CoGbkResult> e = c.element();                    String id = e.getKey();                    String name = e.getValue().getOnly(idInfoTag);                    for (String opInfo : c.element().getValue().getAll(opInfoTag)) {                      // Generate a string that combines information from both collection values                      c.output(KV.of(id, "t" + name + "t" + opInfo));                    }                }        }));         PCollection<String> formattedResults = finalResultCollection                .apply("FormatFinalResults", ParDo.of(new DoFn<KV<String, String>, String>() {                  @ProcessElement                  public void processElement(ProcessContext c) {                    c.output(c.element().getKey() + "t" + c.element().getValue());                  }                }));          formattedResults.apply(TextIO.Write.to("joinedResults"));         pipeline.run().waitUntilFinish();     }}


推荐阅读