public abstract class UDF { /** * 是否支持该组参数类型,不支持抛出UnsupportedTypeException异常 。默认返回 true */ public void support(Class<?>... paramsClass)throws UnsupportedTypeException; /*** 该 UDF 返回值类型,用于校验嵌套函数类型是否匹配 。可返回简单类型,map,array,record 等类型.默认返回 String 类型*/ public Class<?> returnType();/*** UDF 执行函数,当输入不符合预期时,向外抛出异常* @param params 函数的输入实参* @return 函数输出结果,简单类型或者复杂类型,支持简单类型,map,array,record 类型*/public abstract Object eval(Object... params);}一个判断是否包含子串的UDF 写法:
文章插图
所有的UDF都通过一个核心注册类(这点类似 Hive 的FunctionRegistry)
public final class UdfRegistors { /** * UDF 函数映射 */static final Map<String, UDF> UDF_CACHED = new HashMap<String, UDF>(); static {UDF_CACHED.put("copy", new CopyUDF()); // 复制一个变量的值 UDF_CACHED.put("eq", new EqUDF()); // 判断两个变量是否相等 UDF_CACHED.put("yn", new YnUDF()); // 根据输入true,false 转换为 Y、NUDF_CACHED.put("null", new NullUDF()); // 判断变量是否为null// add udf methodUDF_CACHED.put("location", new LocationUDF()); // 获得手机号码的归属地 UDF_CACHED.put("nation_code", new NationCodeUDF()); // 根据国家名称获取国家代码 UDF_CACHED.put("province_code", new ProvinceCodeUDF()); //根据省名称获取省代码 UDF_CACHED.put("city_code", new CityCodeUDF()); // 根据城市名称获取城市代码 UDF_CACHED.put("phone_num", new PhoneNumUDF()); // 校验是否是手机号或者固话UDF_CACHED.put("number_format", new NumberFormatUDF()); //校验是否可以转化成数字}/*** 添加一个UDF函数 * @param key UDF 函数 * @param value UDF 函数 eval 应线程安全 * @return */ public static boolean addUdf(String key, UDF value) { return UDF_CACHED.put(Optional.of(key).map((it)->it.toLowerCase()).get(), value) != null; } /** * 获得内置的 udf 函数 */ public static UDF getUdf(String udfName) { return UDF_CACHED.get(udfName.toLowerCase()); }}UDF 函数注册时期:
- 可在编译期绑定内置的 UDF 函数;
- 可在系统启动时配置自加载的 UDF 函数;
- 可在运行期动态注入UDF 函数;
文章插图
六、FlumeOnYarn 架构和分布式部署本架构适合以文件作为数据对接的方案,另一方面,通过扩展 Flume 即可实现拿来主义 。Flume 内部实现对 Channel 的 Transaction,对于每个以文件构造的 Event 对象是原子操作,要么全部成功,要么失败 。flume依赖事务来保证event的可靠性 。Flume 默认没有分布式实现,因此开发了 FlumeOnYarn 的架构,用于支持 Flume 的分布式部署 。
FlumeOnYarn优势:
- 无需每个节点安装 Flume,可一键启动和停止;
- 配置文件在客户端节点修改,自动复制到 Yarn 上各实例,无需每个节点修改;
- 基于 CDH或HDP的发行版,即使实现了 Web 可视化化的配置和分布式部署,但是对于 Flume 只能实现单配置文件实例,无法实现多配置实例;
- 集群的规模可以根据数据量大小进行实时的调整(增减节点),实现弹性处理 。通过命令或者 api 即可控制(CDH 等需要在页面添加 host,繁琐且不易动态调整);
- 多个租户或者同一租户多个处理实例互不影响,且能隔离(Yarn Container);
文章插图
上图所示,提交FlumeOnYarn 需要客户端,该客户端没有太多和Flume安装包结构特殊的地方,只是在 lib 下添加了 flume-yarn 的架构支持和 bin 下 flume-on-yarn 的启动脚本 。
Flume OnYarn 客户端程序
文章插图
通过 bin/flume-on-yarn 即可提交 FlumeOnYarn Application 集群 。如下的命令即可一次性申请多个 Yarn 资源节点,实现一键部署:
推荐阅读
- 常见分布式锁实现方式
- 手机店铺介绍怎么写 淘宝开店宝贝描述怎么写
- 基于CentOS8Linux运维教程-Linux系统用户与组管理
- 淘宝店铺描述写什么 淘宝开店描述下你的店铺这怎么写
- 螃蟹女为什么不能娶?
- 推荐一款适合程序员使用的前端框架ZUI,基于Bootstrap深度定制款
- 基于云落地SLB+Tengine实现高可用集群负载均衡 - 中
- 基于CentOS7编译安装mono环境运行C#程序
- 0785-基于CDP7.1.1的Spark3.0技术预览版本发布
- tcp,icmp,http 基于wireshark报文分析快速过滤报文时延