[埃尔法哥哥]解决 Spark 数据倾斜的 8 大实用方法



[埃尔法哥哥]解决 Spark 数据倾斜的 8 大实用方法
本文插图

什么是数据倾斜?
对 Spark/Hadoop 这样的分布式大数据系统来讲 , 数据量大并不可怕 , 可怕的是数据倾斜 。
对于分布式系统而言 , 理想情况下 , 随着系统规模(节点数量)的增加 , 应用整体耗时线性下降 。 如果一台机器处理一批大量数据需要 120 分钟 , 当机器数量增加到 3 台时 , 理想的耗时为 120 / 3 = 40 分钟 。 但是 , 想做到分布式情况下每台机器执行时间是单机时的1 / N , 就必须保证每台机器的任务量相等 。 不幸的是 , 很多时候 , 任务的分配是不均匀的 , 甚至不均匀到大部分任务被分配到个别机器上 , 其它大部分机器所分配的任务量只占总量的小部分 。 比如一台机器负责处理 80% 的任务 , 另外两台机器各处理 10% 的任务 。
『不患多而患不均』 , 这是分布式环境下最大的问题 。 意味着计算能力不是线性扩展的 , 而是存在短板效应: 一个 Stage 所耗费的时间 , 是由最慢的那个 Task 决定 。
由于同一个 Stage 内的所有 task 执行相同的计算 , 在排除不同计算节点计算能力差异的前提下 , 不同 task 之间耗时的差异主要由该 task 所处理的数据量决定 。 所以 , 要想发挥分布式系统并行计算的优势 , 就必须解决数据倾斜问题 。
数据倾斜的危害
当出现数据倾斜时 , 小量任务耗时远高于其它任务 , 从而使得整体耗时过大 , 未能充分发挥分布式系统的并行计算优势 。
另外 , 当发生数据倾斜时 , 部分任务处理的数据量过大 , 可能造成内存不足使得任务失败 , 并进而引进整个应用失败 。
数据倾斜的现象
当发现如下现象时 , 十有八九是发生数据倾斜了:
绝大多数 task 执行得都非常快 , 但个别 task 执行极慢 , 整体任务卡在某个阶段不能结束 。
原本能够正常执行的 Spark 作业 , 某天突然报出 OOM(内存溢出)异常 , 观察异常栈 , 是我们写的业务代码造成的 。 这种情况比较少见 。
TIPS:
在 Spark streaming 程序中 , 数据倾斜更容易出现 , 特别是在程序中包含一些类似 sql 的 join、group 这种操作的时候 。 因为 Spark Streaming 程序在运行的时候 , 我们一般不会分配特别多的内存 , 因此一旦在这个过程中出现一些数据倾斜 , 就十分容易造成 OOM 。
数据倾斜的原因
在进行 shuffle 的时候 , 必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理 , 比如按照 key 进行聚合或 join 等操作 。 此时如果某个 key 对应的数据量特别大的话 , 就会发生数据倾斜 。 比如大部分 key 对应10条数据 , 但是个别 key 却对应了100万条数据 , 那么大部分 task 可能就只会分配到10条数据 , 然后1秒钟就运行完了;但是个别 task 可能分配到了100万数据 , 要运行一两个小时 。
因此出现数据倾斜的时候 , Spark 作业看起来会运行得非常缓慢 , 甚至可能因为某个 task 处理的数据量过大导致内存溢出 。
问题发现与定位
1、通过 Spark Web UI
通过 Spark Web UI 来查看当前运行的 stage 各个 task 分配的数据量(Shuffle Read Size/Records) , 从而进一步确定是不是 task 分配的数据不均匀导致了数据倾斜 。
知道数据倾斜发生在哪一个 stage 之后 , 接着我们就需要根据 stage 划分原理 , 推算出来发生倾斜的那个 stage 对应代码中的哪一部分 , 这部分代码中肯定会有一个 shuffle 类算子 。 可以通过 countByKey 查看各个 key 的分布 。
TIPS:
数据倾斜只会发生在 shuffle 过程中 。 这里给大家罗列一些常用的并且可能会触发 shuffle 操作的算子: distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等 。 出现数据倾斜时 , 可能就是你的代码中使用了这些算子中的某一个所导致的 。


推荐阅读