var (MaxWorker = os.Getenv("MAX_WORKERS")MaxQueue= os.Getenv("MAX_QUEUE"))// Job represents the job to be runtype Job struct {Payload Payload}// A buffered channel that we can send work requests on.var JobQueue chan Job// Worker represents the worker that executes the jobtype Worker struct {WorkerPoolchan chan JobJobChannelchan Jobquitchan bool}func NewWorker(workerPool chan chan Job) Worker {return Worker{WorkerPool: workerPool,JobChannel: make(chan Job),quit:make(chan bool)}}// Start method starts the run loop for the worker, listening for a quit channel in// case we need to stop itfunc (w Worker) Start() {go func() {for {// register the current worker into the worker queue.w.WorkerPool <- w.JobChannelselect {case job := <-w.JobChannel:// we have received a work request.if err := job.Payload.UploadToS3(); err != nil {log.Errorf("Error uploading to S3: %s", err.Error())}case <-w.quit:// we have received a signal to stopreturn}}}()}// Stop signals the worker to stop listening for work requests.func (w Worker) Stop() {go func() {w.quit <- true}()}func payloadHandler(w http.ResponseWriter, r *http.Request) {if r.Method != "POST" {w.WriteHeader(http.StatusMethodNotAllowed)return}// Read the body into a string for json decodingvar content = &PayloadCollection{}err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)if err != nil {w.Header().Set("Content-Type", "application/json; charset=UTF-8")w.WriteHeader(http.StatusBadRequest)return}// Go through each payload and queue items individually to be posted to S3for _, payload := range content.Payloads {// let's create a job with the payloadwork := Job{Payload: payload}// Push the work onto the queue.JobQueue <- work}w.WriteHeader(http.StatusOK)}
请求任务都放到JobQueue里面了,如何监听队列,并触发请求呢 。这个地方又出现了Dispatcher,我们在另一篇文章中有详细探讨(基于dispatcher模式的事件与数据分发处理器的Go语言实现:
https://www.toutiao.com/article/7186518439215841827/) 。在系统启动的时候,我们会通过NewDispatcher生成Dispatcher,并调用它的Run方法 。
【使用Golang构建一万+每秒处理请求的高性能系统】type Dispatcher struct {// A pool of workers channels that are registered with the dispatcherWorkerPool chan chan Job}func NewDispatcher(maxWorkers int) *Dispatcher {pool := make(chan chan Job, maxWorkers)return &Dispatcher{WorkerPool: pool}}func (d *Dispatcher) Run() {// starting n number of workersfor i := 0; i < d.maxWorkers; i++ {worker := NewWorker(d.pool)worker.Start()}go d.dispatch()}func (d *Dispatcher) dispatch() {for {select {case job := <-JobQueue:// a job request has been receivedgo func(job Job) {// try to obtain a worker job channel that is available.// this will block until a worker is idlejobChannel := <-d.WorkerPool// dispatch the job to the worker job channeljobChannel <- job}(job)}}}
Dispatcher与Worker的关系如下图所示:
文章插图
第三方案整体流程
1.客户请求到Handler 。
2.Handler把请求作业写入JobQueue 。
3.Dispatcher的dispatcher方法,从全局JobQueue中读取Job 。
4.Dispatcher的dispatcher方法同时也从WorkerPool中读取JobChannel(属于某一个Worker,即每一个Worker都有一个JobChannel) 。
5.Dispatcher把获得的Job写入JobChannel,即分配某个Worker 。
6.Worker从自己的JobChannel中获取作业并执行 。执行完成后,空闲后,把自己的JobChannel再次写入WorkerPool等待分配 。
这样实现后,效果明显,同时需要的机器数量大幅降低了,从100台降低到20台 。
文章插图
第三方案效果
文章插图
部署机器变化
这里的两层,一层是全局JobQueue,缓存任务 。第二个是每个Worker都有自己的执行队列,一台机器可以创建多个Worker 。这样就提升了处理能力 。
方案对比
实现难度
方案问题
GO协程原生方法
简单
无法应对大规模请求,无法控制协程数量
GO 单层Channel
简单
当处理能力达不到请求速率后,队列满,系统崩溃
GO两层Channel
复杂
参考资料:
http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ 。
https://github.com/ReGYChang/zero/blob/main/pkg/utils/worker_pool.go 。
推荐阅读
- 一文彻底弄明白Golang获取各种路径问题
- 如何向服务器添加 ssh 公钥
- 佳能闪光灯580使用方法 佳能闪光灯
- 世界上人口使用最多的语言是什么语言 世界上使用人口最多的语言是
- 鲤鱼|鲫鱼和鲤鱼开始上浅滩了,小短竿准备好,使用短竿钓鱼的技巧
- 试管的用途和使用方法 试管的用途
- 咖啡滤纸怎么使用 咖啡滤纸
- 花呗账户冻结怎么使用才可以解冻没有逾期 花呗账户冻结怎么使用才可以解冻
- zbrush—使用雕刻机都需要什么软件
- 封箱胶带切割器使用 胶带切割机