文章插图
作者/蔡锡生
简介
gocraft/work是一款使用go开发的任务处理软件,通过redis存储任务队列,可以使用工作池同时处理多个任务 。本文主要介绍任务注册和任务消费的源代码 。
功能特性
• Fast and efficient. Faster than this, this, and this. See below for benchmarks.
• Reliable - don't lose jobs even if your process crashes.
• Middleware on jobs -- good for metrics instrumentation, logging, etc.
• If a job fails, it will be retried a specified number of times.
• Schedule jobs to hAppen in the future.
• Enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once.
• Web UI to manage failed jobs and observe the system.
• Periodically enqueue jobs on a cron-like schedule.
• Pause / unpause jobs and control concurrency within and across processes.
注册Job
注册Job流程
1. 创建redis client pool 。
2. 创建对象,定义任务处理函数 。
3. 创建任务工作池,需要传入被处理对象结构体,最大并发数,命名空间,redis client pool 。
4. 创建Job,需要传入job名称和job处理函数,job在redis中使用列表存储,key的组成:nameSapce:job:jobName,同一namespace支持多种类型任务处理 。
这里使用任务名称作为key存入redis,任务处理参数存放到列表中
func main() {// Make a new pool. Arguments:// Context{} is a struct that will be the context for the request.// 10 is the max concurrency// "my_app_namespace" is the Redis namespace// redisPool is a Redis poolpool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)// Add middleware that will be executed for each jobpool.Middleware((*Context).Log)// Map the name of jobs to handler functions// pool 中的 jobTypes是一个字典,key 是任务名称,value 是 任务处理函数// 当有任务的时候,会将任务需要的参数 放入到redis key 为jobName的列表中// 第二个参数必须是 工作池对象的方法pool.Job("send_email", (*Context).SendEmail)// Customize options:pool.JobWithOptions("export", work.JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export)// Start processing jobspool.Start()...}
发送Job
发送job其实调用NewEnqueuer方法向redis的列表中压入元素(具体的内容是任务参数) 。
package mainimport ("github.com/gomodule/redigo/redis""github.com/gocraft/work")// Make a redis poolvar redisPool = &redis.Pool{MaxActive: 5,MaxIdle: 5,Wait: true,Dial: func() (redis.Conn, error) {return redis.Dial("tcp", ":6379")},}// Make an enqueuer with a particular namespacevar enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)func main() {// Enqueue a job named "send_email" with the specified parameters._, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com", "subject": "hello world", "customer_id": 4})if err != nil {log.Fatal(err)}}
Woker Fetch Job
在New WrokPool的时候会根据并法参数concurrency,创建同等个数的woker 。
Worker是一个job处理者,通过永久for循环,不间断的从redis的任务队列中获取任务,在处理任务的时候,协程阻塞,等待一个任务处理完,再继续下一个 。
下面的代码是worker在for循环中的重要操作(1) fetch job (2) process job
func (w *worker) loop() {for {select {。。。case <-timer.C:job, err := w.fetchJob()w.process(job)}}}
fetchJob本质是redis的pop,push操作 。首先将redis列表中的任务移除,然后再放入到处理队列中,这个操作必须是原子操作(原子性是指事务是一个不可再分割的工作单元,事务中的操作要么都发生,要么都不发生),作者使用了lua脚本完成 。最后返回一个job对象,里面有后面任务处理函数需要的args,即这里的rawJson 。
func (w *worker) fetchJob() (*Job, error) {scriptArgs = append(scriptArgs, w.poolID) // ARGV[1]...values, err := redis.Values(w.redisFetchScript.Do(conn, scriptArgs...))...job, err := newJob(rawJSON, dequeuedFrom, inProgQueue)..return job, nil}
Worker handle Job.
Pool.JobWithOptions(InstallMasterJob, work.JobOptions{Priority: 1, MaxFails: 1}, ConsumeJob)
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 阿里巴巴国际站搜索引擎工作原理解析
- 如何去掉电脑桌面的小黄锁?
- 社保|这四类工作,尽量别让子女“碰”?不少年轻人都因此被“拖累”
- 女孩|“中年危机”过后,不止送外卖和开快车,还有一些工作可以尝试
- 大佬Google工作10年,关于技术、管理和职场生涯的一些感悟
- |四人五一钓鱼被抓,工作估计黄了,这种钓鱼行为可判刑
- 变频电源的工作原理及应用介绍
- 体检|长期伏案工作引发的腰背脖子痛,算职业病吗?
- |我,工作10年的海员,33岁,透露收入,揭秘想辞职的原因
- 高铁|直击郑州市民超市囤货:郑州高铁站多名工作人员感染 主城区实行足不出区