千家信息网

Scala笔记整理(九):Actor和AKKA

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,[TOC]概述​ Scala的Actor有点类似于Java中的多线程编程。但是不同的是,Scala的Actor提供的模型与多线程有所不同。Scala的Actor尽可能地避免锁和共享状态,从而避免多线程
千家信息网最后更新 2025年01月24日Scala笔记整理(九):Actor和AKKA

[TOC]


概述

​ Scala的Actor有点类似于Java中的多线程编程。但是不同的是,Scala的Actor提供的模型与多线程有所不同。Scala的Actor尽可能地避免锁和共享状态,从而避免多线程并发时出现资源争用的情况,进而提升多线程编程的性能。

Spark中使用的分布式多线程框架,是Akka,是Scala的一种多线程的类库。Akka也实现了类似Scala Actor的模型,其核心概念同样也是Actor。Scala Actor模型已经在2.1.0的时候还在用,但是在2.1.1的时候已经被遗弃了,Spark开始转换用AKKA来替代Scala Actor,但是Scala Actor概念和原理都还是相同的。所以学习Scala Actor对我们学习AKKA,Spark还是有所帮助的

之所以学习Scala Actor,AKKA是因为在学习Spark源码的时,我们能看懂Spark的源码,因为在底层消息传递机制上大量使用AKKA的传送机制。

scala actor

在使用前,需要先引入maven依赖:

    org.scala-lang    scala-actors    2.10.5

actor单向通信

测试代码如下:

