千家信息网

Spark Streaming流计算框架如何运行

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章主要讲解了"Spark Streaming流计算框架如何运行",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Spark Streaming流计算
千家信息网最后更新 2025年01月23日Spark Streaming流计算框架如何运行

这篇文章主要讲解了"Spark Streaming流计算框架如何运行",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Spark Streaming流计算框架如何运行"吧!

先贴案例

import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Durations, StreamingContext}object StreamingWordCountSelfScala {  def main(args: Array[String]) {    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // 每5秒收割一次数据    val lines = ssc.socketTextStream("localhost", 9999) // 监听 本地9999 socket 端口    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flat map 后 reduce    words.print() // 打印结果    ssc.start() // 启动    ssc.awaitTermination()    ssc.stop(true)  }}

再来回溯下触发过程。

定时器定时触发执行某个方法。这里是 longTime => eventLoop.post(GenerateJobs(new Time(longTime))),将一个 GenerateJobs 类型的事件消息发送到 eventLoop的 队列中。

// JobGenerator.scala line 58  privateval timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

另一方便,eventLoop一直循环取出队列中的事件消息,当取出 GenerateJobs类型的事件消息时。会调用onReceive(event) 。

// EventLoop.scala line 48    onReceive(event)

此时的onReceive(event)在 JobGenerator实例化 eventLoop时已经override了。

// JobGenerator.scala line 87      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

调用generatorJobs(time)

// JobGenerator.scala line 181      case GenerateJobs(time) => generateJobs(time)

graph.generateJobs

// JobGenerator.scala line 248graph.generateJobs(time)

通过outputStream.generateJob 还原出RDD的整个依赖,并创建出Job。这个outputStream就是ForEachDStream。

// DStreamGraph.scala line 115        val jobOption = outputStream.generateJob(time)
在本案例中,按照 SocketInputDStream << FlatMappedDStream << MappedDStream << ShuffledDStream << ForEachDStream 的依赖关系调用parent.getOrCompute,此getOrCompute只在DStream中有定义,所有子类都没重写过此方法。在此方法中,会调用当前DStream的compute方法,而compute中又调用了parent.getOrCompute,同时将当前的DStream的func加入到串联的RDD之后。

一直循环,直到inputStream,本例中为SocketInputDStream的compute被执行,实际上执行的是ReceiverInputDStream.compute,创建出BlockRDD。

至此整个RDD被还原出来。作为参数传入Job的构造中。

至此Job创建成功,但是此Job为Spark Core中的Job,而且也并没有被提交到spark集群中。

获取给定时间对应的输入数据的信息,此时得到的都是元数据,即输入数据的元数据。

再创建成JobSet,并提交JobSet

// JobGenerator.scala line 251        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

jobScheduler.submitJobSet

交由jobExecutor线程池来处理,这里显然可以推测出,JobHandler一定是一个Runnable或者Callable接口的实现。

另外jobExecutor默认的线程数量是1,从并发性考虑,建议与outputStreams的数量保持一致:DStreamGraph.outputStreams.size

// JobScheduler.scala line 122  def submitJobSet(jobSet: JobSet) {    if (jobSet.jobs.isEmpty) {      logInfo("No jobs added for time " + jobSet.time)    } else {      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))      jobSets.put(jobSet.time, jobSet)      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))      logInfo("Added jobs for time " + jobSet.time)    }  }
JobHandler中封装的run方法
  1. 发送JobStarted事件消息,用于监控

  2. job.run,真正的Job提交,注意,这里的Job提交是指提交Streaming的Job到Spark 集群,类似普通Spark程序将RDD提交给Spark集群运行

// JobScheduler.scala line 202    def run() {      try {        val formattedTime = UIUtils.formatBatchTime(          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"        ssc.sc.setJobDescription(          s"""Streaming job from $batchLinkText""")        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)        // We need to assign `eventLoop` to a temp variable. Otherwise, because        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then        // it's possible that when `post` is called, `eventLoop` happens to null.        var _eventLoop = eventLoop        if (_eventLoop != null) {          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))          // Disable checks for existing output directories in jobs launched by the streaming          // scheduler, since we may need to write output to an existing directory during checkpoint          // recovery; see SPARK-4835 for more details.          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {            job.run()          }          _eventLoop = eventLoop          if (_eventLoop != null) {            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))          }        } else {          // JobScheduler has been stopped.        }      } finally {        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)      }    }

job.run

// Job.scala line 38  def run() {    _result = Try(func())  }

执行func(),而此时的func就是在ForEachDStream中封装Job的第二个参数。

在本例中,即为

() => foreachFunc(new BlockRDD[T](ssc.sc, validBlockIds).map(_.flatMap(t=>t.split(" "))).map(_.map[U](t=>(t,1))).combineByKey[C](t=>t, (t1,t2)=>t1+t2, (t1,t2)=>t1+t2,partitioner, true),time)

至于如何推导出此RDD,可参考前文。

读者们,至此,是否有很熟悉的感觉,很明显,上面的代码就是一个函数,函数没有参数,方法体中,执行的代码中,从new BlockRDD开始,就是我们普通的Spark的程序:新建RDD,然后一连串transform,最后将结果交给foreachFunc 处理。

由此,SparkStreaming最终是转变为普通的Spark Application来提交给Spark 集群来执行。是否也可以理解Spark Streaming其实就是Spark 的一个应用程序。而已。

感谢各位的阅读,以上就是"Spark Streaming流计算框架如何运行"的内容了,经过本文的学习后,相信大家对Spark Streaming流计算框架如何运行这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0