基于XML描述的可编程函数式ETL实现( 三 )

  • 函数编写者应该保证函数体内是线程安全的;
  • UDF 实现如下:
    public abstract class UDF { /** * 是否支持该组参数类型,不支持抛出UnsupportedTypeException异常 。默认返回 true */ public void support(Class<?>... paramsClass)throws UnsupportedTypeException; /*** 该 UDF 返回值类型,用于校验嵌套函数类型是否匹配 。可返回简单类型,map,array,record 等类型.默认返回 String 类型*/ public Class<?> returnType();/*** UDF 执行函数,当输入不符合预期时,向外抛出异常* @param params 函数的输入实参* @return 函数输出结果,简单类型或者复杂类型,支持简单类型,map,array,record 类型*/public abstract Object eval(Object... params);}一个判断是否包含子串的UDF 写法:
    基于XML描述的可编程函数式ETL实现

    文章插图
     
    所有的UDF都通过一个核心注册类(这点类似 Hive 的FunctionRegistry)
    public final class UdfRegistors { /** * UDF 函数映射 */static final Map<String, UDF> UDF_CACHED = new HashMap<String, UDF>(); static {UDF_CACHED.put("copy", new CopyUDF()); // 复制一个变量的值 UDF_CACHED.put("eq", new EqUDF()); // 判断两个变量是否相等 UDF_CACHED.put("yn", new YnUDF()); // 根据输入true,false 转换为 Y、NUDF_CACHED.put("null", new NullUDF()); // 判断变量是否为null// add udf methodUDF_CACHED.put("location", new LocationUDF()); // 获得手机号码的归属地 UDF_CACHED.put("nation_code", new NationCodeUDF()); // 根据国家名称获取国家代码 UDF_CACHED.put("province_code", new ProvinceCodeUDF()); //根据省名称获取省代码 UDF_CACHED.put("city_code", new CityCodeUDF()); // 根据城市名称获取城市代码 UDF_CACHED.put("phone_num", new PhoneNumUDF()); // 校验是否是手机号或者固话UDF_CACHED.put("number_format", new NumberFormatUDF()); //校验是否可以转化成数字}/*** 添加一个UDF函数 * @param key UDF 函数 * @param value UDF 函数 eval 应线程安全 * @return */ public static boolean addUdf(String key, UDF value) { return UDF_CACHED.put(Optional.of(key).map((it)->it.toLowerCase()).get(), value) != null; } /** * 获得内置的 udf 函数 */ public static UDF getUdf(String udfName) { return UDF_CACHED.get(udfName.toLowerCase()); }}UDF 函数注册时期:
    1. 可在编译期绑定内置的 UDF 函数;
    2. 可在系统启动时配置自加载的 UDF 函数;
    3. 可在运行期动态注入UDF 函数;
    五、数据测试工具数据对接过程,面对数据是否能转换为目标结果常常无从所知 。基于XML 控制文件的数据解析,可实现一个测试工具 。该工具通过上传数据文件和上传 XML 控制文件,可对数据文件随机的读取行进行匹配测试,只要数据列和目标 XML文件能通过列匹配测试,则数据可通过 ETL 解析清洗 。否则继续修改 XML 控制文件,直到顺利通过匹配 。
    基于XML描述的可编程函数式ETL实现

    文章插图
     
    六、FlumeOnYarn 架构和分布式部署本架构适合以文件作为数据对接的方案,另一方面,通过扩展 Flume 即可实现拿来主义 。Flume 内部实现对 Channel 的 Transaction,对于每个以文件构造的 Event 对象是原子操作,要么全部成功,要么失败 。flume依赖事务来保证event的可靠性 。Flume 默认没有分布式实现,因此开发了 FlumeOnYarn 的架构,用于支持 Flume 的分布式部署 。
    FlumeOnYarn优势:
    1. 无需每个节点安装 Flume,可一键启动和停止;
    2. 配置文件在客户端节点修改,自动复制到 Yarn 上各实例,无需每个节点修改;
    3. 基于 CDH或HDP的发行版,即使实现了 Web 可视化化的配置和分布式部署,但是对于 Flume 只能实现单配置文件实例,无法实现多配置实例;
    4. 集群的规模可以根据数据量大小进行实时的调整(增减节点),实现弹性处理 。通过命令或者 api 即可控制(CDH 等需要在页面添加 host,繁琐且不易动态调整);
    5. 多个租户或者同一租户多个处理实例互不影响,且能隔离(Yarn Container);
    FlumeOnYarn 架构
    基于XML描述的可编程函数式ETL实现

    文章插图
     
    上图所示,提交FlumeOnYarn 需要客户端,该客户端没有太多和Flume安装包结构特殊的地方,只是在 lib 下添加了 flume-yarn 的架构支持和 bin 下 flume-on-yarn 的启动脚本 。
    Flume OnYarn 客户端程序
    基于XML描述的可编程函数式ETL实现

    文章插图
     
    通过 bin/flume-on-yarn 即可提交 FlumeOnYarn Application 集群 。如下的命令即可一次性申请多个 Yarn 资源节点,实现一键部署:


    推荐阅读