Apache Spark优化工具包


Apache Spark优化工具包

文章插图
> Source: Pixabay
 
Apache Spark是一种开放源代码的分布式计算引擎,目前是用于内存中批处理驱动的数据处理的最受欢迎的框架(它还支持实时数据流传输) 。得益于其先进的查询优化器,DAG调度程序和执行引擎,Spark能够非常高效地处理和分析大型数据集 。但是,在没有仔细调整的情况下运行Spark作业仍会导致性能下降 。
在博客文章中,我将分享一些Spark性能调优的技巧,以帮助您解决和加快运行缓慢的Spark作业 。
(本文提到的所有功能均来自PySpark,您可以使用Spark API文档找到与Scala / JAVA等效的功能 。)
分区不均匀当数据集最初由Spark加载并成为弹性分布式数据集(RDD)时,所有数据均匀地分布在分区之间 。但是,在用户对其应用某些类型的数据操作之后,这些分区可能会变得不均匀 。例如,groupByKey操作可能导致分区偏斜,因为一个键可能比另一个键包含更多的记录 。此外,由于Spark的DataFrameWriter允许使用partitionBy将分区数据写入磁盘,因此磁盘上的分区也可能不均匀 。
在DataFrame中重新平衡偏斜的分区将极大地提高Spark在DataFrame上的处理性能 。您可以使用getNumPartitions函数检查DataFrame中的分区数,并通过运行简单的Spark作业来查找每个分区中的记录数,例如:
from pyspark.sql.functions import spark_partition_iddf.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count().show()如果发现DataFrame的分区大小高度不均匀,请在对它进行任何分析之前,使用重新分区或合并函数对DataFrame进行重新分区 。还建议在将数据写回磁盘之前,先对内存中的数据进行分区 。RDD模块也支持这些重新分区功能 。
坚持RDD的缺点由于惰性执行原理,除非用户明确调用操作来收集结果,否则Spark不会对数据集执行任何实际的转换 。此外,如果用户希望对中间结果应用其他转换,Spark将需要从头开始重新计算所有内容 。为了允许用户更有效地重用日期,Spark可以使用持久性或缓存功能将数据缓存在内存和/或磁盘中 。
但是,缓存并不总是一个好主意 。Spark缓存数据集后,Catalyst优化器优化进一步转换的能力将受到限制,因为它不再能够改善源数据级别的修剪 。例如,如果将过滤器应用于在源数据库中建立索引的列,则Catalyst将无法利用索引来提高性能 。
因此,仅当缓存数据将在以后多次重用时才建议使用缓存数据 。迭代探索数据集或调整ML模型时 。
Apache Spark优化工具包

文章插图
> Source: Pixabay
 
基于成本的优化器(CBO)【Apache Spark优化工具包】基于成本的优化器(CBO)通过向Catalyst提供其他表级统计信息,可以加快Spark SQL作业的速度,这对于连接许多数据集的作业特别有用 。使用者可以通过将spark.sql.cbo.enabled设置为true(默认值)来启用CBO 。
为了充分利用CBO,用户需要保持列级和表级统计信息都是最新的,从而使CBO可以使用准确的估算来优化查询计划 。为此,在对表运行SQL查询之前,请使用ANALYZE TABLE命令收集统计信息 。记住在修改表之后再次分析表,以确保统计信息是最新的 。
广播Join除了启用CBO,在Spark中优化连接数据集的另一种方法是使用广播联接 。在无序连接中,两个表中的记录都将通过网络传输给执行器,当一个表比另一个大得多时,这是次优的 。在广播联接中,较小的表将被发送给执行程序,以与较大的表联接,从而避免了通过网络发送大量数据的情况 。
用户可以通过spark.sql.autoBroadcastJoinThreshold配置控制广播联接,指示要广播的表的最大大小 。此外,即使表的大小大于spark.sql.autoBroadcastJoinThreshold,也可以使用广播提示来告诉Spark广播表:
from pyspark.sql.functions import broadcastbroadcast(spark.table("tbl_a")).join(spark.table("tbl_b"), "key")垃圾收集(GC)由于所有Spark作业都占用大量内存,因此确保有效进行垃圾收集非常重要-我们希望产生较少的内存"垃圾"以减少GC时间 。要了解您的Spark作业是否在GC中花费过多时间,请在Spark UI中检查"任务反序列化时间"和" GC时间" 。
例如,由于Spark需要反序列化更多对象,因此使用用户定义函数(UDF)和lambda函数将导致更长的GC时间 。还建议避免创建中间对象并将不必要的RDD缓存到JVM堆 。
TL; DR:


推荐阅读