Golang之流式编程( 二 )

FilterFilter函数提供过滤item的功能 , FilterFunc定义过滤逻辑true保留item , false则不保留:
// 例子 保留偶数s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}fx.From(func(source chan<- interface{}) {for _, v := range s {source <- v}}).Filter(func(item interface{}) bool {if item.(int)%2 == 0 {return true}return false})// 源码func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream { return p.Walk(func(item interface{}, pipe chan<- interface{}) {// 执行过滤函数true保留 , false丢弃if fn(item) {pipe <- item} }, opts...)}GroupGroup对流数据进行分组 , 需定义分组的key , 数据分组后以slice存入channel:
// 例子 按照首字符"g"或者"p"分组 , 没有则分到另一组ss := []string{"golang", "google", "php", "python", "java", "c++"}fx.From(func(source chan<- interface{}) {for _, s := range ss {source <- s} }).Group(func(item interface{}) interface{} {if strings.HasPrefix(item.(string), "g") {return "g"} else if strings.HasPrefix(item.(string), "p") {return "p"}return "" }).ForEach(func(item interface{}) {fmt.Println(item) })}// 源码func (p Stream) Group(fn KeyFunc) Stream {// 定义分组存储map groups := make(map[interface{}][]interface{}) for item := range p.source {// 用户自定义分组keykey := fn(item)// key相同分到一组groups[key] = append(groups[key], item) } source := make(chan interface{}) go func() {for _, group := range groups {// 相同key的一组数据写入到channelsource <- group}close(source) }() return Range(source)}Reversereverse可以对流中元素进行反转处理:
Golang之流式编程文章插图
// 例子fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {fmt.Println(item)})// 源码func (p Stream) Reverse() Stream { var items []interface{}// 获取流中数据 for item := range p.source {items = append(items, item) } // 反转算法 for i := len(items)/2 - 1; i >= 0; i-- {opp := len(items) - 1 - iitems[i], items[opp] = items[opp], items[i] }// 写入流 return Just(items...)}Distinctdistinct对流中元素进行去重 , 去重在业务开发中比较常用 , 经常需要对用户id等做去重操作:
// 例子fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {return item}).ForEach(func(item interface{}) {fmt.Println(item)})// 结果为 1 , 2 , 3 , 4 , 5 , 6// 源码func (p Stream) Distinct(fn KeyFunc) Stream { source := make(chan interface{}) threading.GoSafe(func() {defer close(source)// 通过key进行去重 , 相同key只保留一个keys := make(map[interface{}]lang.PlaceholderType)for item := range p.source {key := fn(item)// key存在则不保留if _, ok := keys[key]; !ok {source <- itemkeys[key] = lang.Placeholder}} }) return Range(source)}WalkWalk函数并发的作用在流中每一个item上 , 可以通过WithWorkers设置并发数 , 默认并发数为16 , 最小并发数为1 , 如设置unlimitedWorkers为true则并发数无限制 , 但并发写入流中的数据由defaultWorkers限制 , WalkFunc中用户可以自定义后续写入流中的元素 , 可以不写入也可以写入多个元素:
// 例子fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {newItem := strings.ToUpper(item.(string))pipe


推荐阅读