
Analysis of AQE Configuration in Spark 3.0.1


This article walks through the AQE (Adaptive Query Execution) configuration in Spark 3.0.1 and analyzes how each setting is actually used in the source code.

Introduction to AQE

Judging from the Spark configuration history, AQE existed as early as Spark 1.6. In the Spark 2.x era, Intel's big data team built a prototype and put it into practice, and in Spark 3.0 Databricks and Intel jointly contributed the new AQE implementation to the community.

AQE configuration in Spark 3.0.1

| Configuration | Default | Official description | Notes |
| --- | --- | --- | --- |
| spark.sql.adaptive.enabled | false | Whether to enable adaptive query execution | Set to true here to enable AQE |
| spark.sql.adaptive.coalescePartitions.enabled | true | Whether to coalesce adjacent shuffle partitions (guided by the spark.sql.adaptive.advisoryPartitionSizeInBytes target) | Enabled by default; see Analysis 1 |
| spark.sql.adaptive.coalescePartitions.initialPartitionNum | (none) | Initial number of shuffle partitions before coalescing; defaults to spark.sql.shuffle.partitions | See Analysis 2 |
| spark.sql.adaptive.coalescePartitions.minPartitionNum | (none) | Minimum number of shuffle partitions after coalescing; defaults to the cluster's default parallelism | See Analysis 3 |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Advisory size of a shuffle partition, used when coalescing partitions and when splitting skewed join partitions | See Analysis 3 |
| spark.sql.adaptive.skewJoin.enabled | true | Whether to enable adaptive handling of data skew in joins | |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | Skew factor; a partition is treated as skewed only if both skewedPartitionFactor and skewedPartitionThresholdInBytes are satisfied | See Analysis 4 |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Skew size threshold; both skewedPartitionFactor and skewedPartitionThresholdInBytes must be satisfied | See Analysis 4 |
| spark.sql.adaptive.logLevel | debug | Log level for adaptive-execution plan changes | Raise to info to make plan changes easier to observe |
| spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin | 0.2 | A join side whose ratio of non-empty partitions is lower than this value will not be converted to a broadcast join | See Analysis 5 |
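
As a quick reference for the table above, here is a minimal sketch of enabling these settings when building a SparkSession; the application name and the chosen values are illustrative only, not tuned recommendations:

import org.apache.spark.sql.SparkSession

// Hypothetical session with AQE enabled; values mirror the defaults above except the log level.
val spark = SparkSession.builder()
  .appName("aqe-config-demo") // illustrative name
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
  .config("spark.sql.adaptive.logLevel", "info") // raised from debug to observe plan changes
  .getOrCreate()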

Analysis 1

In OptimizeSkewedJoin.scala we can see where ADVISORY_PARTITION_SIZE_IN_BYTES, i.e. spark.sql.adaptive.advisoryPartitionSizeInBytes, is referenced (OptimizeSkewedJoin is a physical-plan rule):

/**
 * The goal of skew join optimization is to make the data distribution more even. The target size
 * to split skewed partitions is the average size of non-skewed partition, or the
 * advisory partition size if avg size is smaller than it.
 */
private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
  val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
  val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
  // It's impossible that all the partitions are skewed, as we use median size to define skew.
  assert(nonSkewSizes.nonEmpty)
  math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
}

Here:

  1. nonSkewSizes holds the sizes of the task's non-skewed partitions.

  2. targetSize returns max(average size of the non-skewed partitions, advisorySize), where advisorySize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes; so targetSize is not necessarily equal to spark.sql.adaptive.advisoryPartitionSizeInBytes (see the sketch after this list).

  3. medianSize is the median of the task's partition sizes.
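
To make the targetSize behavior concrete, here is a small self-contained sketch of the same logic with made-up partition sizes and the default 64MB/256MB/5 thresholds; it is an illustration, not the Spark source:

object TargetSizeSketch {
  // Defaults from the configuration table above; all sizes are in bytes.
  val factor: Long = 5L
  val threshold: Long = 256L * 1024 * 1024
  val advisorySize: Long = 64L * 1024 * 1024

  // Re-implementation of the isSkewed / targetSize shape shown above, for illustration only.
  def isSkewed(size: Long, medianSize: Long): Boolean =
    size > medianSize * factor && size > threshold

  def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
    val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
    math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    val sizes = Seq(10 * mb, 20 * mb, 30 * mb, 2000 * mb) // the last partition is skewed
    val median = 25 * mb
    // max(64MB, avg(10, 20, 30)MB) = 64MB: the advisory size wins in this case.
    println(targetSize(sizes, median) / mb) // 64
  }
}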

Analysis 2

In SQLConf.scala:

def numShufflePartitions: Int = {
  if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
    getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
  } else {
    defaultNumShufflePartitions
  }
}

Starting with Spark 3.0.1, if AQE and shuffle partition coalescing are both enabled, spark.sql.adaptive.coalescePartitions.initialPartitionNum is used as the shuffle partition number. When a query has several shuffle stages, raising this initial partition count gives partition coalescing more fine-grained partitions to work with and noticeably improves its effect.
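For example, on a session like the one sketched earlier one might raise the initial partition count; the value 1000 is purely illustrative:

// Assumes the `spark` session from the earlier sketch, with AQE and coalescing enabled.
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")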

Analysis 3

In CoalesceShufflePartitions.scala (CoalesceShufflePartitions is also a physical-plan rule), the following is executed:

if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
  plan
} else {
  // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
  // we should skip it when calculating the `partitionStartIndices`.
  val validMetrics = shuffleStages.flatMap(_.mapStats)
  // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
  // in that case. For example when we union fully aggregated data (data is arranged to a single
  // partition) and a result of a SortMergeJoin (multiple partitions).
  val distinctNumPreShufflePartitions =
    validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
  if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
    // We fall back to Spark default parallelism if the minimum number of coalesced partitions
    // is not set, so to avoid perf regressions compared to no coalescing.
    val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
      .getOrElse(session.sparkContext.defaultParallelism)
    val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
      validMetrics.toArray,
      advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
      minNumPartitions = minPartitionNum)
    // This transformation adds new nodes, so we must use `transformUp` here.
    val stageIds = shuffleStages.map(_.id).toSet
    plan.transformUp {
      // even for shuffle exchange whose input RDD has 0 partition, we should still update its
      // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
      // number of output partitions.
      case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
        CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
    }
  } else {
    plan
  }
}

In other words:

  1. If the partitioning was specified by the user (e.g. via a repartition call), spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect and partition coalescing is skipped (illustrated after this list).

  2. If multiple shuffles feed the stage and they have different numbers of partitions, spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect and partition coalescing is skipped.

  3. See the analysis of ShufflePartitionsUtil.coalescePartitions below.
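
As a rough illustration of point 1 (to my reading of canChangeNumPartitions, an explicit repartition pins its shuffle partition count), compare a user-specified repartition with an ordinary aggregation shuffle; the dataset and numbers are invented:

// Assumes the `spark` session from the earlier sketch.
import spark.implicits._

val pinned = spark.range(0, 1000000).repartition(200)   // user-specified count: coalescing skips this shuffle
val eligible = spark.range(0, 1000000).toDF("id")
  .groupBy($"id" % 10).count()                           // regular aggregation shuffle: may be coalesced by AQE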

Analysis 4

In OptimizeSkewedJoin.scala we also see:

/**
 * A partition is considered as a skewed partition if its size is larger than the median
 * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
 * ADVISORY_PARTITION_SIZE_IN_BYTES.
 */
private def isSkewed(size: Long, medianSize: Long): Boolean = {
  size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
    size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
}
  1. OptimizeSkewedJoin is a physical-plan rule that uses isSkewed to decide whether the data is skewed; a partition is only treated as skewed if it satisfies both SKEW_JOIN_SKEWED_PARTITION_FACTOR and SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.

  2. medianSize is the median of the task's partition sizes.

Analysis 5

In AdaptiveSparkPlanExec, getFinalPhysicalPlan calls the reOptimize method, which re-runs logical-plan optimization:

private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
  logicalPlan.invalidateStatsCache()
  val optimized = optimizer.execute(logicalPlan)
  val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
  val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
  (newPlan, optimized)
}

This optimizer contains a DemoteBroadcastHashJoin rule:

@transient private val optimizer = new RuleExecutor[LogicalPlan] {
  // TODO add more optimization rules
  override protected def batches: Seq[Batch] = Seq(
    Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
  )
}

DemoteBroadcastHashJoin decides whether a join side should be kept away from a broadcast join:

case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {

  private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
      && stage.mapStats.isDefined =>
      val mapStats = stage.mapStats.get
      val partitionCnt = mapStats.bytesByPartitionId.length
      val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
      partitionCnt > 0 && nonZeroCnt > 0 &&
        (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
    case _ => false
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    case j @ Join(left, right, _, _, hint) =>
      var newHint = hint
      if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) {
        newHint = newHint.copy(leftHint =
          Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) {
        newHint = newHint.copy(rightHint =
          Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (newHint.ne(hint)) {
        j.copy(hint = newHint)
      } else {
        j
      }
  }
}

shouldDemote is the check that decides whether the broadcast join should be avoided:

  1. The input must be a ShuffleQueryStageExec (wrapped in a LogicalQueryStage) that has finished running and has map statistics available.

  2. If the ratio of non-empty partitions is lower than nonEmptyPartitionRatioForBroadcastJoin, i.e. spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin, a NO_BROADCAST_HASH hint is added and the sort-merge join will not be converted into a broadcast join (see the toy example after this list).

  3. This situation typically arises in SQL that performs a join followed by a group by.
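
A toy illustration of just the ratio test; the per-partition byte counts are invented:

// Per-partition byte counts of one finished shuffle stage (invented numbers).
val bytesByPartitionId = Array(0L, 0L, 0L, 0L, 512L, 0L, 0L, 0L, 0L, 0L)
val partitionCnt = bytesByPartitionId.length          // 10
val nonZeroCnt = bytesByPartitionId.count(_ > 0)      // 1
val nonEmptyRatio = nonZeroCnt * 1.0 / partitionCnt   // 0.1
// 0.1 < 0.2 (the default nonEmptyPartitionRatioForBroadcastJoin), so shouldDemote would return
// true: a NO_BROADCAST_HASH hint is added and this side cannot become the broadcast side.
println(nonEmptyRatio < 0.2) // true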

Analysis of ShufflePartitionsUtil.coalescePartitions (the core partition-coalescing code)

See coalescePartitions:

def coalescePartitions(
    mapOutputStatistics: Array[MapOutputStatistics],
    advisoryTargetSize: Long,
    minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
  // If `minNumPartitions` is very large, it is possible that we need to use a value less than
  // `advisoryTargetSize` as the target size of a coalesced task.
  val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
  // The max at here is to make sure that when we have an empty table, we only have a single
  // coalesced partition.
  // There is no particular reason that we pick 16. We just need a number to prevent
  // `maxTargetSize` from being set to 0.
  val maxTargetSize = math.max(
    math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
  val targetSize = math.min(maxTargetSize, advisoryTargetSize)

  val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ")
  logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
    s"actual target size $targetSize.")

  // Make sure these shuffles have the same number of partitions.
  val distinctNumShufflePartitions =
    mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
  // The reason that we are expecting a single value of the number of shuffle partitions
  // is that when we add Exchanges, we set the number of shuffle partitions
  // (i.e. map output partitions) using a static setting, which is the value of
  // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different
  // number of partitions, they will have the same number of shuffle partitions
  // (i.e. map output partitions).
  assert(
    distinctNumShufflePartitions.length == 1,
    "There should be only one distinct value of the number of shuffle partitions " +
      "among registered Exchange operators.")

  val numPartitions = distinctNumShufflePartitions.head
  val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]()
  var latestSplitPoint = 0
  var coalescedSize = 0L
  var i = 0
  while (i < numPartitions) {
    // We calculate the total size of i-th shuffle partitions from all shuffles.
    var totalSizeOfCurrentPartition = 0L
    var j = 0
    while (j < mapOutputStatistics.length) {
      totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
      j += 1
    }

    // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
    // new coalesced partition.
    if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
      partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
      latestSplitPoint = i
      // reset postShuffleInputSize.
      coalescedSize = totalSizeOfCurrentPartition
    } else {
      coalescedSize += totalSizeOfCurrentPartition
    }
    i += 1
  }
  partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)

  partitionSpecs
}
  1. totalPostShuffleInputSize first computes the total size of the shuffle data.

  2. maxTargetSize is max(totalPostShuffleInputSize / minNumPartitions, 16), where minNumPartitions is the value of spark.sql.adaptive.coalescePartitions.minPartitionNum.

  3. targetSize is min(maxTargetSize, advisoryTargetSize), where advisoryTargetSize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes; that setting is only advisory and is not necessarily the final targetSize.

  4. The while loop merges adjacent partitions: within each shuffle, adjacent partitions are accumulated as long as the combined size stays at or below targetSize (a standalone sketch follows this list).
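
The following standalone sketch replays the same greedy merge on invented per-partition sizes for a single shuffle, with a 64 MB target; it is a simplification for illustration, not the Spark implementation:

object CoalesceSketch {
  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    val sizes = Array(10 * mb, 20 * mb, 30 * mb, 50 * mb, 5 * mb, 70 * mb) // per-partition bytes
    val targetSize = 64 * mb

    val specs = scala.collection.mutable.ArrayBuffer.empty[(Int, Int)] // (start, end) reducer ranges
    var latestSplitPoint = 0
    var coalescedSize = 0L
    for (i <- sizes.indices) {
      // Start a new coalesced partition once adding partition i would exceed the target size.
      if (i > latestSplitPoint && coalescedSize + sizes(i) > targetSize) {
        specs += ((latestSplitPoint, i))
        latestSplitPoint = i
        coalescedSize = sizes(i)
      } else {
        coalescedSize += sizes(i)
      }
    }
    specs += ((latestSplitPoint, sizes.length))

    println(specs.mkString(", ")) // (0,3), (3,5), (5,6): adjacent partitions merged up to ~64MB
  }
}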

Analysis of OptimizeSkewedJoin.optimizeSkewJoin (the core skew-optimization code)

See optimizeSkewJoin:

def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
  case smj @ SortMergeJoinExec(_, _, joinType, _,
      s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
      s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
      if supportedJoinTypes.contains(joinType) =>
    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
    val numPartitions = left.partitionsWithSizes.length
    // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
    val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
    val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
    logDebug(
      s"""
        |Optimizing skewed join.
        |Left side partitions size info:
        |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
        |Right side partitions size info:
        |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
      """.stripMargin)
    val canSplitLeft = canSplitLeftSide(joinType)
    val canSplitRight = canSplitRightSide(joinType)
    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
    // the final data distribution is even (coalesced partitions + split partitions).
    val leftActualSizes = left.partitionsWithSizes.map(_._2)
    val rightActualSizes = right.partitionsWithSizes.map(_._2)
    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

    val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
    val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
    val leftSkewDesc = new SkewDesc
    val rightSkewDesc = new SkewDesc
    for (partitionIndex <- 0 until numPartitions) {
      val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
      val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

      val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

      // A skewed partition should never be coalesced, but skip it here just to be safe.
      val leftParts = if (isLeftSkew && !isLeftCoalesced) {
        val reducerId = leftPartSpec.startReducerIndex
        val skewSpecs = createSkewPartitionSpecs(
          left.mapStats.shuffleId, reducerId, leftTargetSize)
        if (skewSpecs.isDefined) {
          logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
            s"${skewSpecs.get.length} parts.")
          leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
        }
        skewSpecs.getOrElse(Seq(leftPartSpec))
      } else {
        Seq(leftPartSpec)
      }

      // A skewed partition should never be coalesced, but skip it here just to be safe.
      val rightParts = if (isRightSkew && !isRightCoalesced) {
        val reducerId = rightPartSpec.startReducerIndex
        val skewSpecs = createSkewPartitionSpecs(
          right.mapStats.shuffleId, reducerId, rightTargetSize)
        if (skewSpecs.isDefined) {
          logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
            s"${skewSpecs.get.length} parts.")
          rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
        }
        skewSpecs.getOrElse(Seq(rightPartSpec))
      } else {
        Seq(rightPartSpec)
      }

      for {
        leftSidePartition <- leftParts
        rightSidePartition <- rightParts
      } {
        leftSidePartitions += leftSidePartition
        rightSidePartitions += rightSidePartition
      }
    }

    logDebug("number of skewed partitions: " +
      s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
    if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
      val newLeft = CustomShuffleReaderExec(
        left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
      val newRight = CustomShuffleReaderExec(
        right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
      smj.copy(
        left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
    } else {
      smj
    }
}
  1. The SortMergeJoinExec pattern means this optimization applies to sort-merge joins.

  2. assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length) guarantees that the two sides of the join have the same number of partitions.

  3. Compute the median partition size of each join side: leftMedSize and rightMedSize.

  4. Compute the target size of each join side: leftTargetSize and rightTargetSize.

  5. Loop over every partition of the two sides and check for skew; a partition is split only if it is skewed and has not already been coalesced, otherwise it is left unchanged.

  6. The createSkewPartitionSpecs method 1) fetches the map output sizes of the corresponding partition on each join side and 2) splits them into multiple slices according to targetSize (a rough sketch follows this list).

  7. If skew is found, the stage is wrapped in a CustomShuffleReaderExec for the subsequent execution; eventually ShuffledRowRDD.compute matches the PartialMapperPartitionSpec case when reading the data, and spark.sql.adaptive.fetchShuffleBlocksInBatch (batched shuffle-block fetches that reduce I/O) is applied automatically.
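
Point 6 can be pictured with the following rough sketch, which groups the map outputs of one skewed reducer partition into slices of roughly targetSize; the helper and the map sizes are invented for illustration and only approximate what createSkewPartitionSpecs does:

object SkewSplitSketch {
  // Group consecutive map outputs of one skewed reducer partition into ~targetSize slices.
  def splitBySize(mapPartitionSizes: Seq[Long], targetSize: Long): Seq[List[Int]] = {
    val slices = scala.collection.mutable.ArrayBuffer.empty[List[Int]]
    var current = scala.collection.mutable.ArrayBuffer.empty[Int]
    var currentSize = 0L
    for ((size, mapIndex) <- mapPartitionSizes.zipWithIndex) {
      if (current.nonEmpty && currentSize + size > targetSize) {
        slices += current.toList
        current = scala.collection.mutable.ArrayBuffer.empty[Int]
        currentSize = 0L
      }
      current += mapIndex
      currentSize += size
    }
    if (current.nonEmpty) slices += current.toList
    slices.toList
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    // One skewed reducer partition, fed by 6 map tasks of these (invented) sizes.
    val mapSizes = Seq(40 * mb, 50 * mb, 10 * mb, 60 * mb, 30 * mb, 45 * mb)
    println(splitBySize(mapSizes, targetSize = 64 * mb))
    // List(List(0), List(1, 2), List(3), List(4), List(5)): each slice becomes a separate task
  }
}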

Where OptimizeSkewedJoin and CoalesceShufflePartitions are invoked

For example, in AdaptiveSparkPlanExec:

@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
  ReuseAdaptiveSubquery(conf, context.subqueryCache),
  CoalesceShufflePartitions(context.session),
  // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
  // added by `CoalesceShufflePartitions`. So they must be executed after it.
  OptimizeSkewedJoin(conf),
  OptimizeLocalShuffleReader(conf)
)

So both rules are invoked from AdaptiveSparkPlanExec, with CoalesceShufflePartitions running before OptimizeSkewedJoin. AdaptiveSparkPlanExec itself is inserted by InsertAdaptiveSparkPlan, which in turn is applied in QueryExecution.

In InsertAdaptiveSparkPlan.shouldApplyAQE and supportAdaptive we see:

private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
  conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
    plan.find {
      case _: Exchange => true
      case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
      case p => p.expressions.exists(_.find {
        case _: SubqueryExpression => true
        case _ => false
      }.isDefined)
    }.isDefined
  }
}

private def supportAdaptive(plan: SparkPlan): Boolean = {
  // TODO migrate dynamic-partition-pruning onto adaptive execution.
  sanityCheck(plan) &&
    !plan.logicalLink.exists(_.isStreaming) &&
    !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
  plan.children.forall(supportAdaptive)
}

If these conditions are not met, AQE will not be applied. To force it on anyway, set spark.sql.adaptive.forceApply to true (the documentation marks this as an internal configuration), as shown below.
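
For example, again assuming the `spark` session from the first sketch:

// Force AQE even when the plan has no exchange or subquery; documented as an internal config.
spark.conf.set("spark.sql.adaptive.forceApply", "true")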

Note:

The following configurations from the older AQE implementation have been deprecated in Spark 3.0.1:

spark.sql.adaptive.skewedPartitionMaxSplits
spark.sql.adaptive.skewedPartitionRowCountThreshold
spark.sql.adaptive.skewedPartitionSizeThreshold

That concludes this analysis of the AQE configuration in Spark 3.0.1. Thanks for reading!
