flink sqlserver flinksql语法
Flink sql 是什么
sql 的诞生就是为了简化我们对数据开发,可以使用少量的 sql 代码,帮助我完成对数据的查询,分析等功能
声明式 & 易于理解
对于用户只需要表达我想要什么,具体处理逻辑交给框架,系统处理,用户无需关心,对于一些非专业的开发人员有了解 sql,并且 sql 相对我们学习 java,c等语言更简单,学习成本更低,如果跨团队,或者非大数据开发人员,也可以通过 sql 来进行 flink 任务的开发
文章插图
自动调优
查询优化器,会对我们编写的 sql 进行优化,生成效率更好的执行计划,所以用户不需要了解底层细节,即高效的获取结果
稳定
sql 语义发展几十年是一个很稳定的语言,少有变动,当我们引擎的升级,甚至替换成另一个引擎,都可以做到兼容地,平滑地升级,无需更改我们的已经编写好的 sql 代码
流批统一的基础
对于 flink 通过 sql 的表达式,来完成流批的统一,一套 sql 代码,既可以跑流任务,也可以跑批任务,减少我们开发的成本
Flink sql 使用
数据类型
-- 字符串类型# char类型CHARCHAR(n) -- n在 1 和 2147483647 之间未设置n=1# 字符串类型VARCHARVARCHAR(n)-- n在 1 和 2147483647 之间未设置n=1STRING-- 等于最大的varchar(max)# 二进制类型BINARYBINARY(n) -- 范围同上# 可变长度二进制类型VARBINARYVARBINARY(n)-- 类似于stringBYTES-- 数字类型# 带有精度的十进制数字类型-- 类似于java中的DECIMALDECIMAL(p)DECIMAL(p, s)DECDEC(p)DEC(p, s)NUMERICNUMERIC(p)NUMERIC(p, s)# 带符号TINYINT-- -128 to 127SMALLINT -- -32768 to 32767# 不带符号的INT-- 2147483,648 to 2147483647INTEGERBIGINT---9223372036854775808 to 9223372036854775807# 带小数的FLOATDOUBLE-- 时间类型#日期DATE-- 2020-10-12#时间TIMETIME(p) -- 10:10:12.p不指定p,p= 0#时间戳TIMESTAMPTIMESTAMP(p) -- 2020-12-12 12:10:11.p-- 其他类型#ARRAY<t>t ARRAY#map类型MAP<kt, vt>-- 对应java的类型ClassTypejava.lang.StringSTRINGjava.lang.BooleanBOOLEANbooleanBOOLEAN NOT NULLjava.lang.ByteTINYINTbyteTINYINT NOT NULLjava.lang.ShortSMALLINTshortSMALLINT NOT NULLjava.lang.IntegerINTintINT NOT NULLjava.lang.LongBIGINTlongBIGINT NOT NULLjava.lang.FloatFLOATfloatFLOAT NOT NULLjava.lang.DoubleDOUBLEdoubleDOUBLE NOT NULLjava.sql.DateDATEjava.time.LocalDateDATEjava.sql.TimeTIME(0)java.time.LocalTimeTIME(9)java.sql.TimestampTIMESTAMP(9)java.time.LocalDateTimeTIMESTAMP(9)java.time.OffsetDateTimeTIMESTAMP(9) WITH TIME ZONEjava.time.InstantTIMESTAMP(9) WITH LOCAL TIME ZONEjava.time.DurationINVERVAL SECOND(9)java.time.Period INTERVAL YEAR(4) TO MONTHbyte[]BYTEST[]ARRAY<T>java.util.Map<K, V>MAP<K, V>
系统函数 & 自定义函数
/*下面是1.12版本的系统内置的函数,具体我们可以到官网,根据需求使用即可 functions/systemFunctions.html*/// TODO 主要介绍自定义函数/* udf 和 udaf 需要定义eval方法,实现自己的逻辑,具体系统会调用对应的方法 udf : 传入一个值/多个/或者不传入,返回一个新的值,可以重载该方法,具体会根据 传入的参数调用对应eval烦恼歌发 类似`map`算子,作用于sql udaf : 自定义聚合函数,根据自己的逻辑定义累加器 udtf : 用作与表中,可返回一个或多个值,*/import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.functions.AggregateFunction;import org.apache.flink.table.functions.ScalarFunction;import org.apache.flink.table.functions.TableFunction;import org.apache.flink.types.Row;import java.sql.SQLException;public class UDFDemo {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,EnvironmentSettings.newInstance().build());// 注册函数tEnv.registerFunction("customFunc1", new CustomUDF());tEnv.registerFunction("customFunc2", new CustomUDAF());tEnv.registerFunction("customFunc3", new CustomUDTF());}static class Acc {int result;public Integer gerResult() {return result;}public Acc merge(Acc acc) {result = acc.gerResult() + result;return this;}public void incr() {result++;}}static class CustomUDF extends ScalarFunction {// UDF 需要定义该方法public int eval(String str) {int hc = 0;for (char c : str.toUpperCase().toCharArray()) {hc = hashCode() >> c;}hc = hc - 1 - str.length();hc = hc >> 7;return hc;}}static class CustomUDTF extends TableFunction<Row> {// udtf 需要定义该方法,在该方法实现逻辑public void eval(String str) throws SQLException {if (str != null) {for (String s : str.split(",")) {Row row = new Row(2);row.setField(0, s);row.setField(1, 1);collect(row);}}}Overridepublic TypeInformation<Row> getResultType() {return new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO);}}static class CustomUDAF extends AggregateFunction<Integer, Acc> {Overridepublic Integer getValue(Acc accumulator) {return accumulator.gerResult();}Overridepublic Acc createAccumulator() {return new Acc();}// 累加public void accumulate(Acc acc,String input){if("*".equals(input)){return;}acc.incr();}public void accumulate(Acc acc){acc.incr();}}}
推荐阅读
- SQLSERVER如何分离和附加数据库
- SQLServer时间戳的误解,与时间没有关系
- mysql数据库数据备份和恢复方法 sqlserver数据库备份与恢复
- 大数据flink实时计算方案 大数据实时计算引擎flink
- Flink中的State概念及其扩容算法 state是什么意思
- Javascript怎样访问Sqlserver数据库