千家信息网

第43课:Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等

发表于:2024-11-15 作者:千家信息网编辑
千家信息网最后更新 2024年11月15日,Spark 是分布式计算框架,多台机器之间必然存在着通信。Spark在早期版本采用Akka实现。现在在Akka的上层抽象出了一个RpcEnv。RpcEnv负责管理机器之间的通信。RpcEnv包含了如下
千家信息网最后更新 2024年11月15日第43课:Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等

Spark 是分布式计算框架,多台机器之间必然存在着通信。Spark在早期版本采用Akka实现。现在在Akka的上层抽象出了一个RpcEnv。RpcEnv负责管理机器之间的通信。

RpcEnv包含了如下三大核心:

  • RpcEndpoint 消息循环体,负责接收并处理消息。Spark中的Master、Worker都是RpcEndpoint 。

  • RpcEndpointRef :RpcEndpoint的引用,如果需要和RpcEndpoint通信,就必须获取它的RpcEndpointRef,通过RpcEndpointRef发送消息。

  • Dispatcher:消息调度器,负责RPC消息路由到适当的RpcEndpoint。


RpcEnv被创建以后,RpcEndpoint可以注册到RpcEnv中,被注册的RpcEndpoint会生成一个相应的RpcEndpointRef来引用它。如果你需要向RpcEndpoint发送消息,必须到RpcEnv中通过RpcEndpoint的名称来获取对应的RpcEndpointRef,然后通过RpcEndpointRef向RpcEndpoint发送消息。


RpcEnv负责管理RpcEndpoint的整个生命周期

  • 注册RpcEndpoint,使用name或者uri

  • 路由发送给RpcEndpoint的消息。

  • 停止RpcEndpoint


注:一个RpcEndpoint只能注册给一个RpcEnv


RpcAddress:RpcEnv的逻辑地址,使用主机名和端口表示。

RpcEndpointAddress:注册到RpcEnv上的RpcEndpoint的地址,由RpcAddress和name构成。


由此可见RpcEnv和RpcEndpoint是在相同的机器上(相同的JVM中)。而要想给远端机器发送消息,是获取远端机器的RpcEndpointRef,而并不是远端的RpcEndpoint注册到本地的RpcEnv中。



在Spark1.6版本中,默认使用的是netty

private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {  val rpcEnvNames = Map(    "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",    "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")  val rpcEnvName = conf.get("spark.rpc", "netty")  val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)  Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]}


RpcEndpoint是一个消息循环体,它的生命周期:

构造(Constructor)->启动(onStart)->消息接收(receive&receiveAndReply)->停止(onStop)

receive():不断的运行,处理客户端发送过来的消息。

receiveAndReply():处理消息,并且回应对方。


我们看一下Master的代码:

def main(argStrings: Array[String]) {  SignalLogger.register(log)  val conf = new SparkConf  val args = new MasterArguments(argStrings, conf)  //指定的主机名必须是start-master.sh脚本运行的本地机器名称  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)  rpcEnv.awaitTermination()}/** * Start the Master and return a three tuple of: *   (1) The Master RpcEnv *   (2) The web UI bound port *   (3) The REST server bound port, if any */def startRpcEnvAndEndpoint(    host: String,    port: Int,    webUiPort: Int,    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {  val securityMgr = new SecurityManager(conf)  //创建Rpc环境,主机名和端口就是Standalone集群的访问地址。SYSTEM_NAME=sparkMaster  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)  // 将Master实例注册到RpcEnv中  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)}

在main方法中创建了RpcEnv,并且实例化Master实例,然后注册到RpcEnv中。

RpcEndpoint其实是注册到Dispatcher中的,在netty中的代码实现如下:

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {  dispatcher.registerRpcEndpoint(name, endpoint)}

注:NettyRpcEnv.scala的第135行


而Dispatcher中使用如下数据结构来存储RpcEndpoint和RpcEndpointRef

privateval endpoints = new ConcurrentHashMap[String, EndpointData]privateval endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]


EndpointData为一个case class:

private class EndpointData(    val name: String,    val endpoint: RpcEndpoint,    val ref: NettyRpcEndpointRef) {  val inbox = new Inbox(ref, endpoint)}


在Master中使用数据结构WorkerInfo保存着每个Worker的信息,其中就包括每个Worker的RpcEndpointRef



备注:

1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains


0