When run with DirectRunner, the output files have names such as grouppedResults-00000-of-00002, grouppedResults-00001-of-00002, and so on.
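The numbering in these names follows a fixed pattern: a prefix, the zero-padded shard index, and the zero-padded shard count. The sketch below reproduces that pattern in plain Java. It assumes Beam's default shard template (`-SSSSS-of-NNNNN`); the class `ShardNames` and the method `shardName` are hypothetical names used only for illustration and are not part of Beam.

```java
public class ShardNames {

    // Builds a name of the form <prefix>-SSSSS-of-NNNNN, where SSSSS is the
    // zero-padded shard index and NNNNN the zero-padded total shard count.
    // Assumption: this mirrors Beam's default shard template.
    static String shardName(String prefix, int shard, int numShards) {
        return String.format("%s-%05d-of-%05d", prefix, shard, numShards);
    }

    public static void main(String[] args) {
        int numShards = 2;
        for (int i = 0; i < numShards; i++) {
            System.out.println(shardName("grouppedResults", i, numShards));
        }
    }
}
```

How many shards are written is decided by the runner unless it is fixed explicitly, so the `00002` suffix may differ between runs or machines.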
- Join
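One file holds user ID information (an ID followed by a user name), in the following format:

    35451605324179 Jack
    35236905298306 Jim
    35236905519469 John
    35237005022314 Linda

The other file holds part of each user's mobile phone usage information, in the following format:

    35451605324179 3G 中国移动
    35236905298306 2G 中国电信
    35236905519469 4G 中国移动

After the Join we want to know which network each user is on (user name + network). The Apache Beam implementation is shown below; the code splits each line on tab characters, so the fields in the actual files are tab-separated.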
    package org.shirdrn.beam.examples;

    import org.apache.beam.runners.direct.DirectRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;

    public class JoinExample {

        @SuppressWarnings("serial")
        public static void main(String[] args) {
            PipelineOptions options = PipelineOptionsFactory.create();
            // Explicitly set the PipelineRunner: DirectRunner (local mode)
            options.setRunner(DirectRunner.class);

            Pipeline pipeline = Pipeline.create(options);

            // create ID info collection
            final PCollection<KV<String, String>> idInfoCollection = pipeline
                .apply(TextIO.Read.from("/tmp/dataset/MY_ID_INFO_FILE.txt"))
                .apply("CreateUserIdInfoPairs", MapElements.via(
                    new SimpleFunction<String, KV<String, String>>() {
                        @Override
                        public KV<String, String> apply(String input) {
                            // line format example: 35451605324179 Jack
                            String[] values = input.split("\t");
                            return KV.of(values[0], values[1]);
                        }
                    }));

            // create operation collection
            final PCollection<KV<String, String>> opCollection = pipeline
                .apply(TextIO.Read.from("/tmp/dataset/MY_ID_OP_INFO_FILE.txt"))
                .apply("CreateIdOperationPairs", MapElements.via(
                    new SimpleFunction<String, KV<String, String>>() {
                        @Override
                        public KV<String, String> apply(String input) {
                            // line format example: 35237005342309 3G CMCC
                            // only the network type (second field) is kept
                            String[] values = input.split("\t");
                            return KV.of(values[0], values[1]);
                        }
                    }));

            // tags that identify each input inside the CoGbkResult
            final TupleTag<String> idInfoTag = new TupleTag<String>();
            final TupleTag<String> opInfoTag = new TupleTag<String>();

            // group both collections by their shared key (the ID)
            final PCollection<KV<String, CoGbkResult>> cogrouppedCollection = KeyedPCollectionTuple
                .of(idInfoTag, idInfoCollection)
                .and(opInfoTag, opCollection)
                .apply(CoGroupByKey.<String>create());

            final PCollection<KV<String, String>> finalResultCollection = cogrouppedCollection
                .apply("CreateJoinedIdInfoPairs", ParDo.of(
                    new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            KV<String, CoGbkResult> e = c.element();
                            String id = e.getKey();
                            // getOnly expects exactly one name per ID
                            String name = e.getValue().getOnly(idInfoTag);
                            for (String opInfo : c.element().getValue().getAll(opInfoTag)) {
                                // Generate a string that combines information from both collection values
                                c.output(KV.of(id, "\t" + name + "\t" + opInfo));
                            }
                        }
                    }));

            PCollection<String> formattedResults = finalResultCollection
                .apply("FormatFinalResults", ParDo.of(
                    new DoFn<KV<String, String>, String>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            c.output(c.element().getKey() + "\t" + c.element().getValue());
                        }
                    }));

            formattedResults.apply(TextIO.Write.to("joinedResults"));

            pipeline.run().waitUntilFinish();
        }
    }
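To make the join semantics easier to follow, here is a rough plain-Java sketch of the pairing that the CoGroupByKey step and the DoFn after it perform on the sample data: each ID's single user name is combined with every network record for that ID, and IDs without network records produce nothing. This is not Beam code. `JoinSketch`, `join`, and `sample` are hypothetical names, and the tab-separated output only illustrates the shape of a joined record (ID, user name, network) rather than the pipeline's exact output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JoinSketch {

    // For every ID, pair its single user name with each network value.
    // IDs that have no network values contribute no output rows.
    static List<String> join(Map<String, String> idToName,
                             Map<String, List<String>> idToNetworks) {
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, String> entry : idToName.entrySet()) {
            List<String> networks =
                idToNetworks.getOrDefault(entry.getKey(), Collections.<String>emptyList());
            for (String network : networks) {
                joined.add(entry.getKey() + "\t" + entry.getValue() + "\t" + network);
            }
        }
        return joined;
    }

    // Runs the join on the sample records from the two input files above.
    static List<String> sample() {
        Map<String, String> idToName = new LinkedHashMap<>();
        idToName.put("35451605324179", "Jack");
        idToName.put("35236905298306", "Jim");
        idToName.put("35236905519469", "John");
        idToName.put("35237005022314", "Linda");

        Map<String, List<String>> idToNetworks = new LinkedHashMap<>();
        idToNetworks.put("35451605324179", Arrays.asList("3G"));
        idToNetworks.put("35236905298306", Arrays.asList("2G"));
        idToNetworks.put("35236905519469", Arrays.asList("4G"));

        return join(idToName, idToNetworks);
    }

    public static void main(String[] args) {
        for (String line : sample()) {
            System.out.println(line);
        }
    }
}
```

With the sample data, Jack, Jim, and John each yield one joined record, while Linda yields none because her ID has no entry in the usage file. In that sense the result behaves like an inner join.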