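For what the Pipeline means in detail, see the comments in the code above. Next, consider how to build the first PTransform when an HDFS data source serves as the Source. The code snippet is as follows: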
    PCollection<KV<LongWritable, Text>> resultCollection = pipeline.apply(
            HDFSFileSource.readFrom(
                    "hdfs://myserver:8020/data/ds/beam.txt",
                    TextInputFormat.class, LongWritable.class, Text.class));
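As you can see, the result is a collection of KV objects whose keys are of type LongWritable and whose values are of type Text. Subsequent processing is similar to the logic shown above. If you build the project with Maven, add the following dependency (beam.version can be set to 0.4.0, the latest release at the time of writing):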
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-hdfs</artifactId>
        <version>${beam.version}</version>
    </dependency>
- Deduplication (Distinct)
    package org.shirdrn.beam.examples;

    import org.apache.beam.runners.direct.DirectRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Distinct;

    public class DistinctExample {

        public static void main(String[] args) throws Exception {
            PipelineOptions options = PipelineOptionsFactory.create();
            // Explicitly select the PipelineRunner: DirectRunner (local mode)
            options.setRunner(DirectRunner.class);

            Pipeline pipeline = Pipeline.create(options);
            pipeline.apply(TextIO.Read.from("/tmp/dataset/MY_ID_FILE.txt"))
                    // Create a Distinct PTransform that handles String elements
                    .apply(Distinct.<String> create())
                    // Write the result
                    .apply(TextIO.Write.to("deduped.txt"));
            pipeline.run().waitUntilFinish();
        }
    }
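To make the effect of Distinct concrete, here is a minimal Beam-free sketch that uses only java.util collections. It is not how Beam implements the transform. The class DistinctSketch, the method distinct, and the sample IDs are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical helper, not part of Beam: mimics the result of Distinct
// on a small in-memory list of lines.
class DistinctSketch {

    // Returns every distinct line exactly once. First-seen order is kept
    // here only for readability; a Beam PCollection is unordered.
    static List<String> distinct(List<String> lines) {
        return new ArrayList<>(new LinkedHashSet<>(lines));
    }

    public static void main(String[] args) {
        // Made-up sample IDs, one per input line
        List<String> ids = Arrays.asList(
                "35451605324179", "35236905263395", "35451605324179");
        for (String id : distinct(ids)) {
            System.out.println(id);
        }
    }
}
```

The main difference from the real pipeline is scale: Distinct works on a distributed PCollection, whereas this sketch assumes that all lines fit in memory.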
- Grouping (GroupByKey)
    package org.shirdrn.beam.examples;

    import org.apache.beam.runners.direct.DirectRunner;
    import org.apache.beam.runners.direct.repackaged.com.google.common.base.Joiner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.KV;

    public class GroupByKeyExample {

        @SuppressWarnings("serial")
        public static void main(String[] args) {
            PipelineOptions options = PipelineOptionsFactory.create();
            // Explicitly select the PipelineRunner: DirectRunner (local mode)
            options.setRunner(DirectRunner.class);

            Pipeline pipeline = Pipeline.create(options);
            pipeline.apply(TextIO.Read.from("/tmp/dataset/MY_INFO_FILE.txt"))
                    .apply("ExtractFields", ParDo.of(new DoFn<String, KV<String, String>>() {

                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            // file format example (tab-separated): 35451605324179 3G CMCC
                            String[] values = c.element().split("\t");
                            if (values.length == 3) {
                                // key: second field, value: first field
                                c.output(KV.of(values[1], values[0]));
                            }
                        }
                    }))
                    // Create a PTransform that is a GroupByKey instance
                    .apply("GroupByKey", GroupByKey.<String, String>create())
                    .apply("ConcatResults", MapElements.via(
                            new SimpleFunction<KV<String, Iterable<String>>, String>() {

                                @Override
                                public String apply(KV<String, Iterable<String>> input) {
                                    // Output line: key, a tab, then the comma-joined values
                                    return new StringBuilder()
                                            .append(input.getKey()).append("\t")
                                            .append(Joiner.on(",").join(input.getValue()))
                                            .toString();
                                }
                            }))
                    .apply(TextIO.Write.to("grouppedResults"));

            pipeline.run().waitUntilFinish();
        }
    }
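The three steps of this pipeline (extract fields, group by key, concatenate results) can be traced on a few lines with plain Java collections. The following is a sketch under stated assumptions and does not use the Beam API. The class GroupByKeySketch, the method groupAndFormat, and the sample records are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Beam: reproduces the ExtractFields,
// GroupByKey and ConcatResults steps on an in-memory list of lines.
class GroupByKeySketch {

    static List<String> groupAndFormat(List<String> lines) {
        // ExtractFields + GroupByKey: second field is the key, first field the value
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String line : lines) {
            String[] values = line.split("\t");
            if (values.length == 3) { // skip malformed lines, as the DoFn does
                groups.computeIfAbsent(values[1], k -> new ArrayList<>()).add(values[0]);
            }
        }
        // ConcatResults: key, a tab, then the comma-joined values
        List<String> results = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            results.add(entry.getKey() + "\t" + String.join(",", entry.getValue()));
        }
        return results;
    }

    public static void main(String[] args) {
        // Made-up tab-separated sample records: id, network type, carrier
        List<String> lines = Arrays.asList(
                "35451605324179\t3G\tCMCC",
                "35236905263395\t4G\tCMCC",
                "35236905024958\t3G\tCUCC");
        for (String result : groupAndFormat(lines)) {
            System.out.println(result);
        }
    }
}
```

In the sketch, groups and values appear in first-seen order because of LinkedHashMap. The real GroupByKey makes no such promise, so the order of values within a group should not be relied on.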