生产常用Spark累加器剖析之三(自定义累加器)
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,思路 & 需求参考IntAccumulatorParam的实现思路(上述文章中有讲):trait AccumulatorParam[T] extends AccumulableParam[T, T]
千家信息网最后更新 2025年01月31日生产常用Spark累加器剖析之三(自定义累加器)
思路 & 需求
参考IntAccumulatorParam的实现思路(上述文章中有讲):
trait AccumulatorParam[T] extends AccumulableParam[T, T] { def addAccumulator(t1: T, t2: T): T = { // addInPlace有很多具体的实现类 // 如果想要实现自定义的话,就得实现这个方法 addInPlace(t1, t2) }}
自定义也可以通过这个方法去实现,从而兼容我们自定义的累加器
需求:这里实现一个简单的案例,用分布式的方法去实现随机数
** * 自定义的AccumulatorParam * * Created by lemon on 2018/7/28. */object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] { override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = { // ++用于两个集合相加 r1++r2 } override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = { var data: Map[Int, Int] = Map() data }}/** * 使用自定义的累加器,实现随机数 * * Created by lemon on 2018/7/28. */object CustomAccumulator { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]") val sc = new SparkContext(sparkConf) val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator) val distData = sc.parallelize(1 to 10) val mapCount = distData.map(x => { val randomNum = new Random().nextInt(20) // 构造一个k-v对 val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum) uniqueKeyAccumulator += map }) println(mapCount.count()) // 获取到累加器的值 中的key值,并进行打印 uniqueKeyAccumulator.value.keys.foreach(println) sc.stop() }}
运行结果如下图:## 思路 & 需求
参考IntAccumulatorParam的实现思路(上述文章中有讲):
trait AccumulatorParam[T] extends AccumulableParam[T, T] { def addAccumulator(t1: T, t2: T): T = { // addInPlace有很多具体的实现类 // 如果想要实现自定义的话,就得实现这个方法 addInPlace(t1, t2) }}
自定义也可以通过这个方法去实现,从而兼容我们自定义的累加器
需求:这里实现一个简单的案例,用分布式的方法去实现随机数
** * 自定义的AccumulatorParam * * Created by lemon on 2018/7/28. */object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] { override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = { // ++用于两个集合相加 r1++r2 } override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = { var data: Map[Int, Int] = Map() data }}/** * 使用自定义的累加器,实现随机数 * * Created by lemon on 2018/7/28. */object CustomAccumulator { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]") val sc = new SparkContext(sparkConf) val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator) val distData = sc.parallelize(1 to 10) val mapCount = distData.map(x => { val randomNum = new Random().nextInt(20) // 构造一个k-v对 val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum) uniqueKeyAccumulator += map }) println(mapCount.count()) // 获取到累加器的值 中的key值,并进行打印 uniqueKeyAccumulator.value.keys.foreach(println) sc.stop() }}
运行结果如下图:
累加器
方法
思路
随机数
需求
两个
分布式
可以通过
文章
案例
结果
得实
参考
运行
常用
剖析
生产
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
石油化工概算应用数据库2017
海淀区专业软件开发联系方式
大学生毕业想从事软件开发
促织翻译软件开发
曙光服务器机柜
西北农林科技大学软件开发
幼儿网络安全情景短片
梁溪区机电软件开发设计规范
杭州睿趣网络技术有限公司
江苏安卓软件开发推荐
sql数据库行显示出来
数据库索引排序
sql 写数据库语句吗
连按不到服务器
信息网络安全在生产的影响
大麦22d打印服务器固件
计算机网络技术学校在哪里
物理服务器租用
知名的管理软件开发
百度最新研发的数据库产品
创新网络安全培训
云服务器电脑怎么下载
山西通用软件开发均价
两个项目用xml传数据库
计算机网络技术高等数学
一起考教师的软件开发的需求分析
悦天数据库
新疆网络技术公司招聘
应用软件开发大牛
汽车网络安全R156法规