- Combine
文章插图
通过上图可以看出,作用在一个数据集上具有Combine特征的基本操作:Max、Min、Top、Mean、Sum、Count等等 。
- Window
文章插图
编程实战
首先说明一下,为了简单起见,我直接在代码中显式配置指定PipelineRunner,示例代码片段如下所示:
PipelineOptions options = PipelineOptionsFactory.create();如果要部署到服务器上,可以通过命令行的方式指定PipelineRunner,比如要在Spark集群上运行,类似如下所示命令行:
options.setRunner(DirectRunner.class);
spark-submit --class org.shirdrn.beam.examples.MinimalwordCountBasedSparkRunner 2017-01-18 --master spark://myserver:7077 target/my-beam-Apps-0.0.1-SNAPSHOT-shaded.jar --runner=SparkRunner
下面,我们从几个典型的例子来看(基于Apache Beam软件包的examples有所改动),Apache Beam如何构建Pipeline并运行在指定的PipelineRunner上:- WordCount(Count/Source/Sink)
package org.shirdrn.beam.examples; import org.apache.beam.runners.direct.DirectRunner;import org.apache.beam.sdk.Pipeline;import org.apache.beam.sdk.io.TextIO;import org.apache.beam.sdk.options.PipelineOptions;import org.apache.beam.sdk.options.PipelineOptionsFactory;import org.apache.beam.sdk.transforms.Count;import org.apache.beam.sdk.transforms.DoFn;import org.apache.beam.sdk.transforms.MapElements;import org.apache.beam.sdk.transforms.ParDo;import org.apache.beam.sdk.transforms.SimpleFunction;import org.apache.beam.sdk.values.KV; public class MinimalWordCount { @SuppressWarnings("serial") public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(DirectRunner.class); // 显式指定PipelineRunner:DirectRunner(Local模式) Pipeline pipeline = Pipeline.create(options); pipeline.apply(TextIO.Read.from("/tmp/dataset/apache_beam.txt")) // 读取本地文件,构建第一个PTransform .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { // 对文件中每一行进行处理(实际上Split) @ProcessElement public void processElement(ProcessContext c) { for (String word : c.element().split("[\s:\,\.\-]+")) { if (!word.isEmpty()) { c.output(word); } } } })) .apply(Count.<String> perElement()) // 统计每一个Word的Count .apply("ConcatResultKVs", MapElements.via( // 拼接最后的格式化输出(Key为Word,Value为Count) new SimpleFunction<KV<String, Long>, String>() { @Override public String apply(KV<String, Long> input) { return input.getKey() + ": " + input.getValue(); } })) .apply(TextIO.Write.to("wordcount")); // 输出结果 pipeline.run().waitUntilFinish(); }}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 架构的腐化是必然的
- 浅谈数据库分布式架构设计
- 离婚后户口迁回娘家可以独立一户吗?
- 产后骶髂关节炎
- 淘宝开店不经营会有什么后果 开了淘宝店怎么运营起来
- 怎么才能成为淘宝商家 淘宝开店之后怎么操作
- 达成一次性赔偿协议后,能否再次起诉要求赔偿?
- 开机后不能直达想看的节目,如何为老年人选择一台操作便利的电视
- 风衣后面的腰带怎么系?
- 运动后血尿什么原因