Akka学习 实现workcount
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,package com.dcx.scala.actorimport akka.actor.{Actor, ActorRef, ActorSystem, Props}import scala.colle
千家信息网最后更新 2025年01月23日Akka学习 实现workcount
package com.dcx.scala.actorimport akka.actor.{Actor, ActorRef, ActorSystem, Props}import scala.collection.mutable.HashMapimport scala.collection.mutable.ListBufferimport scala.io.Source/** * 思路: * 要有个Server * 要有个Client去通信,client统计文本后把(qy,3)输出给Server;Server再把所有的qy聚合,放到ListBuffer中 */object AkkaWordCount {// 可变长List val list = new ListBuffer[HashMap[String,Int]] def main(args: Array[String]): Unit = {// 输入数据文本 val files: Array[String] = Array("D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt") //存放接收到的每个actor处理的结果数据 //存放有actor返回结果的Future数据 //拿ActorSystem是一个静态工厂 val weChatApp = ActorSystem("WeChatApp") //拿到两个Actor的通信地址 val akkaServerRef: ActorRef = weChatApp.actorOf(Props[AkkaServer],"jianjian1") val clientRef: ActorRef = weChatApp.actorOf(Props(new Client(akkaServerRef)),"jianjian") for (file <- files) { clientRef ! file }// 让该线程先睡一下,过早进入死循环会导致list没有3个,一直循环不出来 Thread.sleep(1000)// 如果list把三个文件都放满了,就退出循环 while(true){ if(list.size == 3){// 输出list println(list(list.size -1)) return } } }}//把每次聚合后的值都发送给AkkaServerclass Client(val serverRef:ActorRef) extends Actor { override def receive: Receive = { {// 偏函数 常用作模式匹配// case filePath: String => {//// map阶段// val list: List[String] = Source.fromFile(filePath).getLines().toList// val words: List[String] = list.flatMap(_.split(" "))// val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size)// //异步发送结果数据 res发送到Server,去模式匹配// serverRef ! res// } case filePath:String => { val list: List[String] = Source.fromFile(filePath).getLines().toList val words: List[String] = list.flatMap(_.split(" "))// 得出: (qy,3) 格式 val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size) serverRef ! res } } }}import scala.collection.mutable.HashMapclass AkkaServer extends Actor { private var hashMap: HashMap[String, Int] = new HashMap[String, Int] override def receive: Receive = { case context: Map[String, Int] =>{// (qy,3) context.map( (map:(String,Int)) => {// 聚合 val value: Any = hashMap.getOrElse(map._1,None) if(value != None){ hashMap(map._1) = value.asInstanceOf[Int] + map._2 }else{ hashMap(map._1) = map._2 } } ) AkkaWordCount.list += hashMap } }}
数据
结果
文本
通信
两个
地址
工厂
思路
格式
模式
通信地址
阶段
静态
可变
处理
统计
输入
输出
学习
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
什么是信息网络技术发展的方式
软件开发可以跟着混吗
白银软件开发公司
获取某个网站服务器时间
opr服务器
速达3000 连数据库
闵行区咨询软件开发报价表
厦门软件开发APP
网络安全交链设备
高科技与互联网技术
江苏工控软件开发大概要多少钱
电脑网络技术工作室
单路c612服务器
东吴志通软件开发
计算机软件开发一年工资多少
智能化网络安全服务至上
数据库查询全部
阿里云服务器安全管理平台
北京公安系统vr软件开发
存储节点管理服务器cdv
英灵神殿加入服务器可以保存吗
学软件开发的能不能做游戏
服务器怎么开通ftp
怎么在页面引入前端数据库
华勤通讯软件开发
南京秦淮区轩恩软件开发公司
数据库备份恢复语句格式
海南通讯软件开发服务有哪些
计算机移动网络技术专业
计算机网络技术主要学什么舞蹈