产业气象站|并行流ParallelStream中隐藏的陷阱
文章图片
前提
这篇文章介绍一下日常开发中并行流ParallelStream中隐藏的陷阱 , 这个问题其实离我们很近 , 特别是喜欢使用JDK1.8+的流式编程的伙伴 , 应该会深有感触 。 标题中所谓的"陷阱" , 其实并不是ParallelStream自身的陷阱 , 而一般是开发者错误使用ParallelStream给自己埋下的陷阱 。
一个故意而为的例子下面举一个故意而为的例子 , 实际上应该不会有类似的业务代码:
publicclassParallelStreamMain{publicstaticvoidmain(String[]args)throwsException{List<List>array=newArrayList<>()Listitem1=newArrayList<>()Listitem2=newArrayList<>()Listtarget=newArrayList<>(100)array.add(item1)array.add(item2)array.parallelStream().forEach(x->{for(inti=0i<100000i++){target.add(i)}})System.out.println(target.size())}}
某一次执行结果为:163913 。 如果不停地执行这个main方法 , 最终都会得到一个非200000的结果 , 这里的问题就在于使用了并行流parallelStream()方法 。 ParallelStream底层使用了Fork/Join框架实现 , 也就是应用了线程池ForkJoinPool把并行流中的节点抽象为ForkJoinTask进行计算 , 背后用到的"任务窃取"等原理这里就不进行展开 , 只需要明确:
ForkJoinPool一般使用Runtime.getRuntime().availableProcessors()(此值一般认为是物理机器的逻辑核心数量)作为并行度(parallelism) , 简单认为是可并发执行的任务数 , 并不是工作线程数 。 多核机器中 , 使用ParallelStream在流的节点中的所有操作都相当于在「一个多线程环境中」进行操作 , 里面的所有操作都会产生不可预期的结果 , 例如可能会数组越界、添加元素丢失、部分下标index的引用为NULL等等 。 一个仿真例子写这篇文章不是有意为之 , 其实很早之前笔者曾经遇到一个比较隐蔽的生产故障 , 其中有一段访问量比较低的代码大致如下:
@DataprivatestaticclassOrderDTO{privateStringorderIdprivateOrderStatusorderStatusprivateBigDecimalamountprivateLongcustomerId}@DataprivatestaticclassOrder{privateLongidprivateStringorderIdprivateIntegerorderStatusprivateBigDecimalamountprivateLongcustomerIdprivateOffsetDateTimecreateTimeprivateOffsetDateTimeeditTime}publicvoidgroupByOrderStatus(LongcustomerId){Listorders=orderDao.selectByCustomerId(customerId)ListorderDTOList=newArrayList<>()orders.parallelStream().forEach(order->{OrderDTOdto=newOrderDTO()......orderDTOList.add(dto)})Map<String,List>collect=orderDTOList.stream().collect(Collectors.groupingBy(item->item.getOrderStatus().getCode()))......}
该方法的功能是通过客户ID查询订单列表 , 然后把订单列表转化为OrderDTO列表 , 然后再按照订单状态字段进行分组 。 通过生产日志和测试回归发现 , 上面的代码段中groupByOrderStatus()方法会偶发空指针异常 。
【产业气象站|并行流ParallelStream中隐藏的陷阱】初次出现问题的时候 , 由于开发者通过Lambda表达式把多处代码压缩为1行 , 所以从异常栈比较难排查具体发生问题的代码 , 后面把Lambda表达式以句点起点拆分为多行上线后观察一段时间 , 最终定位到发生空指针异常的代码段为Collectors.groupingBy(item->item.getOrderStatus().getCode()) , 也就是OrderDTO实例中的orderStatus为空对象 。 这里显然 , groupByOrderStatus()方法其实是被封闭在线程栈中调用 , 本不应该有多个线程去并发修改其中的内容 , 这里只剩下一个疑点:使用了parallelStream() 。 后来直接把parallelStream()修改为stream()重新上线 , 该空指针问题不再复现 。
推荐阅读
- 产业气象站|5G基站太耗电!三大运营商正式官宣:将智能化关闭5G基站节约电费
- 产业气象站|他从不打无准备之仗,华为联手哈工大究竟想干啥?依任总性格
- 产业气象站|G是否影响健康?,张朝阳用手机保持30厘米
- 爱集微APP|“芯”势力助推游戏产业发展,芯片成为ChinaJoy的关键词之一
- 产业气象站|电力机器人“小白”上岗巡检
- 产业气象站|苏宁智能宣布五项Biu+共享政策,从生态赋能到生态共享
- 产业气象站|点赞“中国芯里的南大智慧”!华为公司CEO任正非一行访问南京大学
- 产业气象站|花多少钱收购,微软正在谈判收购TikTok美国业务
- 产业气象站|包括王兴,马云创办支付宝的本质不是为了支付,很多人没理解
- 上观新闻|半导体产业如何发展?嘉定举办的这个论坛指明了方向