[埃尔法哥哥]解决 Spark 数据倾斜的 8 大实用方法( 二 )


2、通过 key 统计
也可以通过抽样统计 key 的出现次数验证 。
由于数据量巨大 , 可以采用抽样的方式 , 对数据进行抽样 , 统计出现的次数 , 根据出现次数大小排序取出前几个:
df.select("key").sample(false, 0.1) // 数据采样 .(k => (k, 1)).reduceBykey(_ + _) // 统计 key 出现的次数 .map(k => (k._2, k._1)).sortByKey(false) // 根据 key 出现次数进行排序 .take(10) // 取前 10 个 。
如果发现多数数据分布都较为平均 , 而个别数据比其他数据大上若干个数量级 , 则说明发生了数据倾斜 。
如何缓解数据倾斜?
基本思路:
业务逻辑:我们从业务逻辑的层面上来优化数据倾斜 , 比如要统计不同城市的订单情况 , 那么我们单独对这一线城市来做 count , 最后和其它城市做整合 。
程序实现:比如说在 Hive 中 , 经常遇到 count(distinct)操作 , 这样会导致最终只有一个 reduce , 我们可以先 group 再在外面包一层 count , 就可以了;在 Spark 中使用 reduceByKey 替代 groupByKey 等 。
参数调优:Hadoop 和 Spark 都自带了很多的参数和机制来调节数据倾斜 , 合理利用它们就能解决大部分问题 。
思路1. 过滤异常数据
如果导致数据倾斜的 key 是异常数据 , 那么简单的过滤掉就可以了 。
首先要对 key 进行分析 , 判断是哪些 key 造成数据倾斜 。 具体方法上面已经介绍过了 , 这里不赘述 。
然后对这些 key 对应的记录进行分析:
空值或者异常值之类的 , 大多是这个原因引起
无效数据 , 大量重复的测试数据或是对结果影响不大的有效数据
有效数据 , 业务导致的正常数据分布
解决方案:
对于第 1 , 2 种情况 , 直接对数据进行过滤即可 。
第3种情况则需要特殊的处理 , 具体我们下面详细介绍 。
思路2. 提高 shuffle 并行度
Spark 在做 Shuffle 时 , 默认使用 HashPartitioner(非 Hash Shuffle)对数据进行分区 。 如果并行度设置的不合适 , 可能造成大量不相同的 Key 对应的数据被分配到了同一个 Task 上 , 造成该 Task 所处理的数据远大于其它 Task , 从而造成数据倾斜 。
如果调整 Shuffle 时的并行度 , 使得原本被分配到同一 Task 的不同 Key 发配到不同 Task 上处理 , 则可降低原 Task 所需处理的数据量 , 从而缓解数据倾斜问题造成的短板效应 。
(1)操作流程
RDD 操作 可在需要 Shuffle 的操作算子上直接设置并行度或者使用 spark.default.parallelism 设置 。 如果是 Spark SQL , 还可通过 SET spark.sql.shuffle.partitions=[num_tasks] 设置并行度 。 默认参数由不同的 Cluster Manager 控制 。
dataFrame 和 sparkSql 可以设置 spark.sql.shuffle.partitions=[num_tasks] 参数控制 shuffle 的并发度 , 默认为200 。
(2)适用场景
大量不同的 Key 被分配到了相同的 Task 造成该 Task 数据量过大 。
(3)解决方案
调整并行度 。 一般是增大并行度 , 但有时如减小并行度也可达到效果 。
(4)优势
实现简单 , 只需要参数调优 。 可用最小的代价解决问题 。 一般如果出现数据倾斜 , 都可以通过这种方法先试验几次 , 如果问题未解决 , 再尝试其它方法 。
(5)劣势
适用场景少 , 只是让每个 task 执行更少的不同的key 。 无法解决个别key特别大的情况造成的倾斜 , 如果某些 key 的大小非常大 , 即使一个 task 单独执行它 , 也会受到数据倾斜的困扰 。 并且该方法一般只能缓解数据倾斜 , 没有彻底消除问题 。 从实践经验来看 , 其效果一般 。
TIPS:
可以把数据倾斜类比为 hash 冲突 。 提高并行度就类似于 提高 hash 表的大小 。
思路3. 自定义 Partitioner


推荐阅读