flink sqlserver flinksql语法( 二 )
简单案例
代码
flink sql 中时间机制本质与 dataStream api 相同,只不过使用少于区别,稍加注意即可,注意指定 watermark 需要使用 sql 中 timestamp(3)类型(具体对应 java 类型可根据上面类型自行判断),设置 watermark 后可使用 ROWTIEM 字段(具体看 sql 代码),没有设置可直接使用 PROCTIME 字段
注意 : 不同的时间语义要严格对应环境配置的时间语义,否则可能出现异常
时间字段为两种,属于非用户指定字段,设置完时间语义后,根据需求使用具体的时间字段
ROWTIME : 事件时间
PROCTIME : 处理时间字段
场景 :
join : 场景与双流 join 或者 维表 join,目前 flink 支持的不是很好
topN & 去重 : 语法基本相同,row_num > 1 即 topN , 当=1 则是去重操作
topN 场景一些热搜,排名等内容
去重顾名思义,就是为了去重,去重会涉及到 retract 流(以后会详细讲)内容,会更新之前已经存在的结果
// TODO 下面代码仅供参考,具体测试根据自己时间环境来
//以下只是一些简单的案例,后面会逐步深入复杂sql和原理层面
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* author 857hub
*/
public class ClickhouseSinkApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().
//useBlinkPlanner().
build()
);
tEnv.getConfig().getConfiguration().setString(PipelineOptions.NAME, "sql test");
// sources
String source = "CREATE TABLE source (\n" +
"`id` int,\n" +
"`name` varchar.\n" +
"`ts` timestamp(3),\n" +
// 指定watermark 允许延迟5s
"WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"+
") WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' = 'test1',\n" +
"'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
"'properties.group.id' = 'xzw',\n" +
"'scan.startup.mode' = 'latest-offset',\n" +
"'format' = 'json'\n" +
")";
String source2 = "CREATE TABLE source2 (\n" +
"`id` int,\n" +
"`name` varchar,\n" +
"`ts` timestamp(3)\n" +
") WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' = 'test2',\n" +
"'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
"'properties.group.id' = 'xzw',\n" +
"'scan.startup.mode' = 'latest-offset',\n" +
"'format' = 'json'\n" +
")";
// clickhouse sink 由我自己定义,后面会对sql自定义source和sink进行讲解
String sink = "CREATE TABLE sink (\n" +
"`id` INT,\n" +
"`name` VARCHAR\n" +
") WITH (\n" +
// 需要自定义接信息参数--option
"'connector' = 'xzw_ck',\n" +
"'url' = 'jdbc:clickhouse://localhost:8123/default',\n" +
"'table-name' = 'test',\n" +
"'username' = 'default',\n" +
"'password' = '123456'\n" +
")";
// 执行 source sink sql
tEnv.executeSql(source);
tEnv.executeSql(source2);
tEnv.executeSql(sink);
/*
由于是简单使用,没有在场景应用,简单介绍一下区别,可以根据们不同的区别在自己项目中使用
left json : 无论是否join上都返回左表的数据
inner join : 只有join上才会返回匹配后的结果
full outer join : 两边的数据都会返回,无论是否join上,没有的则为null
interval join : 基于时间范围内的join,在指定的时间范围内返回join上的数据
*/
String joinSql = "select * from source1 s1" +
【flink sqlserver flinksql语法】"left join source2 s2" +
// 内连接
//"inner join source2" || "join source2"
// 全连接
//"full outer join source2"
// 时间范围join
//"s1.ts >= s2.ts AND s1.ts < s2.ts + INTERVAL '10' MINUTE" +
" on s1.id =s2.id "
;
Table joinTable = tEnv.sqlQuery(joinSql);
// 分组排序,取topN,如果要是去重 rnum=1即可实现去重操作
String insertSql = "insert into sink select id,name from(" +
"select *," +
"row_number() over(partition by id order by ts) as rnum " +
"from "+joinTable+" where rnum < 5 " +
")";
// add insert sql
TableResult tableResult = executeSql(tEnv, "insert into sink select * from source", "*",insertSql);
推荐阅读
- SQLSERVER如何分离和附加数据库
- SQLServer时间戳的误解,与时间没有关系
- mysql数据库数据备份和恢复方法 sqlserver数据库备份与恢复
- 大数据flink实时计算方案 大数据实时计算引擎flink
- Flink中的State概念及其扩容算法 state是什么意思
- Javascript怎样访问Sqlserver数据库