自定义任务流
有时候我们也可以将执行的多个任务 , 划分到一个组中 。
# celery_demo/tasks/task1.py
from app import app
@app.task()
def add(x, y):
print("加法计算")
return x + y
@app.task()
def sub(x, y):
print("减法计算")
return x - y
@app.task()
def mul(x, y):
print("乘法计算")
return x * y
@app.task()
def div(x, y):
print("除法计算")
return x // y
老规矩 , 重启 worker , 因为我们修改了任务工厂 。
然后来导入它们 , 创建任务 , 并将这些任务划分到一个组中 。
>>> from tasks.task1 import add, sub, mul, div
>>> from celery import group
# 调用 signature 方法 , 得到 signature 对象
# 此时 t1.delay() 和 add.delay(2, 3) 是等价的
>>> t1 = add.signature(args=(2, 3))
>>> t2 = sub.signature(args=(2, 3))
>>> t3 = mul.signature(args=(2, 3))
>>> t4 = div.signature(args=(4, 2))
# 但是变成 signature 对象之后 ,
# 我们可以将其放到一个组里面
>>> gp = group(t1, t2, t3, t4)
# 执行组任务
# 返回 celery.result.GroupResult 对象
>>> res = gp()
# 每个组也有一个唯一 id
>>> print("组id:", res.id)
组id: 65f32cc4-b8ce-4bf8-916b-f5cc359a901a
# 调用 get 方法也会阻塞 , 知道组里面任务全部完成
>>> print("组结果:", res.get())
组结果: [5, -1, 6, 2]
>>>
可以看到整个组也是有唯一 id 的 , 另外 signature 也可以写成 subtask 或者 s , 在源码里面这几个是等价的 。
文章插图
我们观察一下 worker 的输出 , 任务是并发执行的 , 所以哪个先完成不好说 。但是调用组的 get 方法时 , 里面的返回值顺序一定和任务添加时候的顺序保持一致 。
除此之外 , celery 还支持将多个任务像链子一样串起来 , 第一个任务的输出会作为第二个任务的输入 , 传递给下一个任务的第一个参数 。
# celery_demo/tasks/task1.py
from app import app
@app.task
def task1():
l = []
return l
@app.task
# task1 的返回值会传递给这里的 task1_return
def task2(task1_return, value):
task1_return.append(value)
return task1_return
@app.task
# task2 的返回值会传递给这里的 task2_return
def task3(task2_return, num):
return [i + num for i in task2_return]
@app.task
# task3 的返回值会传递给这里的 task3_return
def task4(task3_return):
return sum(task3_return)
然后我们看怎么将这些任务像链子一样串起来 。
>>> from tasks.task1 import *
>>> from celery import chain
# 将多个 signature 对象进行与运算
# 当然内部肯定重写了 __or__ 这个魔法方法
>>> my_chain = chain(
... task1.s() | task2.s(123) | task3.s(5) | task4.s())
# 执行任务链
>>> res = my_chain()
# 获取返回值
>>> print(res.get())
128
这种链式处理的场景非常常见 , 比如 MapReduce 。
celery 实现定时任务
既然是定时任务 , 那么就意味着 worker 要后台启动 , 否则一旦远程连接断开 , 就停掉了 。因此 celery 是支持我们后台启动的 , 并且可以启动多个 。
# 启动 worker
celery multi start w1 -A app -l info
# 可以同时启动多个
celery multi start w2 w3 -A app -l info
# 以上我们就启动了 3 个 worker
# 如果想停止的话
celery multi stop w1 w2 w3 -A app -l info
但是注意 , 这种启动方式在 windows 上面不支持 , 因为 celery 会默认创建两个目录 , 分别是 /var/log/celery 和 /var/run/celery , 显然这是类 Unix 系统的目录结构 。
推荐阅读
- 龙之家族|《龙之家族》这1细节,暗示了丹妮莉丝,才是最伟大的坦格利安?
- 小米|37岁财富自由!雷军接受央视采访:我要成伟大的人 小米要影响世界、造车是被逼的
- 中国最大的岛屿是日本岛 中国最大的岛屿是什么岛-张雪峰
- 盛明兰|重温《知否》,王老太师一生最大的败笔,就是娶了偏心的王老太太
- 韩剧|《当你沉睡时》:当噩梦照进现实,救赎和信任才是最强的陪伴
- 赵鹏翔|“李易峰”事件最大的受益者是谁?
- 罚罪|《罚罪》:大王现身,是全剧最大的反转吗,网友:不是的
- |能力,让一个人迅速脱颖而出,如何发挥出你的最强技能?
- 华为|喷问界M7不好看的人品位比较Low!余承东:华为做伟大的设计
- 最大的恐龙有多大 最大的恐龙排行榜前十名