Golang之流式编程( 三 )

<- newItem}).ForEach(func(item interface{}) {fmt.Println(item)})// 源码func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream { pipe := make(chan interface{}, option.workers) go func() {var wg sync.WaitGrouppool := make(chan lang.PlaceholderType, option.workers)for {// 控制并发数量pool <- lang.Placeholderitem, ok := <-p.sourceif !ok {<-poolbreak}wg.Add(1)go func() {defer func() {wg.Done()<-pool}()// 作用在每个元素上fn(item, pipe)}()}// 等待处理完成wg.Wait()close(pipe) }() return Range(pipe)}并发处理【Golang之流式编程】fx工具除了进行流数据处理以外还提供了函数并发功能 , 在微服务中实现某个功能往往需要依赖多个服务 , 并发的处理依赖可以有效的降低依赖耗时 , 提升服务的性能 。
Golang之流式编程文章插图
fx.Parallel(func() {userRPC() // 依赖1}, func() {accountRPC() // 依赖2}, func() {orderRPC() // 依赖3})注意fx.Parallel进行依赖并行处理的时候不会有error返回 , 如需有error返回或者有一个依赖报错需要立马结束依赖请求请使用MapReduce工具进行处理 。
总结本篇文章介绍了流处理的基本概念和gozero中的流处理工具fx , 在实际的生产中流处理场景应用也非常多 , 希望本篇文章能给大家带来一定的启发 , 更好的应对工作中的流处理场景 。


推荐阅读