如何使用spark-core实现广度优先搜索
发表于:2024-12-02 作者:千家信息网编辑
千家信息网最后更新 2024年12月02日,如何使用spark-core实现广度优先搜索,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。需求描述数据源是一批网络日志数据,每条数据
千家信息网最后更新 2024年12月02日如何使用spark-core实现广度优先搜索
如何使用spark-core实现广度优先搜索,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
需求描述
数据源是一批网络日志数据,每条数据都有两个字段srcip和dstip,字段之间以逗号分隔,问题的需求是给定一个srcip和dstip,在给定的搜索深度下检索这两个ip之间所有的通联路径。这个问题是网络日志处理中的一个实际需求,之前在单机的程序中实现过,但是需要将所有的ip对加载到内存中。考虑到如果数据量太大的情况,可能单节点的内存无法支撑这样的操作,但是如果不将ip对全加载内存中,使用深度优先遍历的方法,搜索过程又会很慢。最近在学习spark框架,刚接触RDD,就是这用RDD来解决这个问题。以下是scala代码
package com.pxu.spark.coreimport org.apache.spark.{HashPartitioner, SparkConf, SparkContext}/** * pxu * 2021-01-29 16:57 */object FindIpRel { def main(args: Array[String]): Unit = { val srcIp = args(0) // 源ip val dstIp = args(1) // 目标ip val depth = args(2).toInt //搜索深度 val resPath = args(3) //搜索结果的输出位置 val conf = new SparkConf().setAppName("findIpRel") val sc = new SparkContext(conf) /** * 从数据源中构建原始rdd,每一行的数据形式为a,b */ val ori = sc.textFile("hdfs://master:9000/submitTest/input/ipconn/srcdst.csv") /** * 对原始Rdd进行元组形式转化,现在每一行的数据形式为(a,b) * 除此之外还对数据进行了去重处理,并显示使用hash分区器对RDD中的数据进行分区 * 为后面的join操作,做一些优化 */ val base = ori.map(a => { val tmpArr = a.split(",") (tmpArr(0), tmpArr(1)) }).distinct().partitionBy(new HashPartitioner(10)) /** * 这是一个用于保存结果的RDD,其中每一行的形式为(dstIp,List(ip on path)) * 在查找过程中,发现了搜索结果后,就会将其并入到res中 */ var res = sc.makeRDD[(String,List[String])](List()) /** * 这是一个用于迭代的RDD,其初始化的内容是,首先从baseRdd中过滤出元组第一个元素a是参数SrcIp的, * 然后将其转化成(b,List(a))的格式,其中b总是代表当前搜索路径上的尾ip,list中的其他内容代表搜索 * 路径上其他的ip */ var iteration = base.filter(_._1.equals(srcIp)).map(a => (a._2,List(a._1))) for(i <- 2 to depth){ /** * 1.首先iteration和base按照key进行join,这个操作的意义就是更深一层的搜索,结果RDD的格式是(b,(List(ip on path),c)) * 2.对数据进行一次过滤,过去掉那些路径已经形成环的元素,成环的判据就是List(ip on path)中的数据已经包含c了 * 3.进行map操作,b并入到List(ip on path),将c作为新的key,因此此时更深一层的搜索,导致c成为了当前搜索路径中的尾节点, * 此时RDD中的每一个元素的格式应该是(c,(List(ip on path)) */ val tmp = iteration.join(base).filter(a => !a._2._1.contains(a._2._2)).map(a => (a._2._2,a._2._1:+a._1)) /** * 将tmp中已经成功搜索的路径筛选出来,成功搜索的判据是(c,(List(ip on path)),c与dstIp相等 */ val success = tmp.filter(a => a._1.equals(dstIp)) /** * 将成功搜索的数据合并到res中 */ res = res.union(success) /** * 更新iteration */ iteration = tmp.subtract(success) } /** * 将成功搜索的路径并入到res中 */ res.union(iteration.filter(a => a._1.equals(dstIp))) /** * 执行一次转换操作,将res中的元素从(c,(List(ip on path))格式转换成List(all ip on path) */ val finalResult = res.map(a => a._2 :+ a._1) finalResult.saveAsTextFile(resPath) }}
关于如何使用spark-core实现广度优先搜索问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
搜索
数据
问题
路径
成功
形式
一行
内存
内容
深度
结果
需求
广度
原始
两个
之间
代表
元素
字段
搜索结果
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
萃华互联网信息科技有限公司
博途数据库曲线
软件开发成员英文
关注网络安全下一句标语
软件开发周期 uml
2017软件开发市场前景
外卖软件开发计划书
涉密网络安全保密测试
手机用别人网络安全吗
什么网站无法访问服务器
数据库技术与应用考试重点
各大公司采用的软件开发模型
数据库的字段可以用中文吗
界面 数据库 增删改查
软件开发项目打字需要很快嘛
厦门网络安全免费试学
香山互联网科技有限公司
传到数据库中的文字分段吗
网信意识形态与网络安全
软件开发 评审
服务器是每台电脑都有吗
查询sql数据库所有表使用时间
新余企业服务器
php数据库网络安全
计算机软件开发技能包含什么
军团要塞2是哪个服务器
小学国家网络安全宣传模板
小乐技术软件开发
设置服务器端口为静态
数据库将两个字符串连接