科技排头@高级应用模式第二辑:应用模式的动态更新,Flink( 二 )
DynamicKeyFunction和DynamicAlertFunction之间的HASH连接意味着 , 对于每个消息都将计算一个哈希码 , 并且消息将在下一个运算符的可用并行实例之间平均分配 。 需要使用keyBy从Flink显式“请求”这样的连接 。
文章图片
图4:在运算符实例之间传递的哈希消息(通过keyBy)
REBALANCE分布是由对rebalance()的显式调用或并行度的更改(对于图2中的作业图而言 , 为12->1)引起的 。 调用rebalance()会使数据以循环方式重新分区 , 并且在某些情况下可以帮助减轻数据偏斜 。
文章图片
图5:跨运算符实例传递的REBALANCE消息
图2中的欺诈检测作业图包含一个附加数据源:规则源(RulesSource) 。 它还从Kafka消费 。 规则通过BROADCAST通道“混合”到主处理数据流中 。 在运算符之间传输数据的其他方法(例如forward、hash或rebalance) , 会让每个消息只可在接收的运算符的并行实例之一中处理;相比之下 , broadcast会让每个消息在broadcaststream连接的运算符的所有并行实例的输入上可用 。 这使得broadcast方法适用于多种需要影响所有消息处理的任务 , 而无需考虑它们的键或源分区 。
文章图片
图6:跨运算符实例传递的BROADCAST消息
注意:实际上Flink中有一些更特殊的数据分区方案 , 我们在这里没有提到 。 如果你想了解更多信息 , 请参阅Flink有关流分区的文档 。
广播状态模式
为了使用规则源 , 我们需要将其“连接”到主数据流:
//StreamssetupDataStream<Transaction>transactions=[...]DataStream<Rule>rulesUpdateStream=[...]{1}BroadcastStream<Rule>rulesStream=rulesUpdateStream.broadcast(RULES_STATE_DESCRIPTOR);{1}//ProcessingpipelinesetupDataStream<Alert>alerts=transactions.connect(rulesStream).process(newDynamicKeyFunction()).keyBy((keyed)->keyed.getKey()).connect(rulesStream).process(newDynamicAlertFunction())如你所见 , 可以调用broadcast方法并指定状态描述符 , 从任何常规流中创建广播流 。 Flink假定在处理主数据流的事件时需要存储和检索广播的数据 , 因此总是从该状态描述符自动创建相应的广播状态(broadcaststate) 。 这与其他的ApacheFlink状态类型是不一样的 , 其他类型中你需要在处理函数的open()方法中对其进行初始化 。 另请注意 , 广播状态始终具有键值格式(MapState) 。
publicstaticfinalMapStateDescriptor<Integer,Rule>RULES_STATE_DESCRIPTOR=newMapStateDescriptor<>(''rules'',Integer.class,Rule.class);连接到rulesStream会导致处理函数的签名发生某些变化 。 上一篇文章在这里做了一点简化 , 用的是ProcessFunction 。 但是 , DynamicKeyFunction实际上是一个BroadcastProcessFunction 。
?复制代码
publicabstractclassBroadcastProcessFunction<IN1,IN2,OUT>{publicabstractvoidprocessElement(IN1value,ReadOnlyContextctx,Collector<OUT>out)throwsException;publicabstractvoidprocessBroadcastElement(IN2value,Contextctx,Collector<OUT>out)throwsException;}这里的区别在于增加了processBroadcastElement方法 , 规则流的消息将通过该方法到达 。 下面新版本的DynamicKeyFunction允许在运行时通过这个流 , 修改数据分配键的列表:
publicclassDynamicKeyFunctionextendsBroadcastProcessFunction<Transaction,Rule,Keyed<Transaction,String,Integer>>{@OverridepublicvoidprocessBroadcastElement(Rulerule,Contextctx,Collector<Keyed<Transaction,String,Integer>>out){BroadcastState<Integer,Rule>broadcastState=ctx.getBroadcastState(RULES_STATE_DESCRIPTOR);broadcastState.put(rule.getRuleId(),rule);}@OverridepublicvoidprocessElement(Transactionevent,ReadOnlyContextctx,Collector<Keyed<Transaction,String,Integer>>out){ReadOnlyBroadcastState<Integer,Rule>rulesState=ctx.getBroadcastState(RULES_STATE_DESCRIPTOR);for(Map.Entry<Integer,Rule>entry:rulesState.immutableEntries()){finalRulerule=entry.getValue();out.collect(newKeyed<>(event,KeysExtractor.getKey(rule.getGroupingKeyNames(),event),rule.getRuleId()));}}}在上面的代码中 , processElement()接收事务 , 而processBroadcastElement()接收规则更新 。 创建新规则后将按图6所示分配 , 并使用processBroadcastState将其保存在运算符的所有并行实例中 。 我们使用规则的ID作为存储和引用各个规则的键 。 我们不再迭代硬编码的List , 而是迭代动态更新的广播状态的条目 。
推荐阅读
- 小米科技▲卢伟冰再次发力,全球首发骁龙768G,5G新机将在两天后发布!
- 快科技最贵或达5000元,苹果头戴耳机更多细节曝光:包含两款
- 科技迷7nm版年底流片,要放弃美国代工?国产x86转向三星台积电代工
- 骊微电子科技PD充电器应用方案,PN8161+PN8307H高集成18W
- 快科技小米高管都是外人?雷军透露了一个秘密
- 靓科技解读Thing,a16z、5.15亿美金的数据加密股票基金:找寻下一个Big
- 王伯伯说科技流畅用三年,即将开学的学生党准备好了吗?三款高配低价千元机
- 知叔达科技中芯国际早已料到,成功绕开了光刻机,怒了!荷兰ASML再次失约
- 小熊带你玩科技数据成粤企生产新要素,工业互联网深调研〡从经验依赖到数据驱动
- 每日科技果粉大批华人再掀归国潮,美利坚的钱“不香了”?,硅谷科技人才流失