A Post-80s Architect Teaches You Apache Beam, an Open-Source Unified Distributed Data Processing Library (Part 4)

The exact meaning of a Pipeline can be seen in the comments of the code above. Next, consider using HDFS as the data Source; the following code fragment shows how to build the first PTransform:
PCollection<KV<LongWritable, Text>> resultCollection = pipeline.apply(HDFSFileSource.readFrom(
        "hdfs://myserver:8020/data/ds/beam.txt",
        TextInputFormat.class, LongWritable.class, Text.class));

As you can see, this returns a collection of KV objects whose keys and values are of type LongWritable and Text, respectively; subsequent processing follows the same logic as in the earlier examples. If you build the project with Maven, you need to add the following dependency (here beam.version can be set to the latest release, 0.4.0):
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hdfs</artifactId>
    <version>${beam.version}</version>
</dependency>
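As a hypothetical illustration of that subsequent processing (the ExtractLines step name and the hdfsLines output prefix are assumptions, not from the original), the KV pairs can be unwrapped back into plain text lines before further transforms are applied:

// A minimal sketch, assuming the same pipeline setup and imports as the
// examples below (MapElements, SimpleFunction, TextIO): extract the Text
// value from each KV pair, discarding the LongWritable byte-offset key.
resultCollection
    .apply("ExtractLines", MapElements.via( // hypothetical step name
            new SimpleFunction<KV<LongWritable, Text>, String>() {
                @Override
                public String apply(KV<LongWritable, Text> input) {
                    return input.getValue().toString(); // keep the line content only
                }
            }))
    .apply(TextIO.Write.to("hdfsLines")); // hypothetical output prefix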

  • Deduplication (Distinct)
Deduplication is another fairly common operation on a dataset. The example below shows how to implement it with Apache Beam:
package org.shirdrn.beam.examples;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Distinct;

public class DistinctExample {

    public static void main(String[] args) throws Exception {

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class); // explicitly choose the PipelineRunner: DirectRunner (local mode)

        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(TextIO.Read.from("/tmp/dataset/MY_ID_FILE.txt"))
            .apply(Distinct.<String>create()) // create a Distinct PTransform over String elements
            .apply(TextIO.Write.to("deduped.txt")); // write the deduplicated results
        pipeline.run().waitUntilFinish();
    }
}
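To see what Distinct does, suppose /tmp/dataset/MY_ID_FILE.txt contains the following lines (hypothetical sample data):

35451605324179
25431605324172
35451605324179

The output files under the deduped.txt prefix would then contain each ID exactly once:

35451605324179
25431605324172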
  • Grouping (GroupByKey)
Grouping data is also a very common operation. The following example is built on GroupByKey, one of the most fundamental PTransforms:
package org.shirdrn.beam.examples;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Joiner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

public class GroupByKeyExample {

    @SuppressWarnings("serial")
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class); // explicitly choose the PipelineRunner: DirectRunner (local mode)

        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(TextIO.Read.from("/tmp/dataset/MY_INFO_FILE.txt"))
            .apply("ExtractFields", ParDo.of(new DoFn<String, KV<String, String>>() {

                @ProcessElement
                public void processElement(ProcessContext c) {
                    // file format example (tab-separated): 35451605324179    3G    CMCC
                    String[] values = c.element().split("\t");
                    if (values.length == 3) {
                        c.output(KV.of(values[1], values[0])); // key: network type, value: device ID
                    }
                }
            }))
            .apply("GroupByKey", GroupByKey.<String, String>create()) // create a GroupByKey PTransform
            .apply("ConcatResults", MapElements.via(
                    new SimpleFunction<KV<String, Iterable<String>>, String>() {

                        @Override
                        public String apply(KV<String, Iterable<String>> input) {
                            return new StringBuffer()
                                    .append(input.getKey()).append("\t")
                                    .append(Joiner.on(",").join(input.getValue()))
                                    .toString();
                        }
            }))
            .apply(TextIO.Write.to("grouppedResults"));

        pipeline.run().waitUntilFinish();
    }
}
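To make the data flow concrete, suppose /tmp/dataset/MY_INFO_FILE.txt contains the following tab-separated lines (hypothetical sample data):

35451605324179    3G    CMCC
25431605324172    3G    CMCC
45451605324574    4G    CMCC

After the ExtractFields, GroupByKey, and ConcatResults steps, the output under the grouppedResults prefix would look like:

3G    35451605324179,25431605324172
4G    45451605324574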

