Python 最强大的任务调度框架 Celery!( 八 )


 

  •  
    serializer:指定序列化的方法;
     
  •  
    当然 app.task 还有很多不常用的参数 , 这里就不说了 , 有兴趣可以去查看官网或源码 , 我们演示一下几个常用的参数:
    # celery_demo/tasks/task1.py
    from app import app

     
    @app.task(name="你好")
    def add(x, y):
    return x + y

    @app.task(name="我不好", bind=True)
    def sub(self, x, y):
    """
    如果 bind=True , 则需要多指定一个 self
    这个 self 就是对应的任务工厂
    """
    # self.request 是一个 celery.task.Context 对象
    # 获取它的属性字典 , 即可拿到该任务的所有属性
    print(self.request.__dict__)
    return x - y

    其它代码不变 , 我们重新启动 worker:
    Python 最强大的任务调度框架 Celery!

    文章插图
    然后创建任务发送至队列 , 再由 worker 取出执行:
    >>> from tasks.task1 import add, sub
    >>> add.delay(111, 222).get()
    333
    >>> sub.delay(111, 222).get()
    -111

    执行没有问题 , 然后看看 worker 的输出:
    Python 最强大的任务调度框架 Celery!

    文章插图
    创建任务工厂时 , 如果指定了 bind=True , 那么执行任务时会将任务工厂本身作为第一个参数传过去 。任务工厂本质上就是 Task 实例对象 , 调用它的 delay 方法即可创建任务 。
    所以如果我们在 sub 内部继续调用 self.delay(11, 22) , 会有什么后果呢?没错 , worker 会进入无限递归 。因为执行任务的时候 , 在任务的内部又创建了任务 , 所以会死循环下去 。
    当然 self 还有很多其它属性和方法 , 具体有哪些可以通过 Task 这个类来查看 。这里面比较重要的是 self.request , 它包含了某个具体任务的相关信息 , 而且信息非常多 。
    Python 最强大的任务调度框架 Celery!

    文章插图
    比如当前传递的参数是什么 , 就可以通过 self.request 拿到 。当然啦 , self.request 是一个 Context 对象 , 因为不同任务获取 self.request 的结果肯定是不同的 , 但 self(任务工厂)却只有一个 , 所以要基于 Context 进行隔离 。
    我们可以通过 __dict__ 拿到 Context 对象的属性字典 , 然后再进行操作 。
    最后再来说一说 @app.task 里面的 base 参数 。
    # celery_demo/tasks/task1.py
    from celery import app
    from app import Task

     
    class MyTask(Task):
    """
    自定义一个类 , 继承自celery.Task
    exc: 失败时的错误的类型;
    task_id: 任务的id;
    args: 任务函数的位置参数;
    kwargs: 任务函数的关键字参数;
    einfo: 失败时的异常详细信息;
    retval: 任务成功执行的返回值;
    """
    def on_failure(self, exc, task_id, args, kwargs, einfo):
    """任务失败时执行"""

    def on_success(self, retval, task_id, args, kwargs):
    """任务成功时执行"""
    print("任务执行成功")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
    """任务重试时执行"""

    # 使用 @app.task 的时候 , 指定 base 即可
    # 然后任务在执行的时候 , 会触发 MyTask 里面的回调函数
    @app.task(name="地灵殿", base=MyTask)
    def add(x, y):
    print("加法计算")
    return x + y

    重新启动 worker , 然后创建任务 。
    Python 最强大的任务调度框架 Celery!

    文章插图
    指定了 base , 任务在执行的时候会根据执行状态的不同 , 触发 MyTask 里面的不同方法 。


    推荐阅读