spark之master与worker通信模型讲解
发表于:2025-01-26 作者:千家信息网编辑
千家信息网最后更新 2025年01月26日,通信模型架构图master 端代码import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.ConfigFacto
千家信息网最后更新 2025年01月26日spark之master与worker通信模型讲解
通信模型架构图
master 端代码import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.ConfigFactory// 需要导入这2个包 封装一些属性。class MasterActor extends Actor { //在开始之前调用一次 override def preStart(): Unit = { } //用于接收消息 override def receive: Receive = { case "started" => { println("Master has been started!") //进入这个分支,说明这个Master线程已经启动完成 } case "connecting" => { println("Master has been get connect from Worker!") println("a Worker Node has been register!") //返回消息给Worker sender() ! "connected" Thread.sleep(1000) } case "stoped" => { } }}object Demo01MasterActor { def main(args: Array[String]) { //设置MasterIP和端口 val masterHost = "localhost" val masterPort = "1234" //端口和IP封装到akka架构,获取一个属性配置文件 val conStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$masterHost" |akka.remote.netty.tcp.port = "$masterPort" """.stripMargin val config = ConfigFactory.parseString(conStr) val masterActorSystem = ActorSystem("MasterActorSystem", config) val masterActor = masterActorSystem.actorOf(Props[MasterActor], "MasterActor") masterActor ! "started" masterActorSystem.awaitTermination(); }}worker端代码import akka.actor.{Actor, ActorSelection, ActorSystem, Props}import com.typesafe.config.ConfigFactoryclass WorkerActor extends Actor { var masterURL: ActorSelection = null //启动Actor之前执行,做初始化工作 override def preStart(): Unit = { //配置访问Master的URL //MasterIP:localhost //MasterPort:8888(根据Master配置) //Master的 ActorSystem对象:MasterActorSystem、MasterActor masterURL = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor") } override def receive: Receive = { case "started" => { println("Worker has been started!") //进入这个分支,说明这个Worker线程已经启动完成 //可以去向Master注册 //请求和Master建立连接 masterURL ! "connecting" } case "connected" => { println("Worker 收到来自Master确认信息!") } case "stoped" => { } }}object Demo01WorkerActor { def main(args: Array[String]) { //初始化MastereIP和端口、WorkerIP和端口 // val masterHost = args(0) // val masterPort = args(1) // val workerHost = args(2) // val workePort = args(3) val masterHost = "localhost" val masterPort = "8888" val workerHost = "localhost" val workePort = "8889" //端口和IP封装到akka架构,获取一个属性配置文件 val conStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$workerHost" |akka.remote.netty.tcp.port = "$workePort" """.stripMargin val config = ConfigFactory.parseString(conStr) val workerActorSystem = ActorSystem("WorkerActorSystem", config) val workerActor = workerActorSystem.actorOf(Props[WorkerActor], "WorkerActor") workerActor ! "started" workerActorSystem.awaitTermination(); }}
端口
配置
属性
封装
代码
分支
文件
架构
消息
线程
模型
通信
信息
去向
对象
工作
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库阅读器
oltp数据库和邮件服务器
学网络技术的就业率
太原推广机器人rpa软件开发
网络安全法宣传横幅
新网服务器安全组
ftp服务器上传
塔科夫为什么老服务器连接中断
服务器 安全防御
航海王启航服务器爆满
平台显示服务器维护中说明什么
新疆网信办网络安全知识
excel与数据库
数据库创建架构失败
大话西游2锦绣山河服务器人多吗
查找我的手机服务器
互联网科技趣闻
软件开发畅想
微耕数据库导入
鹰潭正规服务器找哪家好
ajax跨服务器访问
华多网络技术有限公司诈骗
弋江网络安全考试scsa认证
大麦22d打印服务器固件
市北区游戏软件开发企业
excel表格中查询数据库
电梯正常运行可以插服务器吗
北京大学图书馆红楼梦数据库
苏州软件开发优势
形容网络技术发展的句子