ksqlDB是一个专门为流式处理应用程序构建的事件流数据库 。它提供了一个基于 SQL 的 API 来查询和处理 Kafka 中的数据 。ksqlDB 的特性包括过滤、转换和连接来自流和表的数据,通过聚合事件创建物化视图,等等 。
要使用 ksqlDB 中的数据,我们首先需要声明一个 schema:
CREATE STREAM CARPARK_EVENTS (NAMEVARCHAR,SPACEVARCHAR,OCCUPIED BOOLEAN)WITH (KAFKA_TOPIC='carpark',VALUE_FORMAT='JSON');
ksqlDB 是一个集群应用程序,这个初始声明工作可以在启动时完成,也可以根据需要由客户端来完成 。完成这些之后,客户端就可以订阅来自原始主题的变更流(带有过滤器) 。例如,想要获得一个停车场有空位的通知,可以运行以下命令:
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS EVENT_TS,SPACEFROM CARPARK_EVENTS WHERE NAME='Sheffield NCP'AND OCCUPIED=falseEMIT CHANGES;
与 SQL 查询不同,这个查询是一种持续查询(使用 EMIT CHANGES 子句指定) 。持续查询,即推送(Push)查询,将在事件发生时(现在和将来)持续地返回新的匹配项,直到事件终止为止 。ksqlDB 还支拉取(Pull)查询(我们将在下面探讨),这些查询的行为与常规 RDBMS 的查询差不多,返回某个时间点的值 。因此,ksqlDB 既支持流也支持静态状态,在实际当中,大多数应用程序需要根据正在执行的操作来选择这两种方式 。
ksqlDB 提供了一个全面的 REST API,通过 curl 进行上面的 SQL 调用看起来像这样:
curl --http2 'http://localhost:8088/query-stream'--data-raw '{"sql":"SELECT TIMESTAMPTOSTRING(ROWTIME,'''yyyy-MM-dd HH:mm:ss''') AS EVENT_TS, SPACE FROM CARPARK_EVENTS WHERE NAME='''Sheffield NCP''' and OCCUPIED=false EMIT CHANGES;"}'
这个调用产生一个来自服务器端的响应流(带有头部信息),然后源主题有匹配的事件时,这些事件被发送到客户端:
{"queryId":"383894a7-05ee-4ec8-bb3b-c5ad39811539","columnNames":["EVENT_TS","SPACE"],"columnTypes":["STRING","STRING"]}…["2020-08-05 16:02:33","A42"]………["2020-08-05 16:07:31","D72"]…
我们也可以使用 ksqlDB 来定义和填充新的数据流 。在 SELECT 语句前加上 CREATE STREAM streamname AS,就可以将持续查询的输出路由到 Kafka 主题 。因此,我们可以使用 ksqlDB 转换、连接、过滤发送给 Kafka 的事件 。ksqlDB 支持将表作为一等对象类型,我们可以用它来增强接收到的有关停车场信息的事件:
CREATE STREAM CARPARKS ASSELECT E.NAME AS NAME, E.SPACE,R.LOCATION, R.CAPACITY,E.OCCUPIED,CASEWHEN OCCUPIED=TRUE THEN 1ELSE -1END AS OCCUPIED_INDFROMCARPARK_EVENTS EINNER JOINCARPARK_REFERENCE RON E.NAME = R.NAME;
我们仍然使用 CASE 语句来创建可用车位的计数 。上面的 CREATE STREAM 填充了一个 Kafka 主题,看起来像这样:
文章插图
最后,让我们看看如何在 ksqlDB 中创建有状态聚合并在客户端查询 。要创建物化视图,需要运行包含聚合函数的 SQL:
CREATE TABLE CARPARK_SPACES ASSELECT NAME,SUM(OCCUPIED_IND) AS OCCUPIED_SPACESFROM CARPARKSGROUP BY NAME;
这个状态是在分布式 ksqlDB 节点中维护的,可以使用 REST API 查询:curl --http2 'http://localhost:8088/query-stream'--data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='''Birmingham NCP''';"}'
与上面看到的响应流不同,针对状态的查询(称为“拉取查询”,而不是“推送查询”)是立即返回的,然后退出:{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}[30]
如果应用程序希望获得最新的数字,它们可以重新发出查询,结果可能会发生变化,也可能不会:curl --http2 'http://localhost:8088/query-stream'--data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='''Birmingham NCP''';"}'{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}[29]
ksqlDB 官方提供了一个Java客户端,社区提供了Python和Go客户端 。与其他系统集成将 Kafka 作为异步消息传递的高可伸缩性和持久性代理的一个好处是在应用程序之间交换的数据也可以用于驱动流式处理(如上所述),或直接送入依赖的系统 。
推荐阅读
- 运维开发之paramiko远程执行命令实时返回
- 通过API网关实现微服务管控-限流,熔断和降级
- 实时音视频面视必备:快速掌握11个视频技术相关的基础概念
- 轻松上手 Spring Boot & Kafka 实战
- 给大家分享一款高性能api网关
- API敏捷开发框架
- linux网络编程常见API详解
- 令程序员头疼的性能问题,这一款性能分析工具帮你实时监控
- 基于OGG 实现Oracle到Kafka增量数据实时同步
- 怎样设计安全的GraphQL API?