9.spark core之共享变量
发表于:2024-10-25 作者:千家信息网编辑
千家信息网最后更新 2024年10月25日,简介 spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。每个
千家信息网最后更新 2024年10月25日9.spark core之共享变量
简介
spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。
- 集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。
- 每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量。如果驱动器需要获取变量的结果值,这种方式是不可行的。
spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。
- 广播变量用于高效分发较大的对象。会在每个执行器本地缓存一份大对象,而避免每次都连接驱动器获取。
- 累加器用于在驱动器中对数据结果进行聚合。
广播变量
原理
- 广播变量只能在Driver端定义,不能在Executor端定义。
- 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
- 如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本;如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
用法
- 通过对一个类型T的对象调用SparkContext.broadcast创建出一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。
- 通过value属性访问该对象的值
- 变量只会被发到各个节点一次,应作为只读值处理。(修改这个值不会影响到别的节点)
实例
查询每个国家的呼号个数
python
# 将呼号前缀(国家代码)作为广播变量signPrefixes = sc.broadcast(loadCallSignTable())def processSignCount(sign_count, signPrefixes): country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1] return (country, count)countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
scala
// 将呼号前缀(国家代码)作为广播变量val signPrefixes = sc.broadcast(loadCallSignTable())def processSignCount(sign_count, signPrefixes): country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1] return (country, count)val countryContactCounts = contactCounts.map{case (sign, count) => { val country = lookupInArray(sign, signPrefixes.value) (country, count) }}.reduceByKey((x, y) => x+y)countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
java
// 将呼号前缀(国家代码)作为广播变量final Broadcast signPrefixes = sc.broadcast(loadCallSignTable());JavaPairRDD countryContactCounts = contactCounts.mapToPair(new PairFunction, String, Integer>() { public Tuple2 call(Tuple2 callSignCount) { String sign = callSignCount._1(); String country = lookupCountry(sign, signPrefixes.value()); return new Tuple2(country, callSignCount._2()); }}).reduceByKey(new SumInts());countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
累加器
原理
- 累加器在Driver端定义赋初始值。
- 累加器只能在Driver端读取最后的值,在Excutor端更新。
用法
- 通过调用sc.accumulator(initivalValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
- Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值
- 驱动器程序可以调用累加器的value属性来访问累加器的值
实例
累加空行
python
file = sc.textFile(inputFile)# 创建Accumulator[Int]并初始化为0blankLines = sc.accumulator(0)def extractCallSigns(line): global blankLines # 访问全局变量 if (line == ""): blankLines += 1 return line.split(" ")callSigns = file.flatMap(extractCallSigns)callSigns.saveAsTextFile(outputDir + "/callsigns")print "Blank lines: %d" % blankLines.value
scala
val file = sc.textFile("file.txt")val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0val callSigns = file.flatMap(line => { if (line == "") { blankLines += 1 //累加器加1 } line.split(" ")})callSigns.saveAsTextFile("output.txt")println("Blank lines:" + blankLines.value)
java
JavaRDD rdd = sc.textFile(args[1]);final Accumulator blankLines = sc.accumulator(0);JavaRDD callSigns = rdd.flatMap(new FlatMapFunction() { public Iterable call(String line) { if ("".equals(line)) { blankLines.add(1); } return Arrays.asList(line.split(" ")); }});callSigns.saveAsTextFile("output.text");System.out.println("Blank lines:" + blankLines.value());
忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。
变量
累加器
广播
驱动器
驱动
对象
代码
副本
呼号
国家
类型
前缀
任务
原理
实例
属性
技术
数据
方式
方法
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器机柜可调节托盘
我的世界如何服务器
数据库技术三级博客
海陵区个性化网络技术解决方案
数据库管理和手工管理的差别
黄山学院软件开发学院
杭州软件开发相关公司
网络安全宣传片背景音乐
qq网络技术钻石皇朝
数据库前端怎么连接
软件开发赚钱模式
ios设置代理服务器
服务器未获取角色信息王者荣耀
网络安全文科生可报吗
骑马与砍杀战团服务器
程序员怎么练习软件开发
sw pdm数据库安装
互联app软件开发
台湾公司生鲜配送软件开发
网络安全事件负面影响
德勤数据库开发
对口升学计算机网络技术题
移星串口服务器web设置
网络安全法 告知和报告
vr和ar软件开发费用
mcpe服务器文件介绍
思科网络安全产品
徐汇区营销软件开发信息中心
无锡工程软件开发技术指导
服务器的使用方法