How to Use DataStreamReader and DataStreamWriter

This article looks at how DataStreamReader and DataStreamWriter are used. Many people run into questions about them in day-to-day work, so the following walks through the relevant source code and summarizes a simple, practical picture of how the pieces fit together. Let's dig in.

Working with a stream starts from DataStreamReader and DataStreamWriter.
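
Before diving into the internals, here is a minimal user-facing sketch of the two entry points, runnable locally; the "rate" source and "console" sink are just convenient built-ins chosen for illustration:

import org.apache.spark.sql.SparkSession

object EntryPointsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("entry-points-demo")
      .master("local[*]")
      .getOrCreate()

    // spark.readStream returns a DataStreamReader; load() builds the streaming DataFrame.
    val df = spark.readStream
      .format("rate")                    // test source that emits (timestamp, value) rows
      .option("rowsPerSecond", "1")
      .load()

    // df.writeStream returns a DataStreamWriter; start() launches the StreamingQuery.
    val query = df.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}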

DataStreamReader

DataStreamReader is the entry point for creating a stream reader, and its key method is load. This code matters, so the full method is pasted below and analyzed step by step.

def load(): DataFrame = {
  val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).
    getConstructor().newInstance()
  val v1DataSource = DataSource(
    sparkSession,
    userSpecifiedSchema = userSpecifiedSchema,
    className = source,
    options = extraOptions.toMap)
  val v1Relation = ds match {
    case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
    case _ => None
  }
  ds match {
    case provider: TableProvider =>
      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
        source = provider, conf = sparkSession.sessionState.conf)
      val options = sessionOptions ++ extraOptions
      val dsOptions = new CaseInsensitiveStringMap(options.asJava)
      val table = userSpecifiedSchema match {
        case Some(schema) => provider.getTable(dsOptions, schema)
        case _ => provider.getTable(dsOptions)
      }
      import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
      table match {
        case _: SupportsRead if table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
          Dataset.ofRows(
            sparkSession,
            StreamingRelationV2(
              provider, source, table, dsOptions, table.schema.toAttributes, v1Relation)(
              sparkSession))
        // fallback to v1
        // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.
        case _ => Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
      }
    case _ =>
      // Code path for data source v1.
      Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
  }
}

There are quite a few branches here; the important thing is to distinguish the V1 path from the V2 path.

V1 uses the logical plan node StreamingRelation, while V2 uses StreamingRelationV2. Let's first see which physical plan they correspond to.

The physical plan is defined in SparkStrategies.scala:

/**
 * This strategy is just for explaining `Dataset/DataFrame` created by `spark.readStream`.
 * It won't affect the execution, because `StreamingRelation` will be replaced with
 * `StreamingExecutionRelation` in `StreamingQueryManager` and `StreamingExecutionRelation` will
 * be replaced with the real relation using the `Source` in `StreamExecution`.
 */
object StreamingRelationStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case s: StreamingRelation =>
      StreamingRelationExec(s.sourceName, s.output) :: Nil
    case s: StreamingExecutionRelation =>
      StreamingRelationExec(s.toString, s.output) :: Nil
    case s: StreamingRelationV2 =>
      StreamingRelationExec(s.sourceName, s.output) :: Nil
    case _ => Nil
  }
}

In both cases the physical plan is StreamingRelationExec, whose implementation does essentially nothing. As the comment above says, StreamingRelationExec is not the real physical plan either; it exists only so that explaining the plan works.

Keep the related classes ContinuousExecution and MicroBatchExecution in mind. It is not immediately obvious where execution reaches these concrete classes, so let's try working backwards, starting from ContinuousExecution and the base class it extends, StreamExecution.

StreamExecution

StreamExecution is an abstract class. Its abstract method runActivatedStream performs the actual stream-reading work, and subclasses override it.

The runStream method wraps runActivatedStream, adding event notification and other housekeeping around it; that is all we need to know here.
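
As a rough illustration of that relationship (a simplified sketch, not Spark's actual code), runStream plays the role of a template method that does lifecycle bookkeeping around the abstract runActivatedStream that each subclass fills in:

// Simplified sketch; the names mirror Spark's but the bodies are illustrative only.
abstract class SketchStreamExecution {
  // Subclasses (micro-batch / continuous) provide the actual processing loop.
  protected def runActivatedStream(): Unit

  // Wraps the loop with start/termination notifications and error handling.
  def runStream(): Unit = {
    println("posting query-started event")        // stand-in for event notification
    try {
      runActivatedStream()
    } catch {
      case e: Throwable => println(s"query failed: ${e.getMessage}")
    } finally {
      println("posting query-terminated event")   // stand-in for cleanup/notification
    }
  }
}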

StreamingQueryManager

Let's first see what StreamingQueryManager is for. Judging from its documentation comments, it manages all the StreamingQuery instances.

private def createQuery(...): StreamingQueryWrapper = {
  (sink, trigger) match {
    case (table: SupportsWrite, trigger: ContinuousTrigger) =>
      new StreamingQueryWrapper(new ContinuousExecution(
        sparkSession,
        userSpecifiedName.orNull,
        checkpointLocation,
        analyzedPlan,
        table,
        trigger,
        triggerClock,
        outputMode,
        extraOptions,
        deleteCheckpointOnStop))
    case _ =>
      if (operationCheckEnabled) {
        UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
      }
      new StreamingQueryWrapper(new MicroBatchExecution(
        sparkSession,
        userSpecifiedName.orNull,
        checkpointLocation,
        analyzedPlan,
        sink,
        trigger,
        triggerClock,
        outputMode,
        extraOptions,
        deleteCheckpointOnStop))
  }
}

For a continuous stream, it returns:

new StreamingQueryWrapper(new ContinuousExecution(...))

StreamingQueryWrapper simply wraps a StreamingQuery so that it is serializable; in every other respect it behaves like the StreamingQuery it wraps. For a continuous stream, the wrapped query is a ContinuousExecution.
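
From the user's point of view, which execution you end up with is decided by the trigger set on the DataStreamWriter. A hedged sketch (spark-shell style; the rate source and console sink are only placeholders, and continuous mode additionally requires a query, source, and sink that support it):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local[*]").appName("trigger-demo").getOrCreate()
val df = spark.readStream.format("rate").load()

// A ContinuousTrigger makes createQuery wrap a ContinuousExecution.
val continuousQuery = df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))        // checkpoint interval, not a batch interval
  .start()

// A processing-time (or default) trigger yields a MicroBatchExecution.
val microBatchQuery = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()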

ContinuousExecution

As its name suggests, ContinuousExecution is the execution corresponding to continuous streams, and it extends the abstract class StreamExecution. Its main code is essentially an override of runActivatedStream.

override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
  val stateUpdate = new UnaryOperator[State] {
    override def apply(s: State) = s match {
      // If we ended the query to reconfigure, reset the state to active.
      case RECONFIGURING => ACTIVE
      case _ => s
    }
  }

  do {
    runContinuous(sparkSessionForStream)
  } while (state.updateAndGet(stateUpdate) == ACTIVE)

  stopSources()
}

The real execution logic lives in the private method runContinuous, which we will not expand here; knowing the overall flow is enough.

The next question is where ContinuousExecution is actually created, that is, where the logical plan gets turned into this physical execution.

Searching the codebase leads to StreamingQueryManager.scala. That is exactly the StreamingQueryManager shown above, and it is where the ContinuousExecution is instantiated.

DataStreamWriter

DataStreamWriter is where the streaming computation is actually triggered and started.

The start() method returns a StreamingQuery; the key code fragment inside it is:

df.sparkSession.sessionState.streamingQueryManager.startQuery(
  extraOptions.get("queryName"),
  extraOptions.get("checkpointLocation"),
  df,
  extraOptions.toMap,
  sink,
  outputMode,
  useTempCheckpointLocation = source == "console" || source == "noop",
  recoverFromCheckpointLocation = true,
  trigger = trigger)

Following this call leads into StreamingQueryManager; look at its startQuery method.

The startQuery method proceeds in a few steps:

1. Call createQuery to get the StreamingQuery.

val query = createQuery(
  userSpecifiedName,
  userSpecifiedCheckpointLocation,
  df,
  extraOptions,
  sink,
  outputMode,
  useTempCheckpointLocation,
  recoverFromCheckpointLocation,
  trigger,
  triggerClock)

The query is a StreamingQueryWrapper, i.e. code roughly like:

new StreamingQueryWrapper(new ContinuousExecution(...))

2. Start the query obtained in the previous step.

try {
  query.streamingQuery.start()
} catch {
  // error handling elided
}

This call goes straight to the start method of StreamExecution, the parent class of the StreamingQuery. Its definition:

def start(): Unit = {
  logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")
  queryExecutionThread.setDaemon(true)
  queryExecutionThread.start()
  startLatch.await()  // Wait until thread started and QueryStart event has been posted
}

The queryExecutionThread thread is in turn defined as:

val queryExecutionThread: QueryExecutionThread =
  new QueryExecutionThread(s"stream execution thread for $prettyIdString") {
    override def run(): Unit = {
      sparkSession.sparkContext.setCallSite(callSite)
      runStream()
    }
  }

So, in the end, the private method runStream is kicked off inside this thread.
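
The start/await pair above is a common concurrency pattern: start a daemon thread, then block the caller until the thread signals that it is fully up. A small standalone sketch of the same idea (plain Scala, not Spark code):

import java.util.concurrent.CountDownLatch

object LatchStartupDemo {
  def main(args: Array[String]): Unit = {
    val startLatch = new CountDownLatch(1)

    val worker = new Thread("stream-execution-demo") {
      override def run(): Unit = {
        // ... initialization the caller must wait for ...
        startLatch.countDown()   // signal: the thread is up and initialized
        // ... long-running processing loop ...
      }
    }

    worker.setDaemon(true)       // do not keep the JVM alive on its own
    worker.start()
    startLatch.await()           // block until the worker signals startup
    println("worker started; start() can now return")
  }
}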

3. Return the query.

Finally the query is returned. Note that, as shown above, this query has already been started by the time it is returned.
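
In other words, the StreamingQuery handed back by DataStreamWriter.start() is already running, and the caller typically only inspects or waits on it, for example with the public StreamingQuery API:

// `query` is the value returned by df.writeStream...start()
println(query.id)         // query id, stable across restarts from the same checkpoint
println(query.isActive)   // true: the execution thread is already running
println(query.status)     // current status, e.g. whether a trigger is being processed

query.awaitTermination()  // block until the query stops or fails
// query.stop()           // or stop it explicitly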

This concludes the walkthrough of how DataStreamReader and DataStreamWriter are used. Hopefully it clears up the common questions; pairing the reading with hands-on experiments will make it stick even better.
