[埃尔法哥哥]解决 Spark 数据倾斜的 8 大实用方法( 五 )


def antiSkew(): RDD[(String, Int)] = { val SPLIT = "-" val prefix = new Random().nextInt(10) pairs.map(t => ( prefix + SPLIT + t._1, 1)) .reduceByKey((v1, v2) => v1 + v2) .map(t => (t._1.split(SPLIT)(1), t2._2)) .reduceByKey((v1, v2) => v1 + v2)}
不过进行两次 mapreduce , 性能稍微比一次的差些 。
Hadoop 中的数据倾斜
Hadoop 中直接贴近用户使用的是 Mapreduce 程序和 Hive 程序 , 虽说 Hive 最后也是用 MR 来执行(至少目前 Hive 内存计算并不普及) , 但是毕竟写的内容逻辑区别很大 , 一个是程序 , 一个是Sql , 因此这里稍作区分 。
Hadoop 中的数据倾斜主要表现在 ruduce 阶段卡在99.99% , 一直99.99%不能结束 。
这里如果详细的看日志或者和监控界面的话会发现:
有一个多几个 reduce 卡住
各种 container报错 OOM
读写的数据量极大 , 至少远远超过其它正常的 reduce
伴随着数据倾斜 , 会出现任务被 kill 等各种诡异的表现 。
经验:
Hive的数据倾斜 , 一般都发生在 Sql 中 Group 和 On 上 , 而且和数据逻辑绑定比较深 。
优化方法:
这里列出来一些方法和思路 , 具体的参数和用法在官网看就行了 。
map join 方式
count distinct 的操作 , 先转成 group , 再 count
参数调优
set hive.map.aggr=trueset hive.groupby.skewindata=http://news.hoteastday.com/a/true
left semi jion 的使用
设置 map 端输出、中间结果压缩 。 (不完全是解决数据倾斜的问题 , 但是减少了 IO 读写和网络传输 , 能提高很多效率)
说明:
hive.map.aggr=true: 在map中会做部分聚集操作 , 效率更高但需要更多的内存 。
hive.groupby.skewindata=http://news.hoteastday.com/a/true: 数据倾斜时负载均衡 , 当选项设定为true , 生成的查询计划会有两个MRJob 。 第一个MRJob 中 , Map的输出结果集合会随机分布到Reduce中 , 每个Reduce做部分聚合操作 , 并输出结果 , 这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中 , 从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中) , 最后完成最终的聚合操作 。


推荐阅读