2. 基表不能被广播,比如left outer join时,只能广播右表
看起来广播是一个比较理想的方案,但它有没有缺点呢?也很明显 。这个方案只能用于广播较小的表,否则数据的冗余传输就远大于shuffle的开销;另外,广播时需要将被广播的表现collect到driver端,当频繁有广播出现时,对driver的内存也是一个考验 。
如下图所示,broadcast hash join可以分为两步:
1. broadcast阶段:将小表广播分发到大表所在的所有主机 。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于bittorrete的p2p思路;
2. hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探;
文章插图
SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数
spark.sql.autoBroadcastJoinThreshold,默认为10M 。
Shuffle Hash Join
当一侧的表比较小时,我们选择将其广播出去以避免shuffle,提高性能 。但因为被广播的表首先被collect到driver段,然后被冗余分发到每个executor上,所以当表比较大时,采用broadcast join会对driver端和executor端造成较大的压力 。
但由于Spark是一个分布式的计算引擎,可以通过分区的形式将大批量的数据划分成n份较小的数据集进行并行计算 。这种思想应用到Join上便是Shuffle Hash Join了 。利用key相同必然分区相同的这个原理,两个表中,key相同的行都会被shuffle到同一个分区中,SparkSQL将较大表的join分而治之,先将表划分成n个分区,再对两个表中相对应分区的数据分别进行Hash Join,这样即在一定程度上减少了driver广播一侧表的压力,也减少了executor端取整张被广播表的内存消耗 。其原理如下图:
文章插图
Shuffle Hash Join分为两步:
1. 对两张表分别按照join keys进行重分区,即shuffle,目的是为了让有相同join keys值的记录分到对应的分区中
2. 对对应分区中的数据进行join,此处先将小表分区构造为一张hash表,然后根据大表分区中记录的join keys值拿出来进行匹配
Shuffle Hash Join的条件有以下几个:
1. 分区的平均大小不超过
spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M
2. 基表不能被广播,比如left outer join时,只能广播右表
3. 一侧的表要明显小于另外一侧,小的一侧将被广播(明显小于的定义为3倍小,此处为经验值)
我们可以看到,在一定大小的表中,SparkSQL从时空结合的角度来看,将两个表进行重新分区,并且对小表中的分区进行hash化,从而完成join 。在保持一定复杂度的基础上,尽量减少driver和executor的内存压力,提升了计算时的稳定性 。
在大数据条件下如果一张表很小,执行join操作最优的选择无疑是broadcast hash join,效率最高 。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就不再是最优方案 。此时可以按照join key进行分区,根据key相同必然分区相同的原理,就可以将大表join分而治之,划分为很多小表的join,充分利用集群资源并行化 。如下图所示,shuffle hash join也可以分为两步:
1. shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点 。这个过程称为shuffle
2. hash join阶段:每个分区节点上的数据单独执行单机hash join算法 。
文章插图
看到这里,可以初步总结出来如果两张小表join可以直接使用单机版hash join;如果一张大表join一张极小表,可以选择broadcast hash join算法;而如果是一张大表join一张小表,则可以选择shuffle hash join算法;那如果是两张大表进行join呢?
Sort Merge Join
上面介绍的两种实现对于一定大小的表比较适用,但当两个表都非常大时,显然无论适用哪种都会对计算内存造成很大压力 。这是因为join时两者采取的都是hash join,是将一侧的数据完全加载到内存中,使用hash code取join keys值相等的记录进行连接 。
当两个表都非常大时,SparkSQL采用了一种全新的方案来对表进行Join,即Sort Merge Join 。这种实现方式不用将一侧数据全部加载后再进星hash join,但需要在join前将数据排序,如下图所示:
推荐阅读
- 蛇有牙齿吗,有几个牙齿 蛇的牙齿是什么样子的
- 应该是全网最全的JVM知识点总结
- 明日叶哪里有卖苗的,明日叶的功效
- 马鞭草茶能不能长期喝,大多是女性喝的
- 藿香清口茶在哪儿买,服用藿香正气水的注意事项
- PHP“垂死”十年
- 阿里云服务器是如何实现每台服务器都是公网IP的呢?
- 红茶桂花泡水喝的功效,桂花红茶的泡法步骤
- 怎么卸载找不到程序的流氓软件?
- 想验证安装的操作系统是否正版,可以这样找到Win10产品密钥