
Example Analysis of Spark SQL Stream Processing

Published: 2025-02-05 | Author: 千家信息网 editorial staff

In this post I share an example analysis of Spark SQL stream processing. Most readers are probably not very familiar with this topic yet, so I hope the article serves as a useful reference and that you come away from it having learned something.

Spark SQL supports stream processing. A streaming job has a Source and a Sink: the Source defines where the stream originates, the Sink defines where it goes, and execution of the stream is triggered from the Sink.

A Dataset's writeStream defines the stream's destination and triggers the actual execution of the stream, so the analysis starts from writeStream.

writeStream = new DataStreamWriter[T](this)
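Before diving into the internals, here is a minimal user-side sketch of the Source/Sink pairing described above; the socket source, console sink, host/port and trigger interval are illustrative choices, not anything mandated by the article.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object WriteStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("write-stream-sketch")
      .master("local[2]")
      .getOrCreate()

    // Source: where the stream originates (here a text socket, e.g. `nc -lk 9999`).
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    // Sink: writeStream returns a DataStreamWriter; start() is what actually
    // triggers execution and hands back a StreamingQuery.
    val query = lines.writeStream
      .format("console")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}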

DataStreamWriter

DataStreamWriter writes the incoming Dataset to external storage such as Kafka, a database, or text files.

The main entry point is the start method, which returns a StreamingQuery object:

def start(): StreamingQuery = {
  if (source == "memory") {
    assertNotPartitioned("memory")
    val (sink, resultDf) = trigger match {
      case _: ContinuousTrigger =>
        val s = new MemorySinkV2()
        val r = Dataset.ofRows(df.sparkSession, new MemoryPlanV2(s, df.schema.toAttributes))
        (s, r)
      case _ =>
        val s = new MemorySink(df.schema, outputMode)
        val r = Dataset.ofRows(df.sparkSession, new MemoryPlan(s))
        (s, r)
    }
    val chkpointLoc = extraOptions.get("checkpointLocation")
    val recoverFromChkpoint = outputMode == OutputMode.Complete()
    val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
      extraOptions.get("queryName"),
      chkpointLoc,
      df,
      extraOptions.toMap,
      sink,
      outputMode,
      useTempCheckpointLocation = true,
      recoverFromCheckpointLocation = recoverFromChkpoint,
      trigger = trigger)
    resultDf.createOrReplaceTempView(query.name)
    query
  } else if (source == "foreach") {
    assertNotPartitioned("foreach")
    val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
    df.sparkSession.sessionState.streamingQueryManager.startQuery(
      extraOptions.get("queryName"),
      extraOptions.get("checkpointLocation"),
      df,
      extraOptions.toMap,
      sink,
      outputMode,
      useTempCheckpointLocation = true,
      trigger = trigger)
  } else {
    val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
    val sink = ds.newInstance() match {
      case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
      case _ =>
        val ds = DataSource(
          df.sparkSession,
          className = source,
          options = extraOptions.toMap,
          partitionColumns = normalizedParCols.getOrElse(Nil))
        ds.createSink(outputMode)
    }
    df.sparkSession.sessionState.streamingQueryManager.startQuery(
      extraOptions.get("queryName"),
      extraOptions.get("checkpointLocation"),
      df,
      extraOptions.toMap,
      sink,
      outputMode,
      useTempCheckpointLocation = source == "console",
      recoverFromCheckpointLocation = true,
      trigger = trigger)
  }
}

Look at the last branch here: ds is the corresponding DataSource, and in some cases the sink is the ds itself. Finally, streamingQueryManager.startQuery launches the streaming computation and returns the running StreamingQuery object.
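To make the branches of start() concrete, here is a hedged sketch that reuses the spark session and the lines Dataset from the earlier example; table names and paths are made up for illustration.

// "memory" branch: results go to a MemorySink and are exposed as a temp view
// named after the query, so they can be inspected with ordinary SQL.
// (The "foreach" branch is taken when .foreach(new ForeachWriter[...]{...})
// is used instead of a format.)
val memQuery = lines.writeStream
  .format("memory")
  .queryName("debug_table")   // becomes the temp view name
  .outputMode("append")
  .start()
spark.sql("SELECT * FROM debug_table").show()

// Any other format (e.g. "parquet", "kafka", "console") takes the last branch:
// DataSource.lookupDataSource resolves the sink implementation and
// streamingQueryManager.startQuery launches the StreamingQuery.
val fileQuery = lines.writeStream
  .format("parquet")
  .option("path", "/tmp/out")                  // illustrative paths
  .option("checkpointLocation", "/tmp/chk")
  .outputMode("append")
  .start()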

Inside streamingQueryManager, the startQuery method mainly calls the private createQuery method to build a StreamingQueryWrapper object:

private def createQuery(
    userSpecifiedName: Option[String],
    userSpecifiedCheckpointLocation: Option[String],
    df: DataFrame,
    extraOptions: Map[String, String],
    sink: BaseStreamingSink,
    outputMode: OutputMode,
    useTempCheckpointLocation: Boolean,
    recoverFromCheckpointLocation: Boolean,
    trigger: Trigger,
    triggerClock: Clock): StreamingQueryWrapper = {
  var deleteCheckpointOnStop = false
  val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
    new Path(userSpecified).toUri.toString
  }.orElse {
    df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
      new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
    }
  }.getOrElse {
    if (useTempCheckpointLocation) {
      // Delete the temp checkpoint when a query is being stopped without errors.
      deleteCheckpointOnStop = true
      Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
    } else {
      throw new AnalysisException(
        "checkpointLocation must be specified either " +
          """through option("checkpointLocation", ...) or """ +
          s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
    }
  }
  // If offsets have already been created, we trying to resume a query.
  if (!recoverFromCheckpointLocation) {
    val checkpointPath = new Path(checkpointLocation, "offsets")
    val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
    if (fs.exists(checkpointPath)) {
      throw new AnalysisException(
        s"This query does not support recovering from checkpoint location. " +
          s"Delete $checkpointPath to start over.")
    }
  }
  val analyzedPlan = df.queryExecution.analyzed
  df.queryExecution.assertAnalyzed()
  if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
    UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
  }
  if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
    logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
        "is not supported in streaming DataFrames/Datasets and will be disabled.")
  }
  (sink, trigger) match {
    case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
      UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
      new StreamingQueryWrapper(new ContinuousExecution(
        sparkSession,
        userSpecifiedName.orNull,
        checkpointLocation,
        analyzedPlan,
        v2Sink,
        trigger,
        triggerClock,
        outputMode,
        extraOptions,
        deleteCheckpointOnStop))
    case _ =>
      new StreamingQueryWrapper(new MicroBatchExecution(
        sparkSession,
        userSpecifiedName.orNull,
        checkpointLocation,
        analyzedPlan,
        sink,
        trigger,
        triggerClock,
        outputMode,
        extraOptions,
        deleteCheckpointOnStop))
  }
}
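The checkpointLocation resolution at the top of createQuery follows a clear precedence. The sketch below illustrates it against the lines Dataset from the first example; paths and query names are illustrative, and spark.sql.streaming.checkpointLocation is the session-level key behind SQLConf.CHECKPOINT_LOCATION.

// 1) An explicit per-query option wins.
val q1 = lines.writeStream
  .format("parquet")
  .option("path", "/tmp/out1")
  .option("checkpointLocation", "/tmp/chk-q1")
  .start()

// 2) Otherwise a subdirectory of the session default, named after queryName
//    (or a random UUID when no name is given), is used.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/chk-root")
val q2 = lines.writeStream
  .format("parquet")
  .option("path", "/tmp/out2")
  .queryName("q2")          // checkpoint ends up under /tmp/chk-root/q2
  .start()

// 3) With neither set, only queries started with useTempCheckpointLocation = true
//    (console, memory, foreach sinks) get a temporary checkpoint directory; any
//    other sink fails with the AnalysisException shown in the source above.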

Depending on whether the query runs as a continuous stream or in micro-batch mode, createQuery builds either a ContinuousExecution or a MicroBatchExecution. Both are subclasses of StreamExecution, the abstract base class for stream execution; we will look at the StreamExecution class hierarchy later.
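As the pattern match at the end of createQuery shows, it is the trigger (together with a v2 StreamWriteSupport sink) that selects the engine. A hedged sketch follows, using the built-in rate source because continuous mode needs a continuous-capable v2 source such as rate or kafka; intervals and paths are illustrative.

import org.apache.spark.sql.streaming.Trigger

val rateDf = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()

// Trigger.Continuous plus a StreamWriteSupport sink (console here) matches the
// first case, so a ContinuousExecution is wrapped and started.
val continuous = rateDf.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))     // checkpoint interval
  .option("checkpointLocation", "/tmp/chk-cont")
  .start()

// Any other trigger (ProcessingTime, Once, or none at all) falls into the
// default case, producing a MicroBatchExecution.
val microBatch = rateDf.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .option("checkpointLocation", "/tmp/chk-micro")
  .start()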

ContinuousExecution and MicroBatchExecution are very similar in code structure and functionality, so let's take ContinuousExecution as the example.

ContinuousExecution

First of all, a ContinuousExecution never finishes: it is an unbounded stream. When there is temporarily no data, ContinuousExecution blocks the thread and waits for new data to arrive, which is controlled by the awaitEpoch method.

The commit method is triggered after the data for an epoch has been processed; it writes the offsets of the completed data to the commitLog.

Next, look at logicalPlan. In ContinuousExecution the incoming logical plan contains StreamingRelationV2 nodes, which get converted into LogicalPlan nodes of type ContinuousExecutionRelation:

analyzedPlan.transform {
  case r @ StreamingRelationV2(
      source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    toExecutionRelationMap.getOrElseUpdate(r, {
      ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    })
}

There is also the addOffset method: each time offsets have been read, the current read offsets are written to the offsetLog so that on recovery the query knows where to resume. Together, addOffset and commit guarantee exactly-once semantics.
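Since the offsetLog and commitLog live as plain files under the checkpoint location, their write-ahead/commit pairing can be inspected directly. A small sketch, assuming the illustrative /tmp/chk-cont checkpoint path from the earlier example:

import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

val chk = Paths.get("/tmp/chk-cont")
Seq("offsets", "commits").foreach { dir =>
  val d = chk.resolve(dir)
  if (Files.exists(d)) {
    // One numbered file per epoch/batch: offsets/N is written before the data
    // is processed, commits/N after it completes; the gap between the two is
    // what gets reprocessed on recovery.
    Files.list(d).iterator().asScala.toSeq
      .sortBy(_.getFileName.toString)
      .foreach(p => println(s"$dir/${p.getFileName}"))
  }
}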

That is all there is to "Example Analysis of Spark SQL Stream Processing". Thanks for reading! I hope you now have a basic understanding and that the material shared here is helpful. If you want to learn more, keep an eye on the industry news channel!
