在分布式系统和实时数据处理中,流处理是十分重要的技术 。在数据密集型应用中,数据快速到达,转瞬即逝,需要及时进行处理,流式处理强调数据和事件的处理速度,对性能和可靠性有较高的要求 。
流处理框架包括:Storm,Spark Streaming 和 Flink 等,而 Kafka 也不甘示弱,推出了分布式流处理平台 Kafka Streams 。Faust 把 Kafka Streams 带到了 Python,并实现了抽象和优化,为数据和事件的流处理提供了一个高效便利的框架 。
![Faust - 简洁高效的 Python 流处理库](http://img.jiangsulong.com/220425/0Z0423215-0.jpg)
文章插图
Faust
简介Faust,是 robinhood 在 Github 上开源的 Python 流处理库,目前版本为 1.10.4 。
Faust 把 Kafka Streams 的概念带到了 Python,提供了包括流处理和事件处理的模式 。Faust 使用纯 Python 实现,使得开发者可以使用包括 NumPy, PyTorch, Pandas 等的库进行数据处理 。
Faust 实现简洁优雅,使用简单,性能优秀,且具有高可用、分布式、灵活性高的特点 。目前 Faust 已被用于构建高性能分布式系统和实时数据管道中 。
![Faust - 简洁高效的 Python 流处理库](http://img.jiangsulong.com/220425/0Z0421213-1.jpg)
文章插图
Faust
使用Faust 需求 Python 3.6 或以上,且需要可用的 Kafka >= 0.10 服务 。使用 pip 安装:
$ pip install -U faust
此外,一些额外的特性需要额外的依赖,如 rocksdb,可以用来作为 Faust 在生产环境中的存储,以及 redis,可以在开启缓存时使用 。![Faust - 简洁高效的 Python 流处理库](http://img.jiangsulong.com/220425/0Z0426150-2.jpg)
文章插图
Faust
安装完成以后,就可以在项目中使用了 。我们来看一个简单的例子:
import faustApp = faust.App('hello-world',broker='kafka://localhost:9092',value_serializer='raw',)greetings_topic = app.topic('greetings')@app.agent(greetings_topic)async def greet(greetings):async for greeting in greetings:print(greeting)
首先,我们使用 faust.App 创建一个 Faust 应用,并配置应用的名字、Kafka broker 和序列化方式 。然后,我们创建一个主题,这跟 Kafka 中的主题是对应的 。
Faust 利用 Python 3.6+ 的异步语法 async,定义异步函数 greet,并注册为 Faust 应用的一个 agent 。函数接收实时的数据集合 greetings,并异步地对每项数据进行输出 。
把上述代码保存为 hello_world.py,并在命令行启动工作者:
$ faust -A hello_world worker -l info
该 Faust 工作者就会从 Kafka 中实时读取数据并处理 。我们可以发送一些数据来观察效果:
$ faust -A hello_world send @greet "Hello Faust"
上述命令发送了一条消息,执行后,我们就能在工作者的命令行中看到这条消息 。Faust 还充分利用了 Python 的类型提示,能够方便地定义数据模型:
import faustclass Greeting(faust.Record):from_name: strto_name: strapp = faust.App('hello-app', broker='kafka://localhost')topic = app.topic('hello-topic', value_type=Greeting)@app.agent(topic)async def hello(greetings):async for greeting in greetings:print(f'Hello from {greeting.from_name} to {greeting.to_name}')@app.timer(interval=1.0)async def example_sender(app):await hello.send(value=https://www.isolves.com/it/cxkf/yy/Python/2021-03-16/Greeting(from_name='Faust', to_name='you'),)if __name__ == '__main__':app.main()
![Faust - 简洁高效的 Python 流处理库](http://img.jiangsulong.com/220425/0Z04215E-3.jpg)
文章插图
Faust
总结Faust 把 Kafka Streams 带到了 Python 中,实现了简洁高效的数据流处理 。其使用简单的装饰器和基于类型提示机的据模型,就能定义实现数据的处理逻辑;充分利用了 Python 的 async 异步机制,和其他高性能的异步库,实现了高效性能;其使用 Python 实现,使用开发者可以无缝对接其他数据处理和大数据相关功能 。
【Faust - 简洁高效的 Python 流处理库】
推荐阅读
- 五步掌握用VSCode进行高效Python开发
- 溧阳市埭头建设,裕安区三个确保高效推进六安茶谷建设
- 配置一个简洁高效的 Zsh
- 彻底搞懂epoll高效运行的原理
- 如何高效利用Java UI组件库,开发现代化图形用户界面
- 私域流量如何高效运营:中老年买单决策30%是情感原因
- 运用sed命令高效地删除文件的特定行
- linux c编程之高效线程池如何实现无琐化
- 知乎口碑最好的 5 个职场高效插件,安装包都给你准备好了
- 多进程编程 - 共享内存