gocraft/work工作队列源码简介( 二 )

 
workpool注册任务ConsumeJob后,该任务ConsumeJob会被赋值给worker.jobTypes[job.Name].GenericHandler, 他的反射类型被赋值给了jobType.DynamicHandler 。如果该消费任务使用了上下文参数 。
 
创建消费任务的2种方法
 
• If you don't need context:
 
【gocraft/work工作队列源码简介】func YourFunctionName(job *work.Job) error.
 
• If you want your handler to accept a context:
 
func (c *Context) YourFunctionName(job *work.Job) error // or,
 
func YourFunctionName(c *Context, job *work.Job) error.
 
func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool {jobOpts = applyDefaultsAndValidate(jobOpts)vfn := reflect.ValueOf(fn)validateHandlerType(wp.contextType, vfn)jt := &jobType{Name:name,//vfn 任务消费方法的反射类型,如果消费方法中有ctx 参数,那么会调用反射执行DynamicHandler: vfn,JobOptions:jobOpts,}if gh, ok := fn.(func(*Job) error); ok {// 用户的任务消费函数,被赋值给了jobType的GenericHandler,如果消费方法只有一个job参数,则执行GenericHandlerjt.IsGeneric = truejt.GenericHandler = gh}wp.jobTypes[name] = jtfor _, w := range wp.workers {w.updateMiddlewareAndJobTypes(wp.middleware, wp.jobTypes)}return wp} 
执行消费任务的真正代码
 
worker对象的processJob(job * Job)方法 调用了runJob方法执行GenericHandler or DynamicHandler.Call 。
 
func runJob(job *Job, ctxType reflect.Type, middleware []*middlewareHandler, jt *jobType) (returnCtx reflect.Value, returnError error) {。。。。next = func() error {。。。。if jt.IsGeneric {// 任务消费方法没有ctx时候执行return jt.GenericHandler(job)}// 任务消费方法有ctx时执行res := jt.DynamicHandler.Call([]reflect.Value{returnCtx, reflect.ValueOf(job)})x := res[0].Interface()if x == nil {return nil}return x.(error)}...returnError = next()return} 
LStack产品简介
 
面向行业应用开发商(ISV/SI)提供混合云/边缘云场景下云原生应用开发测试、交付、运维一站式服务,帮助企业采用云原生敏捷开发交付方法论,从而提高软件开发人员效率、减少运维成本,加快数字化转型,并最终实现业务创新 。




推荐阅读