Go语言异步高并发编程的秘密:无锁,无条件变量,无回调


Go语言异步高并发编程的秘密:无锁,无条件变量,无回调

文章插图
背景在并发处理中,资源争用是一个常见的问题 。为了避免资源争用,需要进行优化 。以下是一些可以优化并发处理中的资源争用问题的建议:
  1. 避免锁竞争:锁竞争是一种常见的资源争用问题 。可以通过减少锁的使用,使用更细粒度的锁,以及避免不必要的锁竞争来减少锁竞争 。
  2. 使用缓存:在一些情况下,可以使用缓存来减少资源争用 。例如,在处理一些计算密集型的任务时,可以使用缓存来避免重复计算 。
  3. 使用原子操作:原子操作可以在不使用锁的情况下实现资源的同步访问 。Go 语言中提供了一些原子操作,例如 atomic.AddInt32 和 atomic.LoadInt32 等,可以用于实现线程安全的资源访问 。
  4. 使用互斥锁:互斥锁是一种用于避免并发资源争用的机制 。在需要对资源进行访问的时候,可以使用互斥锁来保证资源的独占访问 。
  5. 使用读写锁:读写锁是一种特殊的互斥锁,可以允许多个读操作同时进行,但是只允许一个写操作进行 。在读操作频繁的场景下,可以使用读写锁来提高并发性能 。
  6. 使用条件变量:条件变量是一种用于在不同线程之间进行协调的机制 。可以使用条件变量来避免不必要的资源争用 。例如,在一个生产者-消费者模式的程序中,可以使用条件变量来协调生产者和消费者之间的交互,从而避免资源争用
但是如果让你不用锁,条件变量,回调的话,还怎么写并发程序啊,谷歌大佬Sameer给了大家一个思路 。"Advanced Go Concurrency Patterns" by Sameer Ajmani: 这篇博客深入研究了 Golang 中的并发模式,并讨论了如何使用它们来构建高性能系统 。它仅仅使用了Go语言的goroutine和channel便实现高效异步并发编程,没有用到诸如await,context等包括锁,条件变量,和回调函数,文章包括一些示例和实践建议,帮助读者更好地理解和实践这些概念 。下面我们针对他给出的case做一些说明与总结,同时对go语言并发编程的语言特性与技巧进行总结,换句话就是说想提炼出面向场景的go语言高并发编程的八股模式
select-loop的编程关键要素1.如何处理事件
2.如何处理元素
3.如何关闭退出
代码示例
【Go语言异步高并发编程的秘密:无锁,无条件变量,无回调】 
Go语言异步高并发编程的秘密:无锁,无条件变量,无回调

文章插图
核心结构与接口
下面代码给出了核心结构sub,以及它实现了接口subscription的关键代码 。
  1. updates属性是一个通道,用于用户对元素进行处理 。
  2. fetcher是用于获取元素的客户端,它可以是从数据库读取,也可以是从消息队列读取 。
  3. closing用于关闭退出select-loop主体
// sub implements the Subscription interface.type sub struct { fetcher Fetcher// fetches items updates chan Item// sends items to the user closing chan chan error // for Close}func (s *sub) Updates() <-chan Item { return s.updates}func (s *sub) Close() error { errc := make(chan error) s.closing <- errc // 向closing通道中同步写入errc return <-errc// 等待主loop返回}// Subscribe returns a new Subscription that uses fetcher to fetch Items.func Subscribe(fetcher Fetcher) Subscription { s := &sub{fetcher: fetcher,updates: make(chan Item),// for Updatesclosing: make(chan chan error), // for Close } go s.loop() return s}sub的核心处理逻辑// loop periodically fecthes Items, sends them on s.updates, and exits// when Close is called.It extends dedupeLoop with logic to run// Fetch asynchronously.func (s *sub) loop() { const maxPending = 10 type fetchResult struct {fetched []Itemnexttime.Timeerrerror } var fetchDone chan fetchResult // if non-nil, Fetch is running var pending []Item var next time.Time var err error var seen = make(map[string]bool) for {var fetchDelay time.Durationif now := time.Now(); next.After(now) {fetchDelay = next.Sub(now)}var startFetch <-chan time.Timeif fetchDone == nil && len(pending) < maxPending {//等待队列长度未超过最大设置且fetchDone是空,即元素已经都入队列了// 设置fetchDelay时间后,startFetch通道有值startFetch = time.After(fetchDelay)}var first Itemvar updates chan Itemif len(pending) > 0 {first = pending[0]updates = s.updates // updates通道是为了用户进一步消费的}select {case <-startFetch:fetchDone = make(chan fetchResult, 1)go func() {fetched, next, err := s.fetcher.Fetch()fetchDone <- fetchResult{fetched, next, err}}()case result := <-fetchDone:fetchDone = nil// Use result.fetched, result.next, result.errfetched := result.fetchednext, err = result.next, result.errif err != nil {next = time.Now().Add(10 * time.Second)break}for _, item := range fetched {if id := item.GUID; !seen[id] {pending = Append(pending, item)seen[id] = true}}case errc := <-s.closing:errc <- errclose(s.updates)returncase updates <- first:pending = pending[1:]} }}


推荐阅读