9.spark core之共享变量
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,简介 spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。每个
千家信息网最后更新 2025年01月24日9.spark core之共享变量
简介
spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。
- 集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。
- 每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量。如果驱动器需要获取变量的结果值,这种方式是不可行的。
spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。
- 广播变量用于高效分发较大的对象。会在每个执行器本地缓存一份大对象,而避免每次都连接驱动器获取。
- 累加器用于在驱动器中对数据结果进行聚合。
广播变量
原理
- 广播变量只能在Driver端定义,不能在Executor端定义。
- 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
- 如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本;如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
用法
- 通过对一个类型T的对象调用SparkContext.broadcast创建出一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。
- 通过value属性访问该对象的值
- 变量只会被发到各个节点一次,应作为只读值处理。(修改这个值不会影响到别的节点)
实例
查询每个国家的呼号个数
python
# 将呼号前缀(国家代码)作为广播变量signPrefixes = sc.broadcast(loadCallSignTable())def processSignCount(sign_count, signPrefixes): country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1] return (country, count)countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
scala
// 将呼号前缀(国家代码)作为广播变量val signPrefixes = sc.broadcast(loadCallSignTable())def processSignCount(sign_count, signPrefixes): country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1] return (country, count)val countryContactCounts = contactCounts.map{case (sign, count) => { val country = lookupInArray(sign, signPrefixes.value) (country, count) }}.reduceByKey((x, y) => x+y)countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
java
// 将呼号前缀(国家代码)作为广播变量final Broadcast signPrefixes = sc.broadcast(loadCallSignTable());JavaPairRDD countryContactCounts = contactCounts.mapToPair(new PairFunction, String, Integer>() { public Tuple2 call(Tuple2 callSignCount) { String sign = callSignCount._1(); String country = lookupCountry(sign, signPrefixes.value()); return new Tuple2(country, callSignCount._2()); }}).reduceByKey(new SumInts());countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
累加器
原理
- 累加器在Driver端定义赋初始值。
- 累加器只能在Driver端读取最后的值,在Excutor端更新。
用法
- 通过调用sc.accumulator(initivalValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
- Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值
- 驱动器程序可以调用累加器的value属性来访问累加器的值
实例
累加空行
python
file = sc.textFile(inputFile)# 创建Accumulator[Int]并初始化为0blankLines = sc.accumulator(0)def extractCallSigns(line): global blankLines # 访问全局变量 if (line == ""): blankLines += 1 return line.split(" ")callSigns = file.flatMap(extractCallSigns)callSigns.saveAsTextFile(outputDir + "/callsigns")print "Blank lines: %d" % blankLines.value
scala
val file = sc.textFile("file.txt")val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0val callSigns = file.flatMap(line => { if (line == "") { blankLines += 1 //累加器加1 } line.split(" ")})callSigns.saveAsTextFile("output.txt")println("Blank lines:" + blankLines.value)
java
JavaRDD rdd = sc.textFile(args[1]);final Accumulator blankLines = sc.accumulator(0);JavaRDD callSigns = rdd.flatMap(new FlatMapFunction() { public Iterable call(String line) { if ("".equals(line)) { blankLines.add(1); } return Arrays.asList(line.split(" ")); }});callSigns.saveAsTextFile("output.text");System.out.println("Blank lines:" + blankLines.value());
忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。
变量
累加器
广播
驱动器
驱动
对象
代码
副本
呼号
国家
类型
前缀
任务
原理
实例
属性
技术
数据
方式
方法
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
云服务器防护软件大全
九江学院网络安全
龙腾简合网络技术有限公司
dell服务器进bios
命令打开数据库表
node给前台数据库
宁波品牌网络技术有哪些
常州竺森网络技术有限公司
单片机qt软件开发
干货满满快来开启网络安全大作战
三星samba服务器
网络安全调查报告及结果
四川人工智能软件开发哪家专业
网络安全博览会河北
人脸识别像素数据库
数据库坏块
服务器起立
方知中国数据库属于什么数据库
重庆网络软件开发条件
建筑网络安全防线
博山客户办公crm软件开发公司
金山区品质软件开发销售电话
数据库的服务名称
软件开发英语需要多高
商务局网络安全制度
网络安全100秒漫谈
十大网络安全咨询公司排名
壹点金融网络安全
大专计算机网络技术上课
只有mdm服务器才能更新其自有的描述文件