package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/**  * 学习scala actor的基本操作  * 和java中的Runnable Thread几乎一致  *  * 第一步:编写一个类,扩展特质trait Actor(scala 的actor)  * 第二步:复写其中的act方法  * 第三步:创建该actor的对象,调用该对象的start()方法,启动该线程  * 第四步:通过scala的操作符"!",发送消息  * 第五步:结束的话,调用close即可  *  * 模拟单向打招呼  */object ActorOps {    def main(args: Array[String]): Unit = {        val mFActor = new MyFirstActor()        mFActor.start()        // 发送消息        mFActor ! "小美,睡了吗?"        mFActor ! "我去洗澡了~"        mFActor ! "呵呵"    }}class MyFirstActor extends Actor {    override def act(): Unit = {        while(true) {            receive {                case str: String => println(str)            }        }    }}

输出结果如下:

小美,睡了吗?我去洗澡了~呵呵

使用样例类(case class)进行actor消息传递

测试代码如下:

package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/**  *  */object GreetingActor {    def main(args: Array[String]): Unit = {        val ga = new GreetingActor        ga.start()        ga ! Greeting("小美")        ga ! WorkContent("装系统")    }}case class Greeting(name:String)case class WorkContent(content:String)class GreetingActor extends Actor {    override def act(): Unit = {        while(true) {            receive {                case Greeting(name) => println(s"Hello, $name")                case WorkContent(content) => println(s"Let's talk about sth. with $content")            }        }    }}

输出结果如下:

Hello, 小美Let's talk about sth. with 装系统

actor相互通信

测试代码如下:

package cn.xpleaf.bigdata.p5.myactorimport scala.actors.Actor/**  * actor之线程间,互相通信  *  * studentActor  *     向老师问了一个问题  *  * teacherActor  *     向学生做回应  *  * 通信的协议:  * 请求,使用Request(内容)来表示  * 响应,使用Response(内容)来表示  */object _03CommunicationActorOps {    def main(args: Array[String]): Unit = {        val teacherActor = new TeacherActor()        teacherActor.start()        val studentActor = new StudentActor(teacherActor)        studentActor.start()        studentActor ! Request("老李啊,scala学习为什么这么难啊")    }}case class Request(req:String)case class Response(resp:String)class StudentActor(teacherActor: TeacherActor) extends Actor {    override def act(): Unit = {        while(true) {            receive {                case Request(req) => {                    // 向老师请求相关的问题                    println("学生向老师说:" + req)                    teacherActor ! Request(req)                }                case Response(resp) => {                    println(resp)                    println("高!")                }            }        }    }}class TeacherActor() extends Actor {    override def act(): Unit = {        while (true) {            receive {                case Request(req) => {  // 接收到学生的请求                    sender ! Response("这个问题,需要如此搞定~")                }            }        }    }}

输出结果如下:

学生向老师说:老李啊,scala学习为什么这么难啊这个问题,需要如此搞定~高!

消息的同步和Future

1、Scala在默认情况下,消息都是以异步进行发送的;但是如果发送的消息是同步的,即对方接受后,一定要给自己返回结果,那么可以使用!?的方式发送消息。即:

val response= activeActor !? activeMessage

2、如果要异步发送一个消息,但是在后续要获得消息的返回值,那么可以使用Future。即!!语法,如下:

val futureResponse = activeActor !! activeMessageval activeReply = future()

AKKA actor

首先需要添加akka的maven依赖:

    com.typesafe.akka    akka-actor_2.10    2.3.16    com.typesafe.akka    akka-remote_2.10    2.3.16    com.typesafe.akka    akka-slf4j_2.10    2.3.16

AKKA消息传递--本地

原理如下:

_01StudentActorOps
package cn.xpleaf.bigdata.p5.myakka.p1import akka.actor.{Actor, ActorSystem, Props}import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}import scala.util.Random/**  * 基于AKKA Actor的单向通信案例  * 学生向老师发送请求  */object _01StudentActorOps {    def main(args: Array[String]): Unit = {        // 第一步:构建Actor操作系统        val actorSystem = ActorSystem("StudentActorSystem")        // 第二步:actorSystem创建TeacherActor的代理对象ActorRef        val teacherActorRef = actorSystem.actorOf(Props[TeacherActor])        // 第三步:发送消息        teacherActorRef ! QuoteRequest()        Thread.sleep(2000)        // 第四步:关闭        actorSystem.shutdown()    }}class TeacherActor extends Actor {    val quotes = List(        "Moderation is for cowards",        "Anything worth doing is worth overdoing",        "The trouble is you think you have time",        "You never gonna know if you never even try")    override def receive = {        case QuoteRequest() => {            val random = new Random()            val randomIndex = random.nextInt(quotes.size)            val randomQuote = quotes(randomIndex)            val response = QuoteResponse(randomQuote)            println(response)        }    }}
MessageProtocol

后面akka通信的几个测试程序都会使用到这个object,只在这里给出,后面不再给出。

package cn.xpleaf.bigdata.p5.myakka/**  * akka actor通信协议  */object MessageProtocol {    case class QuoteRequest()    case class QuoteResponse(resp: String)    case class InitSign()}object Start extends Serializableobject Stop extends Serializabletrait Message {    val id: String}case class Shutdown(waitSecs: Int) extends Serializablecase class Heartbeat(id: String, magic: Int) extends Message with Serializablecase class Header(id: String, len: Int, encrypted: Boolean) extends Message with Serializablecase class Packet(id: String, seq: Long, content: String) extends Message with Serializable
测试

输出结果如下:

QuoteResponse(Anything worth doing is worth overdoing)

AKKA请求与响应--本地

原理如下:

TeacherActor
package cn.xpleaf.bigdata.p5.myakka.p2import akka.actor.Actorimport cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{QuoteRequest, QuoteResponse}import scala.util.Random/**  * Teacher Actor  */class TeacherActor extends Actor {    val quotes = List(        "Moderation is for cowards",        "Anything worth doing is worth overdoing",        "The trouble is you think you have time",        "You never gonna know if you never even try")    override def receive = {        case QuoteRequest() => {            val random = new Random()            val randomIndex = random.nextInt(quotes.size)            val randomQuote = quotes(randomIndex)            val response = QuoteResponse(randomQuote)//            println(response)            sender ! response        }    }}
StudentActor
package cn.xpleaf.bigdata.p5.myakka.p2import akka.actor.{Actor, ActorLogging, ActorRef}import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.{InitSign, QuoteRequest, QuoteResponse}/**  * Student Actor  * 当学生接收到InitSign信号之后,便向老师发送一条Request请求的消息  */class StudentActor(teacherActorRef:ActorRef) extends Actor with ActorLogging {    override def receive = {        case InitSign => {            teacherActorRef ! QuoteRequest()//            println("student send request")        }        case QuoteResponse(resp) => {            log.info(s"$resp")        }    }}
DriverApp
package cn.xpleaf.bigdata.p5.myakka.p2import akka.actor.{ActorSystem, Props}import cn.xpleaf.bigdata.p5.myakka.MessageProtocol.InitSignobject DriverApp {    def main(args: Array[String]): Unit = {        val actorSystem = ActorSystem("teacherStudentSystem")        // 老师的代理对象        val teacherActorRef = actorSystem.actorOf(Props[TeacherActor], "teacherActor")        // 学生的代理对象        val studentActorRef = actorSystem.actorOf(Props[StudentActor](new StudentActor(teacherActorRef)), "studentActor")        studentActorRef ! InitSign        Thread.sleep(2000)        actorSystem.shutdown()    }}
测试

输出结果如下:

[INFO] [04/24/2018 10:02:19.932] [teacherStudentSystem-akka.actor.default-dispatcher-2] [akka://teacherStudentSystem/user/studentActor] Anything worth doing is worth overdoing

AKKA请求与响应--远程

application.conf
MyRemoteServerSideActor {  akka {    actor {      provider = "akka.remote.RemoteActorRefProvider"    }    remote {      enabled-transports = ["akka.remote.netty.tcp"]      netty.tcp {        hostname = "127.0.0.1"        port = 2552      }    }  }}MyRemoteClientSideActor {  akka {    actor {      provider = "akka.remote.RemoteActorRefProvider"    }  }}
RemoteActor
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.{Actor, ActorLogging}import cn.xpleaf.bigdata.p5.myakka.{Header, Shutdown, Start, Stop}class RemoteActor extends Actor with ActorLogging {    def receive = {        case Start => { // 处理Start消息            log.info("Remote Server Start ==>RECV Start event : " + Start)        }        case Stop => { // 处理Stop消息            log.info("Remote Server Stop ==>RECV Stop event: " + Stop)        }        case Shutdown(waitSecs) => { // 处理Shutdown消息            log.info("Remote Server Shutdown ==>Wait to shutdown: waitSecs=" + waitSecs)            Thread.sleep(waitSecs)            log.info("Remote Server Shutdown ==>Shutdown this system.")            context.system.shutdown // 停止当前ActorSystem系统        }        case Header(id, len, encrypted) => log.info("Remote Server => RECV header: " + (id, len, encrypted)) // 处理Header消息        case _ =>    }}
AkkaServerApplication
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.{ActorSystem, Props}import com.typesafe.config.ConfigFactoryobject AkkaServerApplication extends App {    // 创建名称为remote-system的ActorSystem:从配置文件application.conf中获取该Actor的配置内容    val system = ActorSystem("remote-system",        ConfigFactory.load().getConfig("MyRemoteServerSideActor"))    val log = system.log    log.info("===>Remote server actor started: " + system)    // 创建一个名称为remoteActor的Actor,返回一个ActorRef,这里我们不需要使用这个返回值    system.actorOf(Props[RemoteActor], "remoteActor")}
ClientActor
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.SupervisorStrategy.Stopimport akka.actor.{Actor, ActorLogging}import cn.xpleaf.bigdata.p5.myakka.{Header, Start}class ClientActor extends Actor with ActorLogging {    // akka.://@:/    val path = "akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor" // 远程Actor的路径,通过该路径能够获取到远程Actor的一个引用    val remoteServerRef = context.actorSelection(path) // 获取到远程Actor的一个引用,通过该引用可以向远程Actor发送消息    @volatile var connected = false    @volatile var stop = false    def receive = {        case Start => { // 发送Start消息表示要与远程Actor进行后续业务逻辑处理的通信,可以指示远程Actor初始化一些满足业务处理的操作或数据            send(Start)            if (!connected) {                connected = true                log.info("ClientActor==> Actor connected: " + this)            }        }        case Stop => {            send(Stop)            stop = true            connected = false            log.info("ClientActor=> Stopped")        }        case header: Header => {            log.info("ClientActor=> Header")            send(header)        }        case (seq, result) => log.info("RESULT: seq=" + seq + ", result=" + result) // 用于接收远程Actor处理一个Packet消息的结果        case m => log.info("Unknown message: " + m)    }    private def send(cmd: Serializable): Unit = {        log.info("Send command to server: " + cmd)        try {            remoteServerRef ! cmd // 发送一个消息到远程Actor,消息必须是可序列化的,因为消息对象要经过网络传输        } catch {            case e: Exception => {                connected = false                log.info("Try to connect by sending Start command...")                send(Start)            }        }    }}
AkkaClientApplication
package cn.xpleaf.bigdata.p5.myakka.p3import akka.actor.{ActorSystem, Props}import cn.xpleaf.bigdata.p5.myakka.{Header, Start}import com.typesafe.config.ConfigFactoryobject AkkaClientApplication extends App {    // 通过配置文件application.conf配置创建ActorSystem系统    val system = ActorSystem("client-system",        ConfigFactory.load().getConfig("MyRemoteClientSideActor"))    val log = system.log    val clientActor = system.actorOf(Props[ClientActor], "clientActor") // 获取到ClientActor的一个引用    clientActor ! Start // 发送一个Start消息,第一次与远程Actor握手(通过本地ClientActor进行转发)    Thread.sleep(2000)    clientActor ! Header("What's your name: Can you tell me ", 20, encrypted = false) // 发送一个Header消息到远程Actor(通过本地ClientActor进行转发)    Thread.sleep(2000)}
测试

服务端输出结果如下:

[INFO] [04/24/2018 09:39:49.271] [main] [Remoting] Starting remoting[INFO] [04/24/2018 09:39:49.508] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://remote-system@127.0.0.1:2552][INFO] [04/24/2018 09:39:49.509] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://remote-system@127.0.0.1:2552][INFO] [04/24/2018 09:39:49.517] [main] [ActorSystem(remote-system)] ===>Remote server actor started: akka://remote-system[INFO] [04/24/2018 09:46:01.872] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server Start ==>RECV Start event : cn.xpleaf.bigdata.p5.myakka.Start$@325737b3[INFO] [04/24/2018 09:46:03.501] [remote-system-akka.actor.default-dispatcher-3] [akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor] Remote Server => RECV header: (What's your name: Can you tell me ,20,false)

客户端输出结果如下:

[INFO] [04/24/2018 09:46:01.274] [main] [Remoting] Starting remoting[INFO] [04/24/2018 09:46:01.479] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://client-system@192.168.43.132:2552][INFO] [04/24/2018 09:46:01.480] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://client-system@192.168.43.132:2552][INFO] [04/24/2018 09:46:01.493] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] Send command to server: cn.xpleaf.bigdata.p5.myakka.Start$@4f00805d[INFO] [04/24/2018 09:46:01.496] [client-system-akka.actor.default-dispatcher-4] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor==> Actor connected: cn.xpleaf.bigdata.p5.myakka.p3.ClientActor@5a85b576[INFO] [04/24/2018 09:46:03.490] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] ClientActor=> Header[INFO] [04/24/2018 09:46:03.491] [client-system-akka.actor.default-dispatcher-2] [akka.tcp://client-system@192.168.43.132:2552/user/clientActor] Send command to server: Header(What's your name: Can you tell me ,20,false)
0