An '80s-Generation Architect Teaches You Apache Beam, an Open-Source Unified Distributed Data Processing Library (Part 3)


  • Combine
Here we single out Combine, the family of transforms that merge a dataset. Its abstraction is interesting: it is organized around two flavors, globally and per-key, on top of which Beam implements a very rich library of PTransform operators. The corresponding class diagram is shown below:
[Figure: class diagram of the Combine-related PTransforms]
As the diagram shows, the basic Combine-style operations that act on a dataset include Max, Min, Top, Mean, Sum, Count, and so on.
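As a rough plain-Java sketch (not the Beam API itself), the difference between the two flavors is that a globally combine folds the whole dataset down to a single value, while a per-key combine folds the values independently for each key. The class and method names below are illustrative only:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombineSketch {

    // Globally: fold the whole collection into one value (the idea behind Sum applied globally).
    static long sumGlobally(List<Long> values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return sum;
    }

    // Per-key: fold values independently per key (the idea behind Sum applied per-key).
    static Map<String, Long> sumPerKey(List<Map.Entry<String, Long>> kvs) {
        Map<String, Long> result = new TreeMap<>(); // TreeMap only for deterministic print order
        for (Map.Entry<String, Long> kv : kvs) {
            result.merge(kv.getKey(), kv.getValue(), Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(sumGlobally(Arrays.asList(1L, 2L, 3L))); // 6
        List<Map.Entry<String, Long>> kvs = Arrays.asList(
                Map.entry("a", 1L), Map.entry("a", 2L), Map.entry("b", 3L));
        System.out.println(sumPerKey(kvs)); // {a=3, b=3}
    }
}
```

In Beam itself, both flavors are driven by the same CombineFn, which is what lets the runner pre-aggregate partial results on each worker before merging them.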
  • Window
Window addresses the need to merge the data records belonging to a given micro-batch, which typically arises in streaming scenarios. Apache Beam also provides an abstraction for windows: how the data falling into a given window is processed is defined through the WindowFn interface. The classes related to this interface are shown in the following class diagram:
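The central job of a WindowFn is to assign each element, based on its timestamp, to one or more windows. As a hedged plain-Java sketch of the fixed-window case (the class and method names here are illustrative, not Beam's API), the assignment reduces to rounding a timestamp down to its window's start:

```java
import java.util.Map;
import java.util.TreeMap;

public class FixedWindowSketch {

    // Map an event timestamp (ms) to the start of its fixed-size window:
    // the core of what a fixed-windows WindowFn computes per element.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long windowSize = 60_000L; // 1-minute windows
        long[] eventTimestamps = {5_000L, 59_999L, 60_000L, 125_000L};

        // Count how many events fall into each window.
        Map<Long, Integer> countsPerWindow = new TreeMap<>();
        for (long ts : eventTimestamps) {
            countsPerWindow.merge(windowStart(ts, windowSize), 1, Integer::sum);
        }
        System.out.println(countsPerWindow); // {0=2, 60000=1, 120000=1}
    }
}
```

Once elements are grouped by window like this, per-window aggregations (for example the Combine operations above) can run over each group independently.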
[Figure: class diagram of the WindowFn-related classes]
Programming in Practice
First, a note: for simplicity, I explicitly configure the PipelineRunner in code. A sample snippet looks like this:
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
To deploy to a server instead, you can specify the PipelineRunner on the command line. For example, to run on a Spark cluster, the command looks roughly like this:
spark-submit --class org.shirdrn.beam.examples.MinimalwordCountBasedSparkRunner --master spark://myserver:7077 target/my-beam-Apps-0.0.1-SNAPSHOT-shaded.jar --runner=SparkRunner
Below, we walk through a few typical examples (adapted from the examples shipped with the Apache Beam distribution) to see how Apache Beam builds a Pipeline and runs it on a chosen PipelineRunner:
  • WordCount (Count/Source/Sink)
Starting from Apache Beam's MinimalWordCount sample code, let's see how to build a Pipeline and finally execute it. The MinimalWordCount implementation looks like this:
package org.shirdrn.beam.examples;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

public class MinimalWordCount {

    @SuppressWarnings("serial")
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class); // explicitly select the PipelineRunner: DirectRunner (local mode)

        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(TextIO.Read.from("/tmp/dataset/apache_beam.txt")) // read a local file: the first PTransform
                .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { // process each line of the file (split it into words)

                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        for (String word : c.element().split("[\\s:,.\\-]+")) {
                            if (!word.isEmpty()) {
                                c.output(word);
                            }
                        }
                    }

                }))
                .apply(Count.<String> perElement()) // count the occurrences of each word
                .apply("ConcatResultKVs", MapElements.via( // format the final output (key: word, value: count)
                        new SimpleFunction<KV<String, Long>, String>() {

                    @Override
                    public String apply(KV<String, Long> input) {
                        return input.getKey() + ": " + input.getValue();
                    }

                }))
                .apply(TextIO.Write.to("wordcount")); // write out the results

        pipeline.run().waitUntilFinish();
    }
}
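Stripped of the Beam plumbing, the pipeline's core logic (split each line, count per word, format the output) can be sketched in plain Java. This is just an illustration of what the transforms compute, using the same delimiter pattern as the ExtractWords step; the class name is hypothetical:

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    // Mirrors the pipeline: split each line into words, count per word.
    static Map<String, Long> countWords(String[] lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split("[\\s:,.\\-]+")) { // same delimiters as ExtractWords
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] lines = {"apache beam, beam", "unified beam model"};
        // Format each entry like the ConcatResultKVs step: "word: count".
        countWords(lines).forEach((word, count) -> System.out.println(word + ": " + count));
        // apache: 1
        // beam: 3
        // model: 1
        // unified: 1
    }
}
```

In the real pipeline, the same three steps run as distributed PTransforms, so the word counting scales out across workers instead of running in a single loop.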

