[埃尔法哥哥]解决 Spark 数据倾斜的 8 大实用方法( 三 )


思路3. 自定义 Partitioner
(1)原理
使用自定义的 Partitioner(默认为 HashPartitioner) , 将原本被分配到同一个 Task 的不同 Key 分配到不同 Task 。
例如 , 我们在 groupByKey 算子上 , 使用自定义的 Partitioner:
.groupByKey(new Partitioner() { @Override public int numPartitions() { return 12; } @Override public int getPartition(Object key) { int id = Integer.parseInt(key.toString()); if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) { return (id - 9500000) / 12; } else { return id % 12; } }})
TIPS:
这个做法相当于自定义 hash 表的 哈希函数 。
(2)适用场景
大量不同的 Key 被分配到了相同的 Task 造成该 Task 数据量过大 。
(3)解决方案
使用自定义的 Partitioner 实现类代替默认的 HashPartitioner , 尽量将所有不同的 Key 均匀分配到不同的 Task 中 。
(4)优势
不影响原有的并行度设计 。 如果改变并行度 , 后续 Stage 的并行度也会默认改变 , 可能会影响后续 Stage 。
(5)劣势
适用场景有限 , 只能将不同 Key 分散开 , 对于同一 Key 对应数据集非常大的场景不适用 。 效果与调整并行度类似 , 只能缓解数据倾斜而不能完全消除数据倾斜 。 而且需要根据数据特点自定义专用的 Partitioner , 不够灵活 。
思路4. Reduce 端 Join 转化为 Map 端 Join
通过 Spark 的 Broadcast 机制 , 将 Reduce 端 Join 转化为 Map 端 Join , 这意味着 Spark 现在不需要跨节点做 shuffle 而是直接通过本地文件进行 join , 从而完全消除 Shuffle 带来的数据倾斜 。
[埃尔法哥哥]解决 Spark 数据倾斜的 8 大实用方法
本文插图
from pyspark.sql.functions import broadcastresult = broadcast(A).join(B, ["join_col"], "left")
其中 A 是比较小的 dataframe 并且能够整个存放在 executor 内存中 。
(1)适用场景
参与Join的一边数据集足够小 , 可被加载进 Driver 并通过 Broadcast 方法广播到各个 Executor 中 。
(2)解决方案
在 Java/Scala 代码中将小数据集数据拉取到 Driver , 然后通过 Broadcast 方案将小数据集的数据广播到各 Executor 。 或者在使用 SQL 前 , 将 Broadcast 的阈值调整得足够大 , 从而使 Broadcast 生效 。 进而将 Reduce Join 替换为 Map Join 。
(3)优势
避免了 Shuffle , 彻底消除了数据倾斜产生的条件 , 可极大提升性能 。
(4)劣势
因为是先将小数据通过 Broadcase 发送到每个 executor 上 , 所以需要参与 Join 的一方数据集足够小 , 并且主要适用于 Join 的场景 , 不适合聚合的场景 , 适用条件有限 。
NOTES:
使用Spark SQL时需要通过 SET spark.sql.autoBroadcastJoinThreshold=104857600 将 Broadcast 的阈值设置得足够大 , 才会生效 。
思路5. 拆分 join 再 union
思路很简单 , 就是将一个 join 拆分成 倾斜数据集 Join 和 非倾斜数据集 Join , 最后进行 union:
对包含少数几个数据量过大的 key 的那个 RDD (假设是 leftRDD) , 通过 sample 算子采样出一份样本来 , 然后统计一下每个 key 的数量 , 计算出来数据量最大的是哪几个 key 。 具体方法上面已经介绍过了 , 这里不赘述 。
然后将这 k 个 key 对应的数据从 leftRDD 中单独过滤出来 , 并给每个 key 都打上 1~n 以内的随机数作为前缀 , 形成一个单独的 leftSkewRDD;而不会导致倾斜的大部分 key 形成另外一个 leftUnSkewRDD 。
接着将需要 join 的另一个 rightRDD , 也过滤出来那几个倾斜 key 并通过 flatMap 操作将该数据集中每条数据均转换为 n 条数据(这 n 条数据都按顺序附加一个 0~n 的前缀) , 形成单独的 rightSkewRDD;不会导致倾斜的大部分 key 也形成另外一个 rightUnSkewRDD 。


推荐阅读