千家信息网

Spark常用的action算子

发表于:2024-09-23 作者:千家信息网编辑
千家信息网最后更新 2024年09月23日,action算子简介Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一
千家信息网最后更新 2024年09月23日Spark常用的action算子

action算子简介

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序(就是我们编写的一个应用程序)中有几个Action类算子执行,就有几个job运行。

1.reduce

通过函数func聚集数据集中的所有元素,这个函数必须是关联性的,确保可以被正确的并发执行

scala> val rdd1 = sc.makeRDD(1 to 10)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at :24scala> rdd1.reduce(_+_)res3: Int = 55

2.collect

在driver的程序中,以数组的形式,返回数据集的所有元素,这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用

scala> var rdd1 = sc.makeRDD(1 to 10)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at :24scala> rdd1.collectres2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

3.count

返回数据集的元素个数

scala> val rdd1 = sc.makeRDD(1 to 10)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at :24scala> rdd1.countres4: Long = 10

4.first

返回数据集的第一个元素(类似于take(1))

scala> val rdd1 = sc.makeRDD(1 to 10)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :24scala> rdd1.firstres5: Int = 1

5.take

返回一个数组,由数据集的前n个元素组成。注意此操作目前并非并行执行的,而是driver程序所在机器

scala> val rdd1 = sc.makeRDD(1 to 10)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at :24scala> rdd1.take(3)res6: Array[Int] = Array(1, 2, 3)

6.takeSample(withReplacement,num,seed)

withReplacement:结果中是否可重复
num:取多少个
seed:随机种子
返回一个数组,在数据集中随机采样num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定的随机数生成器种子
原理
takeSample()函数和sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行collect(),返回结果的集合为单机的数组

scala> val rdd1 = sc.makeRDD(1 to 10)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at makeRDD at :24scala> rdd1.takeSample(true,4,10)res19: Array[Int] = Array(10, 10, 2, 3)

7.takeOrdered

takeOrdered和top类似,只不过以和top相反的顺序返回元素。
top默认倒序,taskOrdered默认正序
top方法其实就是调用的taskOrdered,然后反转的结果

def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {    takeOrdered(num)(ord.reverse)  }
scala> val rdd1 = sc.makeRDD(1 to 10)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at makeRDD at :24scala> rdd1.top(5)res22: Array[Int] = Array(10, 9, 8, 7, 6)scala> rdd1.takeOrdered(5)res23: Array[Int] = Array(1, 2, 3, 4, 5)

8.saveAsTextFile

saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中

val conf = new SparkConf()      .setAppName("saveFile")      .setMaster("local[*]")val sc = new SparkContext(conf)val rdd1: RDD[Int] = sc.parallelize(1 to 10)rdd1.repartition(1).saveAsTextFile("/tmp/fff")

9.saveAsSequenceFile

saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。使用方法和saveAsTextFile类似

10.saveAsObjectFile

saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。使用方法和saveAsTextFile类似

11.countByKey

对(K,V)类型的RDD有效,返回一个(K,Int)对的map,表示每一个可以对应的元素个数

scala> val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",3)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at makeRDD at :24scala> rdd1.countByKeyres1: scala.collection.Map[String,Long] = Map(B -> 2, A -> 2, C -> 1)

12.foreach

在数据集的每一个元素上,运行函数func,t通常用于更新一个累加器变量,或者和外部存储系统做交互

scala> val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",3)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at makeRDD at :24scala> rdd1.collect.foreach(println(_))(A,0)(A,2)(B,1)(B,2)(C,3)
元素 数据 算子 函数 数组 文件 程序 结果 个数 方法 而是 存储 使用方法 原理 就是 应用程序 格式 种子 系统 随机数 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全知识手抄报2年级 绝地求生国人服务器排名 大学计算机网络技术课程课件 四川服务器阵列卡云服务器 两类常见软件开发模型 青岛商志信网络技术有限公司 表单能提交到数据库的表单类型 内存数据库技术的应用 世界互联网大会黑科技无线电 多人视频软件开发商 乐生活互联网科技有限公司 epic在中国有多少服务器 sql 删除所以数据库 wow数据库7.0 安卓用户登录的数据库怎么写 数据库技术包含哪些技术 ensp网络技术实训书学生版 河南省工程建设交易基础数据库 迷你世界用什么软件开发 什么叫参考数据库 poc全称 网络安全 微控制器的软件开发系统 数据库开发工程师的工资和待遇 网络中心管理服务器平台 未授权开展扫描网络安全漏洞 教育厅网络安全演习 杭州函晁互联网科技有限公司 方舟怎么在游戏内收藏服务器 内蒙古时钟监控网关服务器 大乐数据库
0