spark通过combineByKey算子实现条件性聚合的方法
发表于:2024-11-23 作者:千家信息网编辑
千家信息网最后更新 2024年11月23日,实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,将满足条件的记录进行聚合,不满足条件的则不进行聚合。使用spark处理这种计算场景时,想到了使用combineByKey算子
千家信息网最后更新 2024年11月23日spark通过combineByKey算子实现条件性聚合的方法
实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,将满足条件的记录进行聚合,不满足条件的则不进行聚合。
使用spark处理这种计算场景时,想到了使用combineByKey算子,先将输入数据中的value映射成含一个元素的ArrayBuffer(scala中相当于java中的ArrayList),然后在聚合时对满足聚合条件的记录聚合后覆盖这一个ArrayBuffer,不满足条件的待聚合的两条记录都填入ArrayBuffer。最后调用flatMap将ArrayBuffer中的元素分拆。
比如下面的代码实现了对某个字段聚合时,按照时间条件进行选择性的聚合:
val rdd1 = sc.textFile(dayDayDir).union(sc.textFile(thisDayDir)) .map(line => line.split("\\|")) .filter(arr => if(arr.length != 14 || !arr(3).substring(0, 8).equals(lastDay)) false else true) .map(arr => (arr(0), arr)) .reduceByKey( (pure, after) => reduceSession(pure, after)) .map(tup => (tup._2(13), tup._2)) .combineByKey( x => ArrayBuffer(x), (x:ArrayBuffer[Array[String]],y) => combineMergeValue(x, y), (x:ArrayBuffer[Array[String]],y:ArrayBuffer[Array[String]]) => combineMergeCombiners(x, y)) .flatMap(tup => arrToStr(tup._2))def combineMergeValue(x:ArrayBuffer[Array[String]], y:Array[String]) : ArrayBuffer[Array[String]] = { var outList = x.clone() var outarr = y.clone() var flag = true for(i <- 0 until outList.length){ if(checkTime(outList(i)(3), outList(i)(4), y(3), y(4))) { outarr = reduceSession(outList(i), y) outList(i) = outarr flag = false } } if(flag) { outList += y } outList}def combineMergeCombiners(x:ArrayBuffer[Array[String]], y:ArrayBuffer[Array[String]]) : ArrayBuffer[Array[String]] = { var outList = x.clone(); for(i <- 0 until y.length){ var outarr = y(i).clone() var flag = true for(j <- 0 until outList.length){ if(checkTime(outList(j)(3), outList(j)(4), y(i)(3), y(i)(4))) { outarr = reduceSession(outList(j), y(i)) outList(j) = outarr flag = false } } if(flag) { outList += y(i) } } outList}
条件
元素
场景
数据
选择性
合时
选择
算子
代码
字段
实际
时间
过程
面的
先将
处理
开发
输入
方法
条件性
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
可用性属于网络安全吗
东城区网络安全培训
国家网络安全周启幕
java查数据库消息
moxa串口服务器双网口
软件开发与项目管理专业代码
海康怎么进入存储服务器
服务器公网ip 配置
重庆联通服务器租用好不好
如需访问该服务器的ftp资源
数据库开发做什么内容
网络技术服务及开发公司
通信及网络技术
同城软件开发公司
redis服务器内存
国内外在网络技术上的差距
服务器开机识别不了键盘
互联网和科技与教育的结合
梦幻西游哪个服务器
20行业网络安全工作报告
cisco服务器开机原理
c#从文件读取数据库连接
浪潮服务器生态
网络技术的心得体会
海康怎么进入存储服务器
郑州互易网络技术有限公司
数据库如何修改字段含义
设计数据库概念模型
苏州德尔福软件开发
更改数据库默认存储位置