大数据flink实时计算方案 大数据实时计算引擎flink
一、背景
Flink 由于阿里在国内的助推,火爆程度可以想象,且目前Flink 有非常明显的趋势是往SQL 方向进行的 。很多大厂已经实现了Flink SQL化,那我们怎么去实现一个流式计算平台呢?
文章插图
二、Flink SQL 初探以及代码实现
连接kafka 对数据进行处理写入mysql
package org.example;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class SqlDemo {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
//把kafka 中的topic映射成一个输入临时表
tableEnv.executeSql(
"create table sensor_source (id string,name string) with (" +
" 'connector' = 'kafka'," +
"'topic' = 'test_info_test'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'testGroup'," +
"'scan.startup.mode' = 'earliest-offset'," +
"'format' = 'json')"
);
//把mysql 中的表映射成一个输出临时表
String sql = "CREATE TABLE print_table (\n" +
" id STRING,\n" +
" name STRING\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")";
String mysql_sql = "CREATE TABLE mysql_sink (\n" +
"id string,\n" +
"name string\n" +
" ) WITH (\n" +
"'connector' = 'jdbc',\n" +
"'url' = 'jdbc:mysql://ip:8081/kafka?serverTimezone=UTC',\n" +
"'table-name' = 'test_info',\n" +
"'username' = 'kafka',\n" +
"'password' = 'Bonc123'\n" +
" )";
String kafka_sink_sql=
"create table kafka_sink (id string,name string) with (" +
" 'connector' = 'kafka'," +
"'topic' = 'test_info_2'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'format' = 'json')";
tableEnv.executeSql(mysql_sql);
//tableEnv.executeSql(kafka_sink_sql);
//tableEnv.executeSql(sql);
//插入数据的sql语句
//tableEnv.executeSql("insert into print_table select * from sensor_source");
tableEnv.executeSql("insert into mysql_sink select * from sensor_source");
//tableEnv.executeSql("insert into kafka_sink select * from sensor_source");
}
}
运行之后mysql里面数据就有了
文章插图
三、Flink 实时计算平台
依据上面的代码,我们可以抽象出一层Flink 实时计算平台 。
文章插图
文章插图
文章来源于诸葛子房
【大数据flink实时计算方案 大数据实时计算引擎flink】
推荐阅读
- 嵩县大坪乡派出所所长?嵩县大坪乡派出所
- ipad12.9寸与a4纸大小 2020 ipad12.9寸与a4纸大小的对比
- 北芪党参炖乳鸽
- 阿胶口服液功效
- 蒲公英泡水能治咽炎吗?
- 感冒喝什么汤谱大全
- 多愁善感的人注定不快乐?大事小事都喜欢纠结的星座
- 米饭|什么精华油最好用 十大最好用的护肤精华油排行
- 刘海|申敏儿&孔孝真合体大片惊艳!韩系讨好审美变得大气耐看
- 酱油|大牌护肤品被曝含致癌雌激素?网友:贫穷又一次救了我