spark通过combineByKey算子实现条件性聚合的方法
发表于:2024-09-21 作者:千家信息网编辑
千家信息网最后更新 2024年09月21日,实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,将满足条件的记录进行聚合,不满足条件的则不进行聚合。使用spark处理这种计算场景时,想到了使用combineByKey算子
千家信息网最后更新 2024年09月21日spark通过combineByKey算子实现条件性聚合的方法
实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,将满足条件的记录进行聚合,不满足条件的则不进行聚合。
使用spark处理这种计算场景时,想到了使用combineByKey算子,先将输入数据中的value映射成含一个元素的ArrayBuffer(scala中相当于java中的ArrayList),然后在聚合时对满足聚合条件的记录聚合后覆盖这一个ArrayBuffer,不满足条件的待聚合的两条记录都填入ArrayBuffer。最后调用flatMap将ArrayBuffer中的元素分拆。
比如下面的代码实现了对某个字段聚合时,按照时间条件进行选择性的聚合:
val rdd1 = sc.textFile(dayDayDir).union(sc.textFile(thisDayDir)) .map(line => line.split("\\|")) .filter(arr => if(arr.length != 14 || !arr(3).substring(0, 8).equals(lastDay)) false else true) .map(arr => (arr(0), arr)) .reduceByKey( (pure, after) => reduceSession(pure, after)) .map(tup => (tup._2(13), tup._2)) .combineByKey( x => ArrayBuffer(x), (x:ArrayBuffer[Array[String]],y) => combineMergeValue(x, y), (x:ArrayBuffer[Array[String]],y:ArrayBuffer[Array[String]]) => combineMergeCombiners(x, y)) .flatMap(tup => arrToStr(tup._2))def combineMergeValue(x:ArrayBuffer[Array[String]], y:Array[String]) : ArrayBuffer[Array[String]] = { var outList = x.clone() var outarr = y.clone() var flag = true for(i <- 0 until outList.length){ if(checkTime(outList(i)(3), outList(i)(4), y(3), y(4))) { outarr = reduceSession(outList(i), y) outList(i) = outarr flag = false } } if(flag) { outList += y } outList}def combineMergeCombiners(x:ArrayBuffer[Array[String]], y:ArrayBuffer[Array[String]]) : ArrayBuffer[Array[String]] = { var outList = x.clone(); for(i <- 0 until y.length){ var outarr = y(i).clone() var flag = true for(j <- 0 until outList.length){ if(checkTime(outList(j)(3), outList(j)(4), y(i)(3), y(i)(4))) { outarr = reduceSession(outList(j), y(i)) outList(j) = outarr flag = false } } if(flag) { outList += y(i) } } outList}
条件
元素
场景
数据
选择性
合时
选择
算子
代码
字段
实际
时间
过程
面的
先将
处理
开发
输入
方法
条件性
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
供电公司科技互联网部
信息网络安全培训个人总结
杭州购物直播软件开发
网络安全问题是
深圳程序软件开发需要多少钱
徐汇区品质数据库服务清单
网络安全句子长
备份 服务器
一般将软件开发环境称为
女生手机app软件开发
网络文件服务器软件
扫描文件和数据库是一样的吗
黑客进攻服务器不留痕迹吗
上海甄郝网络技术公司
最新企业工商数据库
数据库表怎么删除
中央空调网络安全吗
北邮时间服务器
新乡市聚弘网络技术有限公司
数据库 查询001课程
数据库怎么查表的具体人
b s软件开发公司
初中网络安全宣传周总结
网络安全主要目的
查文献专利的数据库
网络技术文章翻译
163pop服务器
杭州软件开发驻场服务公司
七日杀服务器管理代码
成都web前端软件开发有用吗