千家信息网

Akka学习 实现workcount

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,package com.dcx.scala.actorimport akka.actor.{Actor, ActorRef, ActorSystem, Props}import scala.colle
千家信息网最后更新 2025年01月23日Akka学习 实现workcount
package com.dcx.scala.actorimport akka.actor.{Actor, ActorRef, ActorSystem, Props}import scala.collection.mutable.HashMapimport scala.collection.mutable.ListBufferimport scala.io.Source/**  * 思路:  * 要有个Server  * 要有个Client去通信,client统计文本后把(qy,3)输出给Server;Server再把所有的qy聚合,放到ListBuffer中  */object AkkaWordCount {//  可变长List  val list = new ListBuffer[HashMap[String,Int]]  def main(args: Array[String]): Unit = {//   输入数据文本    val files: Array[String] = Array("D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt")    //存放接收到的每个actor处理的结果数据    //存放有actor返回结果的Future数据    //拿ActorSystem是一个静态工厂    val weChatApp = ActorSystem("WeChatApp")    //拿到两个Actor的通信地址    val akkaServerRef: ActorRef = weChatApp.actorOf(Props[AkkaServer],"jianjian1")    val clientRef: ActorRef = weChatApp.actorOf(Props(new Client(akkaServerRef)),"jianjian")    for (file <- files) {      clientRef ! file    }//  让该线程先睡一下,过早进入死循环会导致list没有3个,一直循环不出来    Thread.sleep(1000)//  如果list把三个文件都放满了,就退出循环    while(true){      if(list.size == 3){//       输出list        println(list(list.size -1))        return      }    }  }}//把每次聚合后的值都发送给AkkaServerclass Client(val serverRef:ActorRef) extends Actor {  override def receive: Receive = {    {//     偏函数 常用作模式匹配//      case filePath: String => {////      map阶段//        val list: List[String] = Source.fromFile(filePath).getLines().toList//        val words: List[String] = list.flatMap(_.split(" "))//        val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size)//        //异步发送结果数据    res发送到Server,去模式匹配//        serverRef ! res//      }      case filePath:String => {        val list: List[String] = Source.fromFile(filePath).getLines().toList        val words: List[String] = list.flatMap(_.split(" "))//      得出: (qy,3) 格式        val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size)        serverRef ! res      }    }  }}import scala.collection.mutable.HashMapclass AkkaServer extends Actor {  private var hashMap: HashMap[String, Int] = new HashMap[String, Int]  override def receive: Receive = {    case context: Map[String, Int] =>{//      (qy,3)       context.map( (map:(String,Int)) => {//      聚合        val value: Any = hashMap.getOrElse(map._1,None)        if(value != None){          hashMap(map._1) = value.asInstanceOf[Int] + map._2        }else{          hashMap(map._1) = map._2        }      }      )      AkkaWordCount.list += hashMap    }  }}
0