FlinkSQL 动态加载 UDF 实现思路( 三 )
完整代码public static void main(String[] args) throws Exception {//创建流运行时环境StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();//采用BlinkPlannerEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();//创建StreamTable环境StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);bsEnv.setParallelism(1);// 动态加载String funJarPath = "UDF jar 在 OSS 中所在的 URL 路径";loadJar(new URL(funJarPath));Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");configurationField.setAccessible(true);Configuration o = (Configuration)configurationField.get(bsEnv);Field confData = http://kandian.youth.cn/index/Configuration.class.getDeclaredField("confData");confData.setAccessible(true);Map temp = (Map)confData.get(o);List jarList = new ArrayList<>();jarList.add(funJarPath);temp.put("pipeline.classpaths",jarList);bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'");bsTableEnv.executeSql("CREATE TABLE sourceTable (" +"f_sequence INT," +"f_random INT," +"f_random_str STRING," +"ts AS localtimestamp," +"WATERMARK FOR ts AS ts" +") WITH (" +"'connector' = 'datagen'," +"'rows-per-second'='5'," +"'fields.f_sequence.kind'='sequence'," +"'fields.f_sequence.start'='1'," +"'fields.f_sequence.end'='1000'," +"'fields.f_random.min'='1'," +"'fields.f_random.max'='1000'," +"'fields.f_random_str.length'='10'" +")");bsTableEnv.executeSql("CREATE TABLE sinktable (" +"f_random_str STRING" +") WITH (" +"'connector' = 'print'" +")");bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable");}//动态加载Jarpublic static void loadJar(URL jarUrl) {//从URLClassLoader类加载器中获取类的addURL方法Method method = null;try {method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);} catch (NoSuchMethodException | SecurityException e1) {e1.printStackTrace();}// 获取方法的访问权限boolean accessible = method.isAccessible();try {//修改访问权限为可写if (accessible == false) {method.setAccessible(true);}// 获取系统类加载器URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();//jar路径加入到系统url路径里method.invoke(classLoader, jarUrl);} catch (Exception e) {e.printStackTrace();} finally {method.setAccessible(accessible);}}
最后以上就是在 Flink on K8S 集群 Session 模式下 ,FlinkSQL 动态加载 Jar 的解决方案 。 由于 REST API 没有提供 -C 效果 , 自定义 Jar URL 没有分发到 TaskMananger , 导致 TaskMananger 没有进行类加载到其 JVM 中 。 通过在 Job 的 main 方法中增加动态加载方法及配置 pipeline.classpaths , 可以达到不改动 Flink 源码的情况下实现 -C 效果 。 以上方案刚实现不久 , 还不保证是否有其他未知的问题 , 如果有更好的解决方案或者该方案中存在错误或者疏漏也欢迎提出共同讨论 。
感谢您的阅读 , 如果喜欢本文欢迎关注和转发 , 本头条号将坚持持续分享IT技术知识 。 对于文章内容有其他想法或意见建议等 , 欢迎提出共同讨论共同进步 。
推荐阅读
- 动态降噪+双设备连接,华为FreeBuds Pro上手评
- 网络比15年前更慢错误更多?开发者加载了100万个网站实测
- 算法萌新如何学好动态规划(3)
- 大神已提取出一加8T的动态壁纸:Android 8.0+设备均可使用
- 关于边缘计算与网络动态加速
- PS5系统更新带来动态调整游戏机的风扇速度特性 以提升散热
- “会员配送费更贵”,美团外卖回应了
- Google Photos丰富Memories功能:新增循环显示图片的动态壁纸功能
- Firefox 83将默认启用Warp更新:大幅提升响应时间和加载速度
- 当“一兆难求”遇上动态频谱共享,爱立信与四川电信的5G实践