这篇文章主要介绍"ReceiverSupervisorImpl实例化怎么实现",在日常操作中,相信很多人在ReceiverSupervisorImpl实例化怎么实现问题上存在疑惑,小编查阅了各式资料,
ReceiverSupervisorImpl实例化怎么实现


先回顾下 在 Executor执行的具体的方法

  1. 实例化ReceiverSupervisorImpl

  2. start之后等待awaitTermination

// ReceiverTracker.scala line 564val startReceiverFunc: Iterator[Receiver[_]] => Unit =  (iterator: Iterator[Receiver[_]]) => {    if (!iterator.hasNext) {      throw new SparkException(        "Could not start receiver as object not found.")    }    if (TaskContext.get().attemptNumber() == 0) {      val receiver = iterator.next()      assert(iterator.hasNext == false)      val supervisor = new ReceiverSupervisorImpl(        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)      supervisor.start()      supervisor.awaitTermination()    } else {      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.    }  }

看下ReceiverSupervisorImpl的父类 ReceiverSupervisor的构造。

成员变量赋值、将当前supervisor与receiver关联( receiver.attachSupervisor(this) )

注释也很清晰:在Worker上负责监督Receiver。提供所需所有 处理从receiver接收到的数据 的接口

// ReceiverSupervisor.scala line 31/** * Abstract class that is responsible for supervising a Receiver in the worker. * It provides all the necessary interfaces for handling the data received by the receiver. */private[streaming] abstract class ReceiverSupervisor(    receiver: Receiver[_],    conf: SparkConf  ) extends Logging {  /** Enumeration to identify current state of the Receiver */  object ReceiverState extends Enumeration {    type CheckpointState = Valueval Initialized, Started, Stopped = Value  }  import ReceiverState._  // Attach the supervisor to the receiver  receiver.attachSupervisor(this)               // 将receiver与supervisor关联  privateval futureExecutionContext = ExecutionContext.fromExecutorService(    ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))  /** Receiver id */  protected val streamId = receiver.streamId  /** Has the receiver been marked for stop. */  privateval stopLatch = new CountDownLatch(1)  /** Time between a receiver is stopped and started again */  privateval defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)  /** The current maximum rate limit for this receiver. */  private[streaming] def getCurrentRateLimit: Long = Long.MaxValue  /** Exception associated with the stopping of the receiver */  @volatile protected var stoppingError: Throwable = null  /** State of the receiver */  @volatile private[streaming] var receiverState = Initialized  // 一些方法,其实就是 数据处理接口}


  1. 实例化了 BlockManagerBasedBlockHandler,用于将数据发送到BlockManager

  2. 实例化RpcEndpoint

  3. 实例化 BlockGenerator

  4. 实例化 BlockGeneratorListener 监听器

// ReceiverSupervisorImpl.scala line 43/** * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]] * which provides all the necessary functionality for handling the data received by * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]] * object that is used to divide the received data stream into blocks of data. */private[streaming] class ReceiverSupervisorImpl(    receiver: Receiver[_],    env: SparkEnv,    hadoopConf: Configuration,    checkpointDirOption: Option[String]  ) extends ReceiverSupervisor(receiver, env.conf) with Logging {  privateval host = SparkEnv.get.blockManager.blockManagerId.host  privateval executorId = SparkEnv.get.blockManager.blockManagerId.executorId  privateval receivedBlockHandler: ReceivedBlockHandler = {    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {  // 默认是不开启      if (checkpointDirOption.isEmpty) {        throw new SparkException(          "Cannot enable receiver write-ahead log without checkpoint directory set. " +            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +            "See documentation for more details.")      }      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)    } else {      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)      }  }  /** Remote RpcEndpointRef for the ReceiverTracker */  privateval trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)  /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */  privateval endpoint = env.rpcEnv.setupEndpoint(    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {      overrideval rpcEnv: RpcEnv = env.rpcEnv      override def receive: PartialFunction[Any, Unit] = {        case StopReceiver =>          logInfo("Received stop signal")          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)        case CleanupOldBlocks(threshTime) =>          logDebug("Received delete old batch signal")          cleanupOldBlocks(threshTime)        case UpdateRateLimit(eps) =>          logInfo(s"Received a new rate limit: $eps.")          registeredBlockGenerators.foreach { bg =>            bg.updateRate(eps)          }      }    })  /** Unique block ids if one wants to add blocks directly */  privateval newBlockId = new AtomicLong(System.currentTimeMillis())  privateval registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator] // 典型的面包模式    with mutable.SynchronizedBuffer[BlockGenerator]  /** Divides received data records into data blocks for pushing in BlockManager. */  privateval defaultBlockGeneratorListener = new BlockGeneratorListener {    def onAddData(data: Any, metadata: Any): Unit = { }    def onGenerateBlock(blockId: StreamBlockId): Unit = { }    def onError(message: String, throwable: Throwable) {      reportError(message, throwable)    }    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {      pushArrayBuffer(arrayBuffer, None, Some(blockId))    }  }  privateval defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)  // ... 一些方法  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */def pushArrayBuffer(    arrayBuffer: ArrayBuffer[_],    metadataOption: Option[Any],    blockIdOption: Option[StreamBlockId]  ) {  pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)}/** Store block and report it to driver */def pushAndReportBlock(    receivedBlock: ReceivedBlock,    metadataOption: Option[Any],    blockIdOption: Option[StreamBlockId]  ) {  val blockId = blockIdOption.getOrElse(nextBlockId)  val time = System.currentTimeMillis  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")  val numRecords = blockStoreResult.numRecords  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))  logDebug(s"Reported block $blockId")}}



  1. 周期性的 将上一批数据 作为一个block,并新建下一个批次的数据;RecurringTimer类,内部有Thread

  2. 将数据push到BlockManager

///** * Generates batches of objects received by a * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately * named blocks at regular intervals. This class starts two threads, * one to periodically start a new batch and prepare the previous batch of as a block, * the other to push the blocks into the block manager. * * Note: Do not create BlockGenerator instances directly inside receivers. Use * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it. */private[streaming] class BlockGenerator(    listener: BlockGeneratorListener,    receiverId: Int,    conf: SparkConf,    clock: Clock = new SystemClock()  ) extends RateLimiter(conf) with Logging{private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])/** * The BlockGenerator can be in 5 possible states, in the order as follows. * *  - Initialized: Nothing has been started *  - Active: start() has been called, and it is generating blocks on added data. *  - StoppedAddingData: stop() has been called, the adding of data has been stopped, *                       but blocks are still being generated and pushed. *  - StoppedGeneratingBlocks: Generating of blocks has been stopped, but *                             they are still being pushed. *  - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed. */private object GeneratorState extends Enumeration {  type GeneratorState = Valueval Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value}import GeneratorState._privateval blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")privateval blockIntervalTimer =  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")  // 周期性线程privateval blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)privateval blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)privateval blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } // 负责将数据push的@volatile private var currentBuffer = new ArrayBuffer[Any]@volatile private var state = Initialized//...}

