Spark 3.0如何提高SQL工作负载的性能
Spark 3.0如何提高SQL工作负载的性能,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
AQE最初是在Spark 2.4中引入的,但随着Spark 3.0的发展,它变得更加强大。尽管Cloudera建议在我们交付Spark 3.1之前等待在生产中使用它,但您现在可以使用AQE开始在Spark 3.0中进行评估。
首先,让我们看一下AQE解决的问题类型。
初始催化剂设计中的缺陷
下图表示使用DataFrames执行简单的按组分组查询时发生的分布式处理的类型。
Spark为第一阶段确定适当的分区数量,但对于第二阶段,使用默认的幻数200。
不好的原因有三个:
200不可能是理想的分区数,而分区数是影响性能的关键因素之一;
如果将第二阶段的输出写入磁盘,则可能会得到200个小文件。
优化及其缺失会产生连锁反应:如果在第二阶段之后继续进行处理,您可能会错过进行更多优化的潜在机会。
您可以做的是在执行类似于以下语句的查询之前,手动为此shuffle设置此属性的值:
spark.conf.set(" spark.sql.shuffle.partitions"," 2")
这也带来了一些挑战:
在每次查询之前都要设置此属性
这些值将随着数据的发展而过时
此设置将应用于查询中的所有Shuffle操作
在上一个示例的第一阶段之前,数据的分布和数量是已知的,Spark可以得出合理的分区数量值。但是,对于第二阶段,此信息尚不知道要获得执行第一阶段的实际处理所要付出的代价:因此,求助于幻数。
自适应查询执行设计原理
AQE的主要思想是使执行计划不是最终的,并允许在每个阶段的边界进行审核。因此,执行计划被分解为由阶段界定的新的"查询阶段"抽象。
催化剂现在停在每个阶段的边界,以根据中间数据上可用的信息尝试并应用其他优化。
因此,可以将AQE定义为Spark Catalyst之上的一层,它将动态修改Spark计划。
有什么缺点吗?有一些,但它们很小:
执行在Spark的每个阶段边界处停止,以查看其计划,但这被性能提升所抵消。
Spark UI更加难以阅读,因为Spark为给定的应用程序创建了更多的作业,而这些作业不会占用您设置的Job组和描述。
Shuffle分区的自适应数目
自Spark 2.4起,AQE的此功能已可用。
要启用它,您需要将spark.sql.adaptive.enabled设置为true ,该参数默认值为false 。启用AQE后,随机调整分区的数量将自动调整,不再是默认的200或手动设置的值。
这是启用AQE之前和之后第一个TPC-DS查询的执行结果:
动态将排序合并联接转换为广播联接
当任何联接端的运行时统计信息小于广播哈希联接阈值时,AQE会将排序合并联接转换为广播哈希联接。
这是启用AQE之前和之后第二个TPC-DS查询执行的最后阶段:
动态合并shuffle分区
如果随机播放分区的数量大于按键分组的数量,则由于键的不平衡分配,会浪费很多CPU周期
当两个
spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled
设置为true ,Spark将根据以下内容合并连续的shuffle分区
设置为spark.sql.adaptive.advisoryPartitionSizeInBytes指定的目标大小,以避免执行过多的小任务。
动态优化倾斜的连接
倾斜是分布式处理的绊脚石。它实际上可能会使您的处理暂停数小时:
如果不进行优化,则执行连接所需的时间将由最大的分区来定义。
因此,倾斜联接优化将使用spark.sql.adaptive.advisoryPartitionSizeInBytes指定的值将分区A0划分为子分区,并将它们中的每一个联接到表B的对应分区B0。
因此,您需要向AQE提供您的倾斜定义。
这涉及两个属性:
spark.sql.adaptive.skewJoin.skewedPartitionFactor是相对的:如果分区的大小大于此因子乘以中位数分区大小且也大于,则认为该分区是倾斜的
spark.sql.adaptive.skewedPartitionThresholdInBytes ,这是绝对的:这是阈值,低于该阈值将被忽略。
动态分区修剪
动态分区修剪(DPP)的想法是最有效的优化技术之一:仅读取所需的数据。DPP不是AQE的一部分,实际上,必须禁用AQE才能进行DPP。从好的方面来说,这允许将DPP反向移植到Spark 2.4 for CDP。
该优化在逻辑计划和物理计划上均实现。
在逻辑级别上,识别维度过滤器,并通过连接传播到扫描的另一侧。
然后,在物理级别上,过滤器在维度侧执行一次,结果被广播到主表,在该表中也应用了过滤器。
如果禁用spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly,则DPP实际上可以与其他类型的联接一起使用(例如,SortMergeJoin)。
在那种情况下,Spark会估计DPP过滤器是否真正提高了查询性能。
DPP可以极大地提高高度选择性查询的性能,例如,如果您的查询从5年的数据中的一个月中筛选出来。
并非所有查询的性能都有如此显着的提高,但是在99个TPC-DS查询中,有72个受到DPP的积极影响。
Spark距其最初的核心范例还有很长的路要走:在静态数据集上懒惰地执行优化的静态计划。
静态数据集部分受到流技术的挑战:Spark团队首先创建了一个基于RDD的笨拙设计,然后提出了一个涉及DataFrames的更好的解决方案。
静态计划部分受到SQL和Adaptive Query Execution框架的挑战,从某种意义上说,结构化流对于初始流库是什么:它应该一直是一个优雅的解决方案。
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。