如何使用spark-core实现广度优先搜索
发表于:2025-02-23 作者:千家信息网编辑
千家信息网最后更新 2025年02月23日,如何使用spark-core实现广度优先搜索,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。需求描述数据源是一批网络日志数据,每条数据
千家信息网最后更新 2025年02月23日如何使用spark-core实现广度优先搜索
如何使用spark-core实现广度优先搜索,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
需求描述
数据源是一批网络日志数据,每条数据都有两个字段srcip和dstip,字段之间以逗号分隔,问题的需求是给定一个srcip和dstip,在给定的搜索深度下检索这两个ip之间所有的通联路径。这个问题是网络日志处理中的一个实际需求,之前在单机的程序中实现过,但是需要将所有的ip对加载到内存中。考虑到如果数据量太大的情况,可能单节点的内存无法支撑这样的操作,但是如果不将ip对全加载内存中,使用深度优先遍历的方法,搜索过程又会很慢。最近在学习spark框架,刚接触RDD,就是这用RDD来解决这个问题。以下是scala代码
package com.pxu.spark.coreimport org.apache.spark.{HashPartitioner, SparkConf, SparkContext}/** * pxu * 2021-01-29 16:57 */object FindIpRel { def main(args: Array[String]): Unit = { val srcIp = args(0) // 源ip val dstIp = args(1) // 目标ip val depth = args(2).toInt //搜索深度 val resPath = args(3) //搜索结果的输出位置 val conf = new SparkConf().setAppName("findIpRel") val sc = new SparkContext(conf) /** * 从数据源中构建原始rdd,每一行的数据形式为a,b */ val ori = sc.textFile("hdfs://master:9000/submitTest/input/ipconn/srcdst.csv") /** * 对原始Rdd进行元组形式转化,现在每一行的数据形式为(a,b) * 除此之外还对数据进行了去重处理,并显示使用hash分区器对RDD中的数据进行分区 * 为后面的join操作,做一些优化 */ val base = ori.map(a => { val tmpArr = a.split(",") (tmpArr(0), tmpArr(1)) }).distinct().partitionBy(new HashPartitioner(10)) /** * 这是一个用于保存结果的RDD,其中每一行的形式为(dstIp,List(ip on path)) * 在查找过程中,发现了搜索结果后,就会将其并入到res中 */ var res = sc.makeRDD[(String,List[String])](List()) /** * 这是一个用于迭代的RDD,其初始化的内容是,首先从baseRdd中过滤出元组第一个元素a是参数SrcIp的, * 然后将其转化成(b,List(a))的格式,其中b总是代表当前搜索路径上的尾ip,list中的其他内容代表搜索 * 路径上其他的ip */ var iteration = base.filter(_._1.equals(srcIp)).map(a => (a._2,List(a._1))) for(i <- 2 to depth){ /** * 1.首先iteration和base按照key进行join,这个操作的意义就是更深一层的搜索,结果RDD的格式是(b,(List(ip on path),c)) * 2.对数据进行一次过滤,过去掉那些路径已经形成环的元素,成环的判据就是List(ip on path)中的数据已经包含c了 * 3.进行map操作,b并入到List(ip on path),将c作为新的key,因此此时更深一层的搜索,导致c成为了当前搜索路径中的尾节点, * 此时RDD中的每一个元素的格式应该是(c,(List(ip on path)) */ val tmp = iteration.join(base).filter(a => !a._2._1.contains(a._2._2)).map(a => (a._2._2,a._2._1:+a._1)) /** * 将tmp中已经成功搜索的路径筛选出来,成功搜索的判据是(c,(List(ip on path)),c与dstIp相等 */ val success = tmp.filter(a => a._1.equals(dstIp)) /** * 将成功搜索的数据合并到res中 */ res = res.union(success) /** * 更新iteration */ iteration = tmp.subtract(success) } /** * 将成功搜索的路径并入到res中 */ res.union(iteration.filter(a => a._1.equals(dstIp))) /** * 执行一次转换操作,将res中的元素从(c,(List(ip on path))格式转换成List(all ip on path) */ val finalResult = res.map(a => a._2 :+ a._1) finalResult.saveAsTextFile(resPath) }}
关于如何使用spark-core实现广度优先搜索问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
搜索
数据
问题
路径
成功
形式
一行
内存
内容
深度
结果
需求
广度
原始
两个
之间
代表
元素
字段
搜索结果
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
管理软件开发咨询
数据库建立一个图书信息表
mysql 数据库昨日
新一线城市数据库网站
jaba服务器推荐
鹤岗服务器机柜批发
阴阳师同一账号登录两个服务器
数据库 18456
sql停止数据库
明日之后南希市这个服务器哪去了
数据库读取需要什么配置
客户端与游戏服务器断开
网络安全宣传词语
分布式数据库中pec
文本文档连接服务器失败
杭州春季观花互联网科技有限公司
任务管理打印服务器
网络安全企业家论坛
深圳未来网络技术
后台数据库连接
音乐文献数据库
怎样用八k的纸画网络安全手抄报
仙剑初代用什么软件开发
net core数据库操作
我的世界服务器删mod
数据库唯一索引空格
中山19寸戴尔服务器大全
数据库系统概念总结资料
联想服务器走阵列
龙华软件开发招应届毕业生