千家信息网

spark之master与worker通信模型讲解

发表于:2025-01-26 作者:千家信息网编辑
千家信息网最后更新 2025年01月26日,通信模型架构图master 端代码import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.ConfigFacto
千家信息网最后更新 2025年01月26日spark之master与worker通信模型讲解

通信模型架构图

master 端代码import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.ConfigFactory// 需要导入这2个包 封装一些属性。class MasterActor extends Actor {  //在开始之前调用一次  override def preStart(): Unit = {  }  //用于接收消息  override def receive: Receive = {    case "started" => {      println("Master has been started!")      //进入这个分支,说明这个Master线程已经启动完成    }    case "connecting" => {      println("Master has been get connect from Worker!")      println("a Worker Node has been register!")      //返回消息给Worker      sender() ! "connected"      Thread.sleep(1000)    }    case "stoped" => {    }  }}object Demo01MasterActor {  def main(args: Array[String]) {    //设置MasterIP和端口    val masterHost = "localhost"    val masterPort = "1234"    //端口和IP封装到akka架构,获取一个属性配置文件    val conStr =      s"""         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"         |akka.remote.netty.tcp.hostname = "$masterHost"         |akka.remote.netty.tcp.port = "$masterPort"  """.stripMargin    val config = ConfigFactory.parseString(conStr)    val masterActorSystem = ActorSystem("MasterActorSystem", config)    val masterActor = masterActorSystem.actorOf(Props[MasterActor], "MasterActor")    masterActor ! "started"    masterActorSystem.awaitTermination();  }}worker端代码import akka.actor.{Actor, ActorSelection, ActorSystem, Props}import com.typesafe.config.ConfigFactoryclass WorkerActor extends Actor {  var masterURL: ActorSelection = null  //启动Actor之前执行,做初始化工作  override def preStart(): Unit = {    //配置访问Master的URL    //MasterIP:localhost    //MasterPort:8888(根据Master配置)    //Master的 ActorSystem对象:MasterActorSystem、MasterActor    masterURL = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor")  }  override def receive: Receive = {    case "started" => {      println("Worker has been started!")      //进入这个分支,说明这个Worker线程已经启动完成      //可以去向Master注册      //请求和Master建立连接      masterURL ! "connecting"    }    case "connected" => {      println("Worker 收到来自Master确认信息!")    }    case "stoped" => {    }  }}object Demo01WorkerActor {  def main(args: Array[String]) {    //初始化MastereIP和端口、WorkerIP和端口    //    val masterHost = args(0)    //    val masterPort = args(1)    //    val workerHost = args(2)    //    val workePort = args(3)    val masterHost = "localhost"    val masterPort = "8888"    val workerHost = "localhost"    val workePort = "8889"    //端口和IP封装到akka架构,获取一个属性配置文件    val conStr =      s"""         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"         |akka.remote.netty.tcp.hostname = "$workerHost"         |akka.remote.netty.tcp.port = "$workePort"  """.stripMargin    val config = ConfigFactory.parseString(conStr)    val workerActorSystem = ActorSystem("WorkerActorSystem", config)    val workerActor = workerActorSystem.actorOf(Props[WorkerActor], "WorkerActor")    workerActor ! "started"    workerActorSystem.awaitTermination();  }}


0