Python 最强大的任务调度框架 Celery!( 九 )


自定义任务流
有时候我们也可以将执行的多个任务 , 划分到一个组中 。
# celery_demo/tasks/task1.py
from app import app

 
@app.task()
def add(x, y):
print("加法计算")
return x + y

@app.task()
def sub(x, y):
print("减法计算")
return x - y

@app.task()
def mul(x, y):
print("乘法计算")
return x * y

@app.task()
def div(x, y):
print("除法计算")
return x // y

老规矩 , 重启 worker , 因为我们修改了任务工厂 。
然后来导入它们 , 创建任务 , 并将这些任务划分到一个组中 。
>>> from tasks.task1 import add, sub, mul, div
>>> from celery import group
# 调用 signature 方法 , 得到 signature 对象
# 此时 t1.delay() 和 add.delay(2, 3) 是等价的
>>> t1 = add.signature(args=(2, 3))
>>> t2 = sub.signature(args=(2, 3))
>>> t3 = mul.signature(args=(2, 3))
>>> t4 = div.signature(args=(4, 2))
# 但是变成 signature 对象之后 , 
# 我们可以将其放到一个组里面
>>> gp = group(t1, t2, t3, t4)
# 执行组任务
# 返回 celery.result.GroupResult 对象
>>> res = gp()
# 每个组也有一个唯一 id
>>> print("组id:", res.id)
组id: 65f32cc4-b8ce-4bf8-916b-f5cc359a901a

 
# 调用 get 方法也会阻塞 , 知道组里面任务全部完成
>>> print("组结果:", res.get())
组结果: [5, -1, 6, 2]
>>>

可以看到整个组也是有唯一 id 的 , 另外 signature 也可以写成 subtask 或者 s , 在源码里面这几个是等价的 。

Python 最强大的任务调度框架 Celery!

文章插图
我们观察一下 worker 的输出 , 任务是并发执行的 , 所以哪个先完成不好说 。但是调用组的 get 方法时 , 里面的返回值顺序一定和任务添加时候的顺序保持一致 。
除此之外 , celery 还支持将多个任务像链子一样串起来 , 第一个任务的输出会作为第二个任务的输入 , 传递给下一个任务的第一个参数 。
# celery_demo/tasks/task1.py
from app import app

 
@app.task
def task1():
l = []
return l

@app.task
# task1 的返回值会传递给这里的 task1_return
def task2(task1_return, value):
task1_return.append(value)
return task1_return

@app.task
# task2 的返回值会传递给这里的 task2_return
def task3(task2_return, num):
return [i + num for i in task2_return]

@app.task
# task3 的返回值会传递给这里的 task3_return
def task4(task3_return):
return sum(task3_return)

然后我们看怎么将这些任务像链子一样串起来 。
>>> from tasks.task1 import *
>>> from celery import chain
# 将多个 signature 对象进行与运算
# 当然内部肯定重写了 __or__ 这个魔法方法
>>> my_chain = chain(
... task1.s() | task2.s(123) | task3.s(5) | task4.s())
# 执行任务链
>>> res = my_chain()
# 获取返回值
>>> print(res.get())
128

这种链式处理的场景非常常见 , 比如 MapReduce 。
celery 实现定时任务
既然是定时任务 , 那么就意味着 worker 要后台启动 , 否则一旦远程连接断开 , 就停掉了 。因此 celery 是支持我们后台启动的 , 并且可以启动多个 。
# 启动 worker
celery multi start w1 -A app -l info
# 可以同时启动多个
celery multi start w2 w3 -A app -l info

 
# 以上我们就启动了 3 个 worker
# 如果想停止的话
celery multi stop w1 w2 w3 -A app -l info

但是注意 , 这种启动方式在 windows 上面不支持 , 因为 celery 会默认创建两个目录 , 分别是 /var/log/celery 和 /var/run/celery , 显然这是类 Unix 系统的目录结构 。
Python 最强大的任务调度框架 Celery!


推荐阅读