Scala笔记整理(九):Actor和AKKA
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,[TOC]概述 Scala的Actor有点类似于Java中的多线程编程。但是不同的是,Scala的Actor提供的模型与多线程有所不同。Scala的Actor尽可能地避免锁和共享状态,从而避免多线程
千家信息网最后更新 2025年01月24日Scala笔记整理(九):Actor和AKKA
[TOC]
概述
Scala的Actor有点类似于Java中的多线程编程。但是不同的是,Scala的Actor提供的模型与多线程有所不同。Scala的Actor尽可能地避免锁和共享状态,从而避免多线程并发时出现资源争用的情况,进而提升多线程编程的性能。
Spark中使用的分布式多线程框架,是Akka,是Scala的一种多线程的类库。Akka也实现了类似Scala Actor的模型,其核心概念同样也是Actor。Scala Actor模型已经在2.1.0的时候还在用,但是在2.1.1的时候已经被遗弃了,Spark开始转换用AKKA来替代Scala Actor,但是Scala Actor概念和原理都还是相同的。所以学习Scala Actor对我们学习AKKA,Spark还是有所帮助的
之所以学习Scala Actor,AKKA是因为在学习Spark源码的时,我们能看懂Spark的源码,因为在底层消息传递机制上大量使用AKKA的传送机制。
scala actor
在使用前,需要先引入maven依赖:
org.scala-lang scala-actors 2.10.5
actor单向通信
测试代码如下:
package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/** * 学习scala actor的基本操作 * 和java中的Runnable Thread几乎一致 * * 第一步:编写一个类,扩展特质trait Actor(scala 的actor) * 第二步:复写其中的act方法 * 第三步:创建该actor的对象,调用该对象的start()方法,启动该线程 * 第四步:通过scala的操作符"!",发送消息 * 第五步:结束的话,调用close即可 * * 模拟单向打招呼 */object ActorOps { def main(args: Array[String]): Unit = { val mFActor = new MyFirstActor() mFActor.start() // 发送消息 mFActor ! "小美,睡了吗?" mFActor ! "我去洗澡了~" mFActor ! "呵呵" }}class MyFirstActor extends Actor { override def act(): Unit = { while(true) { receive { case str: String => println(str) } } }}
输出结果如下:
小美,睡了吗?我去洗澡了~呵呵
使用样例类(case class)进行actor消息传递
测试代码如下:
package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/** * */object GreetingActor { def main(args: Array[String]): Unit = { val ga = new GreetingActor ga.start() ga ! Greeting("小美") ga ! WorkContent("装系统") }}case class Greeting(name:String)case class WorkContent(content:String)class GreetingActor extends Actor { override def act(): Unit = { while(true) { receive { case Greeting(name) => println(s"Hello, $name") case WorkContent(content) => println(s"Let's talk about sth. with $content") } } }}
输出结果如下:
Hello, 小美Let's talk about sth. with 装系统
actor相互通信
测试代码如下:
package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/** * actor之线程间,互相通信 * * studentActor * 向老师问了一个问题 * * teacherActor * 向学生做回应 * * 通信的协议: * 请求,使用Request(内容)来表示 * 响应,使用Response(内容)来表示 */object _03CommunicationActorOps { def main(args: Array[String]): Unit = { val teacherActor = new TeacherActor() teacherActor.start() val studentActor = new StudentActor(teacherActor) studentActor.start() studentActor ! Request("老李啊,scala学习为什么这么难啊") }}case class Request(req:String)case class Response(resp:String)class StudentActor(teacherActor: TeacherActor) extends Actor { override def act(): Unit = { while(true) { receive { case Request(req) => { // 向老师请求相关的问题 println("学生向老师说:" + req) teacherActor ! Request(req) } case Response(resp) => { println(resp) println("高!") } } } }}class TeacherActor() extends Actor { override def act(): Unit = { while (true) { receive { case Request(req) => { // 接收到学生的请求 sender ! Response("这个问题,需要如此搞定~") } } } }}
输出结果如下:
学生向老师说:老李啊,scala学习为什么这么难啊这个问题,需要如此搞定~高!
消息的同步和Future
1、Scala在默认情况下,消息都是以异步进行发送的;但是如果发送的消息是同步的,即对方接受后,一定要给自己返回结果,那么可以使用!?的方式发送消息。即:
val response= activeActor !? activeMessage
2、如果要异步发送一个消息,但是在后续要获得消息的返回值,那么可以使用Future。即!!语法,如下:
val futureResponse = activeActor !! activeMessageval activeReply = future()
AKKA actor
首先需要添加akka的maven依赖:
com.typesafe.akka akka-actor_2.10 2.3.16 com.typesafe.akka akka-remote_2.10 2.3.16 com.typesafe.akka akka-slf4j_2.10 2.3.16
AKKA消息传递--本地
原理如下:
_01StudentActorOps
package cn.xpleaf.bigdata.p5.myakka.p1import akka.actor.{Actor, ActorSystem, Props}import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}import scala.util.Random/** * 基于AKKA Actor的单向通信案例 * 学生向老师发送请求 */object _01StudentActorOps { def main(args: Array[String]): Unit = { // 第一步:构建Actor操作系统 val actorSystem = ActorSystem("StudentActorSystem") // 第二步:actorSystem创建TeacherActor的代理对象ActorRef val teacherActorRef = actorSystem.actorOf(Props[TeacherActor]) // 第三步:发送消息 teacherActorRef ! QuoteRequest() Thread.sleep(2000) // 第四步:关闭 actorSystem.shutdown() }}class TeacherActor extends Actor { val quotes = List( "Moderation is for cowards", "Anything worth doing is worth overdoing", "The trouble is you think you have time", "You never gonna know if you never even try") override def receive = { case QuoteRequest() => { val random = new Random() val randomIndex = random.nextInt(quotes.size) val randomQuote = quotes(randomIndex) val response = QuoteResponse(randomQuote) println(response) } }}
MessageProtocol
后面akka通信的几个测试程序都会使用到这个object,只在这里给出,后面不再给出。
package cn.xpleaf.bigdata.p5.myakka/** * akka actor通信协议 */object MessageProtocol { case class QuoteRequest() case class QuoteResponse(resp: String) case class InitSign()}object Start extends Serializableobject Stop extends Serializabletrait Message { val id: String}case class Shutdown(waitSecs: Int) extends Serializablecase class Heartbeat(id: String, magic: Int) extends Message with Serializablecase class Header(id: String, len: Int, encrypted: Boolean) extends Message with Serializablecase class Packet(id: String, seq: Long, content: String) extends Message with Serializable
测试
输出结果如下:
QuoteResponse(Anything worth doing is worth overdoing)
AKKA请求与响应--本地
原理如下:
TeacherActor
package cn.xpleaf.bigdata.p5.myakka.p2import akka.actor.Actorimport cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}import scala.util.Random/** * Teacher Actor */class TeacherActor extends Actor { val quotes = List( "Moderation is for cowards", "Anything worth doing is worth overdoing", "The trouble is you think you have time", "You never gonna know if you never even try") override def receive = { case QuoteRequest() => { val random = new Random() val randomIndex = random.nextInt(quotes.size) val randomQuote = quotes(randomIndex) val response = QuoteResponse(randomQuote)// println(response) sender ! response } }}
StudentActor
package cn.xpleaf.bigdata.p5.myakka.p2import akka.actor.{Actor, ActorLogging, ActorRef}import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{InitSign, QuoteRequest, QuoteResponse}/** * Student Actor * 当学生接收到InitSign信号之后,便向老师发送一条Request请求的消息 */class StudentActor(teacherActorRef:ActorRef) extends Actor with ActorLogging { override def receive = { case InitSign => { teacherActorRef ! QuoteRequest()// println("student send request") } case QuoteResponse(resp) => { log.info(s"$resp") } }}
DriverApp
package cn.xpleaf.bigdata.p5.myakka.p2import akka.actor.{ActorSystem, Props}import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.InitSignobject DriverApp { def main(args: Array[String]): Unit = { val actorSystem = ActorSystem("teacherStudentSystem") // 老师的代理对象 val teacherActorRef = actorSystem.actorOf(Props[TeacherActor], "teacherActor") // 学生的代理对象 val studentActorRef = actorSystem.actorOf(Props[StudentActor](new StudentActor(teacherActorRef)), "studentActor") studentActorRef ! InitSign Thread.sleep(2000) actorSystem.shutdown() }}
测试
输出结果如下:
[INFO] [04/24/2018 10:02:19.932] [teacherStudentSystem-akka.actor.default-dispatcher-2] [akka://teacherStudentSystem/user/studentActor] Anything worth doing is worth overdoing
AKKA请求与响应--远程
application.conf
MyRemoteServerSideActor { akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2552 } } }}MyRemoteClientSideActor { akka { actor { provider = "akka.remote.RemoteActorRefProvider" } }}
RemoteActor
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.{Actor, ActorLogging}import cn.xpleaf.bigdata.p5.myakka.{Header, Shutdown, Start, Stop}class RemoteActor extends Actor with ActorLogging { def receive = { case Start => { // 处理Start消息 log.info("Remote Server Start ==>RECV Start event : " + Start) } case Stop => { // 处理Stop消息 log.info("Remote Server Stop ==>RECV Stop event: " + Stop) } case Shutdown(waitSecs) => { // 处理Shutdown消息 log.info("Remote Server Shutdown ==>Wait to shutdown: waitSecs=" + waitSecs) Thread.sleep(waitSecs) log.info("Remote Server Shutdown ==>Shutdown this system.") context.system.shutdown // 停止当前ActorSystem系统 } case Header(id, len, encrypted) => log.info("Remote Server => RECV header: " + (id, len, encrypted)) // 处理Header消息 case _ => }}
AkkaServerApplication
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.{ActorSystem, Props}import com.typesafe.config.ConfigFactoryobject AkkaServerApplication extends App { // 创建名称为remote-system的ActorSystem:从配置文件application.conf中获取该Actor的配置内容 val system = ActorSystem("remote-system", ConfigFactory.load().getConfig("MyRemoteServerSideActor")) val log = system.log log.info("===>Remote server actor started: " + system) // 创建一个名称为remoteActor的Actor,返回一个ActorRef,这里我们不需要使用这个返回值 system.actorOf(Props[RemoteActor], "remoteActor")}
ClientActor
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.SupervisorStrategy.Stopimport akka.actor.{Actor, ActorLogging}import cn.xpleaf.bigdata.p5.myakka.{Header, Start}class ClientActor extends Actor with ActorLogging { // akka.://@:/ val path = "akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor" // 远程Actor的路径,通过该路径能够获取到远程Actor的一个引用 val remoteServerRef = context.actorSelection(path) // 获取到远程Actor的一个引用,通过该引用可以向远程Actor发送消息 @volatile var connected = false @volatile var stop = false def receive = { case Start => { // 发送Start消息表示要与远程Actor进行后续业务逻辑处理的通信,可以指示远程Actor初始化一些满足业务处理的操作或数据 send(Start) if (!connected) { connected = true log.info("ClientActor==> Actor connected: " + this) } } case Stop => { send(Stop) stop = true connected = false log.info("ClientActor=> Stopped") } case header: Header => { log.info("ClientActor=> Header") send(header) } case (seq, result) => log.info("RESULT: seq=" + seq + ", result=" + result) // 用于接收远程Actor处理一个Packet消息的结果 case m => log.info("Unknown message: " + m) } private def send(cmd: Serializable): Unit = { log.info("Send command to server: " + cmd) try { remoteServerRef ! cmd // 发送一个消息到远程Actor,消息必须是可序列化的,因为消息对象要经过网络传输 } catch { case e: Exception => { connected = false log.info("Try to connect by sending Start command...") send(Start) } } }}
AkkaClientApplication
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.{ActorSystem, Props}import cn.xpleaf.bigdata.p5.myakka.{Header, Start}import com.typesafe.config.ConfigFactoryobject AkkaClientApplication extends App { // 通过配置文件application.conf配置创建ActorSystem系统 val system = ActorSystem("client-system", ConfigFactory.load().getConfig("MyRemoteClientSideActor")) val log = system.log val clientActor = system.actorOf(Props[ClientActor], "clientActor") // 获取到ClientActor的一个引用 clientActor ! Start // 发送一个Start消息,第一次与远程Actor握手(通过本地ClientActor进行转发) Thread.sleep(2000) clientActor ! Header("What's your name: Can you tell me ", 20, encrypted = false) // 发送一个Header消息到远程Actor(通过本地ClientActor进行转发) Thread.sleep(2000)}
测试
服务端输出结果如下:
[INFO] [04/24/2018 09:39:49.271] [main] [Remoting] Starting remoting[INFO] [04/24/2018 09:39:49.508] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://remote-system@127.0.0.1:2552][INFO] [04/24/2018 09:39:49.509] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://remote-system@127.0.0.1:2552][INFO] [04/24/2018 09:39:49.517] [main] [ActorSystem(remote-system)] ===>Remote server actor started: akka://remote-system[INFO] [04/24/2018 09:46:01.872] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server Start ==>RECV Start event : cn.xpleaf.bigdata.p5.myakka.Start$@325737b3[INFO] [04/24/2018 09:46:03.501] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server => RECV header: (What's your name: Can you tell me ,20,false)
客户端输出结果如下:
[INFO] [04/24/2018 09:46:01.274] [main] [Remoting] Starting remoting[INFO] [04/24/2018 09:46:01.479] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://client-system@192.168.43.132:2552][INFO] [04/24/2018 09:46:01.480] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://client-system@192.168.43.132:2552][INFO] [04/24/2018 09:46:01.493] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] Send command to server: cn.xpleaf.bigdata.p5.myakka.Start$@4f00805d[INFO] [04/24/2018 09:46:01.496] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor==> Actor connected: cn.xpleaf.bigdata.p5.myakka.p3.ClientActor@5a85b576[INFO] [04/24/2018 09:46:03.490] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor=> Header[INFO] [04/24/2018 09:46:03.491] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] Send command to server: Header(What's your name: Can you tell me ,20,false)
消息
结果
线程
通信
学生
老师
处理
学习
测试
输出
对象
系统
问题
小美
配置
代码
内容
单向
原理
模型
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库四级是
网络安全收集分析通报
ios 数据库搜索引擎
生物智慧铸牢网络安全
广教版网络技术应用教学参考
gmail 邮件服务器
天津网络技术开发价格表格
手机服务器如何设置ssl
沉迷网络安全的危害性
数据库安全机制有
广东net软件开发靠谱吗
面向对象软件开发
华为与中国5g网络技术
软件开发的数据库端
南通营销网络技术市场价格
山西创梦者互联网科技公司
网络安全法知识征文500字
穿越网络网络安全管理限制
矿井人员管理系统软件开发
服务器维护图片
艾欧尼亚服务器在哪
网络安全工具管理
必维 网络安全
作文网络安全300字
桂东县java数据库开发
沉迷网络安全的危害性
怎么知道高斯数据库表已经锁表了
万方是外文数据库么
防诈骗网络安全观后感
服务器只有一块硬盘怎样启动