Python 最强大的任务调度框架 Celery!( 三 )


文章插图
以上我们就启动了一个 worker 并成功消费了队列中的任务 , 并且还从 Redis 里面拿到了执行信息 。当然啦 , 如果只能通过查询 backend 才能拿到信息的话 , 那 celery 就太不智能了 。我们也可以直接从程序中获取 。
直接查询任务执行信息
Redis(backend)里面存储了很多关于任务的信息 , 这些信息我们可以直接在程序中获取 。
from app import task
 
res = task.delay("古明地觉", 17)
print(type(res))
"""

"""
# 直接打印 , 显示任务的 id
print(res)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 获取状态, 显然此刻没有执行完
# 因此结果是PENDING, 表示等待状态
print(res.status)
"""
PENDING
"""
# 获取 id , 两种方式均可
print(res.task_id)
print(res.id)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 获取任务执行结束时的时间
# 任务还没有结束, 所以返回None
print(res.date_done)
"""
None
"""
# 获取任务的返回值, 可以通过 result 或者 get()
# 注意: 如果是 result, 那么任务还没有执行完的话会直接返回 None
# 如果是 get(), 那么会阻塞直到任务完成
print(res.result)
print(res.get())
"""
None
name is 古明地觉, age is 17
"""
# 再次查看状态和执行结束时的时间
# 发现 status 变成SUCCESS
# date_done 变成了执行结束时的时间
print(res.status)
# 但显示的是 UTC 时间
print(res.date_done)
"""
SUCCESS
2022-09-08 06:40:34.525492
"""

另外我们说结果需要存储在 backend 中 , 如果没有配置 backend , 那么获取结果的时候会报错 。至于 backend , 因为它是存储结果的 , 所以一般会保存在数据库中 , 因为要持久化 。我这里为了方便 , 就还是保存在 Redis 中 。

Python 最强大的任务调度框架 Celery!

文章插图
celery.result.AsyncResult 对象
调用完任务工厂的 delay 方法之后 , 会创建一个任务并发送至队列 , 同时返回一个 AsyncResult 对象 , 基于此对象我们可以拿到任务执行时的所有信息 。但是 AsyncResult 对象我们也可以手动构造 , 举个例子:
import time
# 我们不光要导入 task, 还要导入里面的 app
from app import app, task
# 导入 AsyncResult 这个类
from celery.result import AsyncResult

 
# 发送任务到队列当中
res = task.delay("古明地觉", 17)

# 传入任务的 id 和 app, 创建 AsyncResult 对象
async_result = AsyncResult(res.id, app=app)

# 此时的这个 res 和 async_result 之间是等价的
# 两者都是 AsyncResult 对象, 它们所拥有的方法也是一样的
# 下面用谁都可以
while True:
# 等价于async_result.state == "SUCCESS"
if async_result.successful():
print(async_result.get())
break
# 等价于async_result.state == "FAILURE"
elif async_result.failed():
print("任务执行失败")
elif async_result.status == "PENDING":
print("任务正在被执行")
elif async_result.status == "RETRY":
print("任务执行异常正在重试")
elif async_result.status == "REJECTED":
print("任务被拒绝接收")
elif async_result.status == "REVOKED":
print("任务被取消")
else:
print("其它的一些状态")
time.sleep(0.8)

"""
任务正在被执行
任务正在被执行
任务正在被执行
任务正在被执行
name is 古明地觉, age is 17
"""

以上就是任务可能出现的一些状态 , 通过轮询的方式 , 我们也可以查看任务是否已经执行完毕 。当然 AsyncResult 还有一些别的方法 , 我们来看一下:
from app import task
 
res = task.delay("古明地觉", 17)
# 1. ready():查看任务状态 , 返回布尔值 。


推荐阅读