Qianjia Information Network (千家信息网)

Spark Delta write operations and ACID transactions: an example-driven analysis of the base classes FileFormat and FileCommitProtocol

Published: 2025-01-23 | Author: 千家信息网 editor | Last updated: 2025-01-23

This article uses examples to analyze FileFormat and FileCommitProtocol, the base classes behind ACID transactions in Spark Delta write operations. Many readers are probably not yet familiar with them, so let's take a look.

Analysis

We start directly with FileFormatWriter.write, the entry point Spark uses to write files:

  def write(
      sparkSession: SparkSession,
      plan: SparkPlan,
      fileFormat: FileFormat,
      committer: FileCommitProtocol,
      outputSpec: OutputSpec,
      hadoopConf: Configuration,
      partitionColumns: Seq[Attribute],
      bucketSpec: Option[BucketSpec],
      statsTrackers: Seq[WriteJobStatsTracker],
      options: Map[String, String])
    : Set[String] = {

Delta is built on Parquet, so we analyze ParquetFileFormat as the FileFormat and SQLHadoopMapReduceCommitProtocol as the FileCommitProtocol.

  1. The write method is fairly long, so we cover only the key points:

committer.setupJob(job)

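This step prepares the job before it runs. It sets the jobId and taskId and creates the OutputCommitter. The OutputCommitter is the Hadoop component that sets up, commits, and aborts the output of a job and its tasks.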

  override def setupJob(jobContext: JobContext): Unit = {
    // Setup IDs
    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
    val taskId = new TaskID(jobId, TaskType.MAP, 0)
    val taskAttemptId = new TaskAttemptID(taskId, 0)

    // Set up the configuration object
    jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
    jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
    jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
    jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
    jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)

    val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
    committer = setupCommitter(taskAttemptContext)
    committer.setupJob(jobContext)
  }
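The IDs set above follow Hadoop's string format, and the same format appears in the task-attempt path at the end of this article. The sketch below is a plain-Scala illustration of that format, not Hadoop's own code. The object and method names (`HadoopIds`, `jobId`, `taskId`, `taskAttemptId`) are hypothetical, and only map tasks (`m`) are modeled. We assume the jobtracker identifier is a `yyyyMMddHHmmss` timestamp, which is how Spark's `createJobID(new Date, ...)` produces it.

```scala
// Hypothetical helpers that mimic how Hadoop's JobID / TaskID / TaskAttemptID
// render as strings. Only the map task type ("m") is modeled.
object HadoopIds {
  // jobtrackerId: e.g. a yyyyMMddHHmmss timestamp; the job number is zero-padded to 4 digits
  def jobId(jobtrackerId: String, id: Int): String =
    f"job_${jobtrackerId}_$id%04d"

  // The task number is zero-padded to 6 digits
  def taskId(jobtrackerId: String, jobNum: Int, taskNum: Int): String =
    f"task_${jobtrackerId}_$jobNum%04d_m_$taskNum%06d"

  // A task attempt appends the attempt number to the task ID
  def taskAttemptId(jobtrackerId: String, jobNum: Int, taskNum: Int, attempt: Int): String =
    f"attempt_${jobtrackerId}_$jobNum%04d_m_$taskNum%06d_$attempt"
}
```

Under these assumptions, `HadoopIds.taskAttemptId("20190219101232", 3, 5, 0)` gives `attempt_20190219101232_0003_m_000005_0`. The setupJob shown above always uses job number 0, task 0, and attempt 0.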

ParquetFileFormat uses ParquetOutputCommitter as its OutputCommitter. Here is format.getOutputCommitter(context) as implemented in Parquet's ParquetOutputFormat:

  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException {
    if (committer == null) {
      Path output = getOutputPath(context);
      committer = new ParquetOutputCommitter(output, context);
    }
    return committer;
  }

This ends up calling the constructor of the parent class, FileOutputCommitter:

  public FileOutputCommitter(Path outputPath,
                             TaskAttemptContext context) throws IOException {
    this(outputPath, (JobContext)context);
    if (outputPath != null) {
      workPath = getTaskAttemptPath(context, outputPath);
    }
  }

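Note that workPath, a field of the committer, is set here to the task attempt directory under $outputPath/_temporary (the full layout is given in the note at the end of this article). The newTaskTempFile method discussed below uses it.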
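The directory layout that FileOutputCommitter builds under the output path can be shown as plain string joins. This is a minimal sketch under stated assumptions. The `CommitterLayout` object and its methods are hypothetical. Only the `_temporary` name and the nesting are meant to match Hadoop, consistent with the example path quoted at the end of the article.

```scala
// A plain-string model of the directories used by Hadoop's FileOutputCommitter.
// The object and method names are hypothetical; only "_temporary" and the nesting follow Hadoop.
object CommitterLayout {
  val PendingDir = "_temporary"

  // $out/_temporary/$appAttempt: created by setupJob
  def jobAttemptPath(out: String, appAttempt: Int): String =
    s"$out/$PendingDir/$appAttempt"

  // $out/_temporary/$appAttempt/_temporary/$taskAttemptId: the task's workPath
  def taskAttemptPath(out: String, appAttempt: Int, taskAttemptId: String): String =
    s"${jobAttemptPath(out, appAttempt)}/$PendingDir/$taskAttemptId"

  // $out/_temporary/$appAttempt/$taskId: where a committed task's files go with algorithm v1
  def committedTaskPath(out: String, appAttempt: Int, taskId: String): String =
    s"${jobAttemptPath(out, appAttempt)}/$taskId"
}
```

For example, `CommitterLayout.taskAttemptPath("/data", 0, "attempt_20190219101232_0003_m_000005_0")` yields `/data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0`.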

Next, setupJob runs:

  public void setupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path jobAttemptPath = getJobAttemptPath(context);
      FileSystem fs = jobAttemptPath.getFileSystem(
          context.getConfiguration());
      if (!fs.mkdirs(jobAttemptPath)) {
        LOG.error("Mkdirs failed to create " + jobAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in setupJob()");
    }
  }

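getJobAttemptPath resolves to a directory under $path/_temporary, namely $path/_temporary/${appAttempt}, where path is the output directory. setupJob creates that directory.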

Next, the tasks are submitted:

      sparkSession.sparkContext.runJob(
        rddWithNonEmptyPartitions,
        (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
          executeTask(
            description = description,
            jobIdInstant = jobIdInstant,
            sparkStageId = taskContext.stageId(),
            sparkPartitionId = taskContext.partitionId(),
            sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
            committer,
            iterator = iter)
        },
        rddWithNonEmptyPartitions.partitions.indices,
        (index, res: WriteTaskResult) => {
          committer.onTaskCommit(res.commitMsg)
          ret(index) = res
        })

The part to focus on here is executeTask:

    committer.setupTask(taskAttemptContext)

    val dataWriter =
      if (sparkPartitionId != 0 && !iterator.hasNext) {
        // In case of empty job, leave first partition to save meta for file format like parquet.
        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
      } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
      } else {
        new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
      }
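The writer selection above is a three-way decision, which can be isolated as a pure function. In this sketch the hypothetical `WriterChoice.choose` returns the class name as a string instead of constructing Spark's writers. Its boolean parameters stand in for `iterator.hasNext`, `partitionColumns.nonEmpty`, and `bucketIdExpression.isDefined`.

```scala
// Hypothetical model of how executeTask picks a data writer; the returned strings
// stand in for Spark's EmptyDirectoryDataWriter / SingleDirectoryDataWriter / DynamicPartitionDataWriter.
object WriterChoice {
  def choose(
      sparkPartitionId: Int,
      hasRows: Boolean,
      hasPartitionColumns: Boolean,
      hasBucketing: Boolean): String =
    if (sparkPartitionId != 0 && !hasRows) {
      // Empty non-first partition: nothing to write. Partition 0 always gets a real writer
      // so that formats such as Parquet can still write their metadata for an empty job.
      "EmptyDirectoryDataWriter"
    } else if (!hasPartitionColumns && !hasBucketing) {
      // All rows go into one directory
      "SingleDirectoryDataWriter"
    } else {
      // Rows are routed to partition/bucket-specific files
      "DynamicPartitionDataWriter"
    }
}
```

Note that partition 0 gets a SingleDirectoryDataWriter even when it has no rows, so an entirely empty unpartitioned job still produces one (empty) output file.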
  • For SQLHadoopMapReduceCommitProtocol, setupTask (inherited from HadoopMapReduceCommitProtocol) is implemented as follows:

    committer = setupCommitter(taskContext)
    committer.setupTask(taskContext)
    addedAbsPathFiles = mutable.Map[String, String]()
    partitionPaths = mutable.Set[String]()

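For ParquetOutputCommitter, committer.setupTask(taskContext) is an empty implementation, inherited from FileOutputCommitter. That class creates the task directory lazily, when the task writes its first file.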

  • Next, look at dataWriter, which ultimately performs the writes. When there are no partition (or bucket) columns, it is a SingleDirectoryDataWriter:

class SingleDirectoryDataWriter(
    description: WriteJobDescription,
    taskAttemptContext: TaskAttemptContext,
    committer: FileCommitProtocol)
  extends FileFormatDataWriter(description, taskAttemptContext, committer) {
  private var fileCounter: Int = _
  private var recordsInFile: Long = _
  // Initialize currentWriter and statsTrackers
  newOutputWriter()

  private def newOutputWriter(): Unit = {
    recordsInFile = 0
    releaseResources()

    val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
    val currentPath = committer.newTaskTempFile(
      taskAttemptContext,
      None,
      f"-c$fileCounter%03d" + ext)

    currentWriter = description.outputWriterFactory.newInstance(
      path = currentPath,
      dataSchema = description.dataColumns.toStructType,
      context = taskAttemptContext)

    statsTrackers.foreach(_.newFile(currentPath))
  }

  override def write(record: InternalRow): Unit = {
    if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
      fileCounter += 1
      assert(fileCounter < MAX_FILE_COUNTER,
        s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

      newOutputWriter()
    }

    currentWriter.write(record)
    statsTrackers.foreach(_.newRow(record))
    recordsInFile += 1
  }
}
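The `maxRecordsPerFile` rollover in `write` decides how many files a task produces and what suffix (`-c000`, `-c001`, ...) each one gets. The sketch below models only that counting logic. It writes no files. The name `Rollover.split` is hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of SingleDirectoryDataWriter's rollover: when maxRecordsPerFile > 0,
// a new file with the next -cNNN suffix is started once the current file is full.
object Rollover {
  /** Returns (fileSuffix, recordsInFile) for each file that would be written. */
  def split[T](records: Iterator[T], maxRecordsPerFile: Long): Seq[(String, Long)] = {
    val files = ArrayBuffer.empty[(String, Long)]
    var fileCounter = 0
    var recordsInFile = 0L
    // The first file is opened eagerly, like newOutputWriter() in the constructor
    files += ((f"-c$fileCounter%03d", 0L))
    records.foreach { _ =>
      if (maxRecordsPerFile > 0 && recordsInFile >= maxRecordsPerFile) {
        fileCounter += 1
        recordsInFile = 0
        files += ((f"-c$fileCounter%03d", 0L))
      }
      recordsInFile += 1
      files(files.length - 1) = (files.last._1, recordsInFile)
    }
    files.toSeq
  }
}
```

With `maxRecordsPerFile = 2`, five records yield `-c000` (2 rows), `-c001` (2 rows), and `-c002` (1 row). With `0` (no limit), everything stays in `-c000`, which matches the `c000` in the example file name at the end of the article.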

So where is the file actually written? The path comes from:

    val currentPath = committer.newTaskTempFile(
      taskAttemptContext,
      None,
      f"-c$fileCounter%03d" + ext)

The corresponding newTaskTempFile method in HadoopMapReduceCommitProtocol is:

  override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    val filename = getFilename(taskContext, ext)

    val stagingDir: Path = committer match {
      case _ if dynamicPartitionOverwrite =>
        assert(dir.isDefined,
          "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
        partitionPaths += dir.get
        this.stagingDir
      // For FileOutputCommitter it has its own staging path called "work path".
      case f: FileOutputCommitter =>
        new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
      case _ => new Path(path)
    }

    dir.map { d =>
      new Path(new Path(stagingDir, d), filename).toString
    }.getOrElse {
      new Path(stagingDir, filename).toString
    }
  }

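If dynamic partition overwrite is in effect (dynamicPartitionOverwrite, i.e. an overwrite with spark.sql.sources.partitionOverwriteMode=dynamic), the staging directory is new Path(path, ".spark-staging-" + jobId). Otherwise, if the committer is a FileOutputCommitter or a subclass, the staging directory is its workPath when that is set, and path otherwise. The FileOutputCommitter constructor has already set workPath, so the task's output ends up under $path/_temporary.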
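The staging-directory choice just described can be written as a small pure function over strings. In this sketch, `StagingDir.taskTempFile` and its parameters are hypothetical. `workPath` stands for `FileOutputCommitter.getWorkPath` (`None` when unset), and `isFileOutputCommitter` replaces the pattern match on the committer's class.

```scala
// Hypothetical model of how HadoopMapReduceCommitProtocol.newTaskTempFile picks a staging directory.
object StagingDir {
  def taskTempFile(
      path: String,
      jobId: String,
      dynamicPartitionOverwrite: Boolean,
      isFileOutputCommitter: Boolean,
      workPath: Option[String],
      dir: Option[String],
      filename: String): String = {
    val stagingDir =
      if (dynamicPartitionOverwrite) {
        // Dynamic partition overwrite always stages under a per-job hidden directory
        require(dir.isDefined, "dynamic partition overwrite requires a partitioned write")
        s"$path/.spark-staging-$jobId"
      } else if (isFileOutputCommitter) {
        // FileOutputCommitter's workPath, i.e. the task attempt dir under _temporary
        workPath.getOrElse(path)
      } else {
        path
      }
    // Partitioned writes add the partition directory (e.g. "p=1") below the staging dir
    dir.map(d => s"$stagingDir/$d/$filename").getOrElse(s"$stagingDir/$filename")
  }
}
```

In the ordinary Parquet case (no dynamic overwrite, FileOutputCommitter with workPath set), the result lands in the task attempt directory under `$path/_temporary`.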

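So the task writes its data into that directory. DynamicPartitionDataWriter can be analyzed the same way. The only difference is that its paths also include partition directories, and each file is written into its own partition's directory.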

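  • The rows are written and the task is committed as follows. On failure the task is aborted instead, and the writer is always closed: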

    try {
      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
        // Execute the task to write rows out and commit the task.
        while (iterator.hasNext) {
          dataWriter.write(iterator.next())
        }
        dataWriter.commit()
      })(catchBlock = {
        // If there is an error, abort the task
        dataWriter.abort()
        logError(s"Job $jobId aborted.")
      }, finallyBlock = {
        dataWriter.close()
      })
    } catch {
      ...
    }
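The control flow above is: write everything, commit, abort on any failure, and always close. Spark implements it with its internal `Utils.tryWithSafeFinallyAndFailureCallbacks`, which also takes care of suppressed exceptions. The sketch below expresses the same ordering with a plain `try`/`catch`/`finally`. The `WriteLoop` object and its `Writer` trait are hypothetical.

```scala
// Hypothetical stand-in for the executeTask write loop: write all rows, then commit;
// on any failure call abort and rethrow; always close the writer.
object WriteLoop {
  trait Writer[T] {
    def write(row: T): Unit
    def commit(): String
    def abort(): Unit
    def close(): Unit
  }

  def run[T](rows: Iterator[T], writer: Writer[T]): String =
    try {
      rows.foreach(writer.write)
      writer.commit()
    } catch {
      case t: Throwable =>
        writer.abort() // failure callback, like catchBlock above
        throw t
    } finally {
      writer.close() // runs on both success and failure, like finallyBlock above
    }
}
```

Under this model, a writer that fails on its second row sees the calls `write`, `abort`, `close`, and never `commit`, so a failed task never reaches `commitTask`.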

dataWriter.commit() looks like this:

  override def commit(): WriteTaskResult = {
    releaseResources()
    val summary = ExecutedWriteSummary(
      updatedPartitions = updatedPartitions.toSet,
      stats = statsTrackers.map(_.getFinalStats()))
    WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
  }

It first releases resources, i.e. closes the current writer, and then calls FileCommitProtocol.commitTask():

  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    val attemptId = taskContext.getTaskAttemptID
    logTrace(s"Commit task ${attemptId}")
    SparkHadoopMapRedUtil.commitTask(
      committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
  }

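SparkHadoopMapRedUtil.commitTask eventually calls FileOutputCommitter.commitTask. When output commit coordination is enabled, it first asks the driver's OutputCommitCoordinator for permission. How the files leave $PATH/_temporary depends on mapreduce.fileoutputcommitter.algorithm.version:

- With version 2, the task's files are moved straight into $PATH.
- With version 1, the task attempt directory is only renamed to a committed-task directory under $PATH/_temporary. The final move into $PATH happens in commitJob.

The task then returns the statistics collected by its stats trackers, in the following format: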

case class BasicWriteTaskStats(
    numPartitions: Int,
    numFiles: Int,
    numBytes: Long,
    numRows: Long)
  extends WriteTaskStats
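FileOutputCommitter.commitTask behaves differently under the two algorithm versions. With version 1 it renames the task attempt directory to `$out/_temporary/$appAttempt/$taskId` and leaves the final move to commitJob. With version 2 it moves the task's files straight into `$out`. The sketch below models both on the local filesystem with `java.nio.file`. `LocalCommitTask` is a hypothetical name. Unlike Hadoop's recursive merge, the v2 branch handles only flat files.

```scala
import java.io.File
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical local-filesystem model of FileOutputCommitter.commitTask for both algorithm versions.
object LocalCommitTask {
  def commitTask(out: Path, taskAttemptDir: Path, committedTaskDir: Path, algorithmVersion: Int): Unit =
    if (algorithmVersion == 1) {
      // v1: a single directory rename; the files reach `out` only when the job commits
      Files.createDirectories(committedTaskDir.getParent)
      Files.move(taskAttemptDir, committedTaskDir)
    } else {
      // v2: move each file straight into the output directory (flat files only in this sketch)
      val files = Option(taskAttemptDir.toFile.listFiles()).getOrElse(Array.empty[File])
      files.foreach(f => Files.move(f.toPath, out.resolve(f.getName), StandardCopyOption.REPLACE_EXISTING))
    }
}
```

Version 1 makes each task commit a single rename but pays for a second pass in commitJob. Version 2 skips that pass, at the cost of task output becoming visible in `$out` before the job commits.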
  2. Next, the driver calls committer.onTaskCommit(res.commitMsg) for each finished task.
    For SQLHadoopMapReduceCommitProtocol, this only logs the message: logDebug(s"onTaskCommit($taskCommit)")

  3. Then committer.commitJob(job, commitMsgs) is called:

    ...
    committer.commitJob(jobContext)
    ...
    for ((src, dst) <- filesToMove) {
      fs.rename(new Path(src), new Path(dst))
    }
    ...
    fs.delete(stagingDir, true)

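This step cleans up the job. OutputCommitter.commitJob finalizes the task output under path. The files listed in filesToMove, which tasks wrote to temporary locations, are renamed to their destinations. Finally, the staging directory is deleted. Only at this point are the files actually in the path directory.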
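The tail of commitJob renames every `(src, dst)` pair and then deletes the staging directory recursively. The sketch below mirrors that on the local filesystem. `java.nio.file.Files.move` stands in for `fs.rename`, and a small recursive delete stands in for `fs.delete(stagingDir, true)`. `LocalCommitJob` is a hypothetical name.

```scala
import java.io.File
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical local model of the end of commitJob: move staged files to their final
// destinations, then remove the staging directory and everything in it.
object LocalCommitJob {
  def finish(filesToMove: Map[Path, Path], stagingDir: Path): Unit = {
    for ((src, dst) <- filesToMove) {
      Files.createDirectories(dst.getParent)
      Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING) // plays the role of fs.rename
    }
    deleteRecursively(stagingDir.toFile) // plays the role of fs.delete(stagingDir, true)
  }

  // Delete children first, then the directory itself; a missing path is simply ignored
  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}
```

On HDFS each rename is a cheap metadata operation. On object stores a "rename" is usually a copy, which is one reason this final move can be slow there.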

  4. Recording metrics

  private[datasources] def processStats(
      statsTrackers: Seq[WriteJobStatsTracker],
      statsPerTask: Seq[Seq[WriteTaskStats]])
  : Unit = {

    val numStatsTrackers = statsTrackers.length
    assert(statsPerTask.forall(_.length == numStatsTrackers),
      s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker.
         |There are $numStatsTrackers statsTrackers, but some task returned
         |${statsPerTask.find(_.length != numStatsTrackers).get.length} results instead.
       """.stripMargin)

    val statsPerTracker = if (statsPerTask.nonEmpty) {
      statsPerTask.transpose
    } else {
      statsTrackers.map(_ => Seq.empty)
    }

    statsTrackers.zip(statsPerTracker).foreach {
      case (statsTracker, stats) => statsTracker.processStats(stats)
    }
  }

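This passes the job's metrics to the statsTrackers on the driver. The current tracker implementation is BasicWriteJobStatsTracker. It adds the totals to SQL metrics and then posts them as an event on the listener bus, as the following code shows: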

class BasicWriteJobStatsTracker(
    serializableHadoopConf: SerializableConfiguration,
    @transient val metrics: Map[String, SQLMetric])
  extends WriteJobStatsTracker {
  ...
  override def processStats(stats: Seq[WriteTaskStats]): Unit = {
    ...
    metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
    metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(numPartitions)

    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
  }
}
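Each task returns one stats object per tracker. processStats transposes these task-major results into tracker-major order, and the tracker then sums its column. The sketch below reproduces just that data reshaping. `StatsModel` and `TaskStats` are hypothetical; `TaskStats` copies the fields of `BasicWriteTaskStats`. It models the summing style of the `processStats` shown above and makes no SQLMetric or listener-bus calls.

```scala
// Hypothetical model of processStats + BasicWriteJobStatsTracker aggregation.
object StatsModel {
  final case class TaskStats(numPartitions: Int, numFiles: Int, numBytes: Long, numRows: Long)

  // statsPerTask(i)(j) = stats from task i for tracker j; the result is indexed by tracker first
  def perTracker(numTrackers: Int, statsPerTask: Seq[Seq[TaskStats]]): Seq[Seq[TaskStats]] = {
    require(statsPerTask.forall(_.length == numTrackers),
      "every task should produce one stats object per tracker")
    if (statsPerTask.nonEmpty) statsPerTask.transpose else Seq.fill(numTrackers)(Seq.empty)
  }

  // What one tracker does with its column: add everything up before updating the metrics
  def total(stats: Seq[TaskStats]): TaskStats =
    stats.foldLeft(TaskStats(0, 0, 0L, 0L)) { (acc, s) =>
      TaskStats(acc.numPartitions + s.numPartitions, acc.numFiles + s.numFiles,
        acc.numBytes + s.numBytes, acc.numRows + s.numRows)
    }
}
```

With one tracker and two tasks reporting `(1, 2, 100, 10)` and `(1, 1, 50, 5)`, the tracker's totals are `(2, 3, 150, 15)`.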

That completes our walk through the Spark Parquet file-write flow, with some details omitted. The overall data flow is:

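Instantiate the Job object
      |
      v
FileCommitProtocol.setupJob -> OutputCommitter.setupJob    prepare for the job, e.g. create the _temporary directory
      |
      v
executeTask() -> FileCommitProtocol.setupTask -> OutputCommitter.setupTask    currently an empty implementation
                 |
                 v
         FileCommitProtocol.newTaskTempFile / newTaskTempFileAbsPath    choose the task's temporary output file
                 |
                 v
         dataWriter.write()
                 |
                 v
         dataWriter.commit()    release resources and return the written-file metrics -> HadoopMapReduceCommitProtocol.commitTask
                 |
                 v
         SparkHadoopMapRedUtil.commitTask    commit the files out of $PATH/_temporary toward $PATH, with output commit coordination
                 |
                 v
         return information about files written to extra temporary locations (absolute paths, partitions)
      |
      v
FileCommitProtocol.onTaskCommit
      |
      v
FileCommitProtocol.commitJob -> OutputCommitter.commitJob    clean up $PATH/_temporary and move files from the extra temporary locations into the final path
      |
      v
processStats    process the written-file metrics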
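The flow above amounts to a fixed call order. setupJob runs once on the driver. Each task calls setupTask, newTaskTempFile, and commitTask, and the driver then calls onTaskCommit for that task. commitJob runs only after every task has succeeded. The sketch below records that order. `RecordingProtocol` and `WriteFlow` are hypothetical. Their method names follow FileCommitProtocol, but the signatures are simplified (no JobContext or TaskAttemptContext) and are not Spark's API. Tasks run sequentially here, whereas Spark runs them in parallel on executors.

```scala
import scala.collection.mutable.ListBuffer

// A hypothetical, heavily simplified commit protocol that only records the order of calls.
class RecordingProtocol {
  val calls: ListBuffer[String] = ListBuffer.empty[String]
  def setupJob(): Unit = calls += "setupJob"
  def setupTask(task: Int): Unit = calls += s"setupTask($task)"
  def newTaskTempFile(task: Int): String = { calls += s"newTaskTempFile($task)"; s"_temporary/task-$task/part" }
  def commitTask(task: Int): String = { calls += s"commitTask($task)"; s"msg$task" }
  def onTaskCommit(msg: String): Unit = calls += s"onTaskCommit($msg)"
  def commitJob(msgs: Seq[String]): Unit = calls += s"commitJob(${msgs.mkString(",")})"
}

object WriteFlow {
  def write(p: RecordingProtocol, numTasks: Int): Unit = {
    p.setupJob()                      // driver: prepare the job
    val msgs = (0 until numTasks).map { t =>
      p.setupTask(t)                  // executor: prepare the task
      p.newTaskTempFile(t)            // executor: choose the output file (rows are written here)
      val msg = p.commitTask(t)       // executor: commit the task
      p.onTaskCommit(msg)             // driver: per-task callback
      msg
    }
    p.commitJob(msgs)                 // driver: only after every task succeeded
    // processStats would run here
  }
}
```

For two tasks, the recorded order is `setupJob`, then the four per-task calls for task 0, the same four for task 1, and finally `commitJob(msg0,msg1)`.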

So how does Spark write Delta data? The flow is exactly the same as above. The only difference is the FileCommitProtocol implementation. Let's go straight to TransactionalWrite.writeFiles:

  def writeFiles(
      data: Dataset[_],
      writeOptions: Option[DeltaOptions],
      isOptimize: Boolean): Seq[AddFile] = {
    hasWritten = true
    ...
    val committer = getCommitter(outputPath)
    ...
      FileFormatWriter.write(
        sparkSession = spark,
        plan = physicalPlan,
        fileFormat = snapshot.fileFormat, // TODO doesn't support changing formats.
        committer = committer,
        outputSpec = outputSpec,
        hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
        partitionColumns = partitioningColumns,
        bucketSpec = None,
        statsTrackers = statsTrackers,
        options = Map.empty)
    }

    committer.addedStatuses
  }

The committer here is a DelayedCommitProtocol, created as follows:

    new DelayedCommitProtocol("delta", outputPath.toString, None)

Now let's look at the methods of DelayedCommitProtocol:

  override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    val filename = getFileName(taskContext, ext)
    val partitionValues = dir.map(parsePartitions).getOrElse(Map.empty[String, String])
    val relativePath = randomPrefixLength.map { prefixLength =>
      getRandomPrefix(prefixLength) // Generate a random prefix as a first choice
    }.orElse {
      dir // or else write into the partition directory if it is partitioned
    }.map { subDir =>
      new Path(subDir, filename)
    }.getOrElse(new Path(filename)) // or directly write out to the output path

    addedFiles.append((partitionValues, relativePath.toUri.toString))
    new Path(path, relativePath).toString
  }

  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    if (addedFiles.nonEmpty) {
      val fs = new Path(path, addedFiles.head._2).getFileSystem(taskContext.getConfiguration)
      val statuses: Seq[AddFile] = addedFiles.map { f =>
        val filePath = new Path(path, new Path(new URI(f._2)))
        val stat = fs.getFileStatus(filePath)
        AddFile(f._2, f._1, stat.getLen, stat.getModificationTime, true)
      }

      new TaskCommitMessage(statuses)
    } else {
      new TaskCommitMessage(Nil)
    }
  }

  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[AddFile]]).toArray
    addedStatuses ++= fileStatuses
  }
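  • The file names generated by newTaskTempFile include a UUID.randomUUID.toString, which makes file-name collisions very unlikely.

  • newTaskTempFile returns a path directly under the output directory (optionally inside a random-prefix or partition subdirectory), not under _temporary. Data files are therefore written in place.

  • commitTask only records the added files, turning each one into an AddFile with its size and modification time.

  • commitJob does not really commit the job. It only keeps the AddFile entries in memory (addedStatuses), which writeFiles then returns.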
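The points above can be seen together in a small model of DelayedCommitProtocol's bookkeeping. Files go straight to their final location, commitTask turns the recorded paths into AddFile-like entries, and commitJob only gathers them in memory. Everything here is hypothetical: `DelayedCommitModel`, `AddFileLike` (a stand-in for Delta's AddFile that keeps only path, partition values, and size), and a simplified `parsePartitions` that does no unescaping. Random prefixes are supported only in `relativePath` and are not used by `Task`.

```scala
import java.util.UUID
import scala.collection.mutable.ArrayBuffer

object DelayedCommitModel {
  final case class AddFileLike(path: String, partitionValues: Map[String, String], size: Long)

  // part-<5-digit split>-<random UUID><ext>
  def fileName(split: Int, ext: String): String =
    f"part-$split%05d-${UUID.randomUUID.toString}$ext"

  // "a=1/b=x" -> Map(a -> 1, b -> x); no unescaping, unlike the real parsePartitions
  def parsePartitions(dir: String): Map[String, String] =
    dir.split("/").filter(_.nonEmpty).map { kv =>
      val i = kv.indexOf('=')
      kv.substring(0, i) -> kv.substring(i + 1)
    }.toMap

  // Random prefix first, else the partition directory, else just the file name
  def relativePath(randomPrefix: Option[String], dir: Option[String], file: String): String =
    randomPrefix.orElse(dir).map(sub => s"$sub/$file").getOrElse(file)

  // Per-task state, mirroring addedFiles in DelayedCommitProtocol
  class Task(tableRoot: String) {
    val addedFiles = ArrayBuffer.empty[(Map[String, String], String)]

    def newTaskTempFile(dir: Option[String], split: Int, ext: String): String = {
      val rel = relativePath(None, dir, fileName(split, ext))
      addedFiles += ((dir.map(parsePartitions).getOrElse(Map.empty[String, String]), rel))
      s"$tableRoot/$rel" // the final location: no _temporary directory involved
    }

    // sizeOf stands in for fs.getFileStatus(...).getLen
    def commitTask(sizeOf: String => Long): Seq[AddFileLike] =
      addedFiles.toSeq.map { case (pv, rel) => AddFileLike(rel, pv, sizeOf(rel)) }
  }

  // Just collects everything in memory; nothing is recorded in the Delta log at this point
  def commitJob(taskCommits: Seq[Seq[AddFileLike]]): Seq[AddFileLike] = taskCommits.flatten
}
```

Because the data files are already in their final place, what makes them visible is not a file move but whatever later happens to these in-memory entries, which is the subject of the next article.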

In a later article we will analyze how Delta processes these AddFile entries to achieve transactionality.

Note: with FileOutputCommitter, a task's output files are written to ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}, for example: /data/_temporary/0/_temporary/attempt_20190219101232_0003_m_000005_0/part-00000-936a4f3b-8f71-48ce-960a-e60d3cf4488f.c000.snappy.parquet

