spark通过combineByKey算子实现条件性聚合的方法
发表于:2025-01-28 作者:千家信息网编辑
千家信息网最后更新 2025年01月28日,实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,将满足条件的记录进行聚合,不满足条件的则不进行聚合。使用spark处理这种计算场景时,想到了使用combineByKey算子
千家信息网最后更新 2025年01月28日spark通过combineByKey算子实现条件性聚合的方法
实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,将满足条件的记录进行聚合,不满足条件的则不进行聚合。
使用spark处理这种计算场景时,想到了使用combineByKey算子,先将输入数据中的value映射成含一个元素的ArrayBuffer(scala中相当于java中的ArrayList),然后在聚合时对满足聚合条件的记录聚合后覆盖这一个ArrayBuffer,不满足条件的待聚合的两条记录都填入ArrayBuffer。最后调用flatMap将ArrayBuffer中的元素分拆。
比如下面的代码实现了对某个字段聚合时,按照时间条件进行选择性的聚合:
val rdd1 = sc.textFile(dayDayDir).union(sc.textFile(thisDayDir)) .map(line => line.split("\\|")) .filter(arr => if(arr.length != 14 || !arr(3).substring(0, 8).equals(lastDay)) false else true) .map(arr => (arr(0), arr)) .reduceByKey( (pure, after) => reduceSession(pure, after)) .map(tup => (tup._2(13), tup._2)) .combineByKey( x => ArrayBuffer(x), (x:ArrayBuffer[Array[String]],y) => combineMergeValue(x, y), (x:ArrayBuffer[Array[String]],y:ArrayBuffer[Array[String]]) => combineMergeCombiners(x, y)) .flatMap(tup => arrToStr(tup._2))def combineMergeValue(x:ArrayBuffer[Array[String]], y:Array[String]) : ArrayBuffer[Array[String]] = { var outList = x.clone() var outarr = y.clone() var flag = true for(i <- 0 until outList.length){ if(checkTime(outList(i)(3), outList(i)(4), y(3), y(4))) { outarr = reduceSession(outList(i), y) outList(i) = outarr flag = false } } if(flag) { outList += y } outList}def combineMergeCombiners(x:ArrayBuffer[Array[String]], y:ArrayBuffer[Array[String]]) : ArrayBuffer[Array[String]] = { var outList = x.clone(); for(i <- 0 until y.length){ var outarr = y(i).clone() var flag = true for(j <- 0 until outList.length){ if(checkTime(outList(j)(3), outList(j)(4), y(i)(3), y(i)(4))) { outarr = reduceSession(outList(j), y(i)) outList(j) = outarr flag = false } } if(flag) { outList += y(i) } } outList}
条件
元素
场景
数据
选择性
合时
选择
算子
代码
字段
实际
时间
过程
面的
先将
处理
开发
输入
方法
条件性
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
我的世界服务器封面怎么解除
如何建立文档数据库
plus连接不上本地数据库
服务器的计划管理软件
戴尔服务器工作站识别不到硬盘
常用fda数据库介绍
oracle数据库学习通
数据库设计规范应该写什么
我的世界1.8.3服务器
网络安全管理类联考书籍
怎样制作小程序软件开发
excel怎么连接数据库
关于数据库视图删除数据库
通俗解释软件开发技术
软件开发 胜任力
世纪星组态软件开发报价
华为服务器出厂报告
哪些措施有利于数据库的安全
小鸟云服务器怎么样
服务器硬盘sas
软件开发在外包工作3年
计算机网络技术及应用考卷
电脑服务器管理器打不开
全局数据库名oracle
XULEI下载软件开发
深圳永邦移动互联网科技公司
网络安全 判刑
支付宝里怎么搭建服务器
迷你世界无规则服务器生存
数据库创建区域经理表