Spark Core 的RDD
(1)RDD的介绍
RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变(RDD中的数据,不能增删改),可分区、元素可并行计算的集合。
具有数据流的模型的特点,自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显示的将工作集缓存在内存中。后续的查询能够重用工作集,这极大地提升了查询速度。
RDD可以从 三方面理解:
- 数据集:RDD是数据集合的抽象,是复杂物理介质上存在数据的一种逻辑视图。从外部看RDD的确可以被看待成经过封装,带扩展特性(如容错性)的数据集合。
- 分布式:RDD的数据可能存储在多个节点的磁盘上或者内存中,也就是所谓的多级存储。
- 弹性:虽然 RDD 内部存储的数据是只读的,但是,我们可以去修改(例如通 过 repartition 转换操作)并行计算计算单元的划分结构,也就是分区的数量。
总之:RDD就是一个大集合,将所有的数据都加载到内存中,方便多次进行重用。它的数据可以在多个节点上,并且RDD可以保存在内存中,当如果某个阶段的RDD丢失,不需要重新计算,只需要提取上一次的RDD,在相应的计算即可。
(2)RDD的属性
1)A list of partitions(一组分片,数据集的基本单位)
一个分区通常与一个任务向关联,分区的个数决定了并行的粒度。分区的个数可以在创建RDD的时候指定,如果不指定,那么默认的由节点的cores个数决定。最终每一个分区会被映射成为BlockManager 中的一个Block,而这个Block会被下一个task使用进行计算。
2)A function for computing each split(算子)
每一个RDD都会实现compute,用于分区进行计算
3)A list of dependencies on other RDDs(RDD之间的依赖)
RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算。
宽依赖和窄依赖:
窄依赖(完全依赖):一个父分区唯一对应一个子分区,例:map操作
宽依赖(部分依赖):一个父分区对应多个子分区,如:reduce、group操作
区分宽依赖和窄依赖:当前这个算子的执行过程中是否有shuffle操作。
4)Optionally a Partitioner for key-value RDDs(分区函数)
当前 Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于 key-value 的 RDD,才会有 Partitioner,非 key-value的 RDD 的 Parititioner 的值是 None。Partitioner 函数不但决定了 RDD 本身的分片数量,也决 定了 parent RDD Shuffle 输出时的分片数量。
5)Optionally a list of preferred locations to compute each split on
一个列表,存储存取每个 Partition 的优先位置(preferred location)。按照"移动数据不如移动计算"的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。而这个列表中就存放着每个分区的优先位置。
(3)RDD的API(相关算子)
RDD编程中有两种中形式:Transformation(转换)和Action(行动)。
Transformation:表示把一个RDD ---->RDD。
Action:表示把RDD----集合或者scala对象。
1)RDD的创建:
object SparktTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("SparktTest") conf.setMaster("local[2]") val sc: SparkContext = new SparkContext() //由一个已经存在的 Scala 数据集合创建 val arr=Array(1,2,3,4) val arr1RDD: RDD[Int] = sc.parallelize(arr) val arr2RDD: RDD[Int] = sc.makeRDD(arr) //由外部存储系统的数据创建(HDFS、HBase...) val HDFSRDD: RDD[String] = sc.textFile("/data/input") }}
2)Transformation:
官网:http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
注意:RDD中所有的转换(Transformation)都是延迟加载,也就是说,他们并不是直接计算结果,相反的,他们只是记住这些应用到基础数据集,上的一个转换动作,只有当发生一个要求返回一个Driver动作的时候,这些转换才真正运行。
map()算子:
val HDFSRDD: RDD[String] = sc.textFile("/data/input") /** * map 算子,返回一个新的RDD,该RDD由每一个输入元素经过function函数转换后组成 */ val mapRDD: RDD[(String, Int)] = HDFSRDD.map(ele=>(ele,1))
flatMap()算子:
val arr=Array("hive hbase hadoop","spark hadoop","yarn hdfs") val lineRDD: RDD[String] = sc.parallelize(arr) /** * flagMap:类似于map,但是每一个元素输入的元素可以被 * 映射成为0个或者多个输出的元素(返回的是一个序列,而不是单一的元素) */ //返回一个集合hive hbase hadoop spark hadoop yarn hdfs val wordRDD: RDD[String] = lineRDD.flatMap(line=>line.split("\\s+"))
filter()算子:
val arr=Array(1,2,3,4,5) val arrRDD: RDD[Int] = sc.parallelize(arr) /** * filter过滤:返回一个新的RDD,该RDD由经过func函数计算后返回 * 值为true的输入元素组成 */ val filterRDD: RDD[Int] = arrRDD.filter(num=>num%2==0)
mapPartitions()算子:
val hdfsRDD: RDD[String] = sc.textFile("/data/input") /** * mapPartitions与map的唯一区别就是,mapPartitions迭代的是一个分区, * 而map遍历的每一个元素,mapPartitions参数是一个迭代对象,返回的也是一个迭代对象 */ val partitionRDD: RDD[String] = hdfsRDD.mapPartitions((x: Iterator[String]) => { val temp = x.toList.map(line => line + "!") temp.toIterator })
mapPartitionsWithIndex()算子:
val hdfsRDD: RDD[String] = sc.textFile("/data/input") /** * 第一个参数是分区编号:分区编号是从0开始的不间断的连续编号 * 第二个参数和mapPartitions相同 */ val partitionRDD: RDD[String] = hdfsRDD.mapPartitionsWithIndex((parnum:Int,x: Iterator[String]) => { println(parnum) //分区编号 val temp = x.toList.map(line => line + "!") temp.toIterator })
sample()算子:
val list=1 to 5000 /** * sample方法有三个参数: * withReplacement:代表是否有放回的抽取(false 不放回,true:放回) * fraction:抽取样本空间占总体的比例,(以分数的形式) 0<=fraction <=1 * seed:随机数生成器,new Random().nextInt(10),不传表示使用系统的 * 注意:我们使用的sample算子,不能保证提供集合大小就恰巧是rdd.size()*fraction,结果大小会上下浮动 * sample在做抽样调查的时候,特别受用 */ val listRDD: RDD[Int] = sc.parallelize(list) val sampleRDD: RDD[Int] = listRDD.sample(false,0.2) println(sampleRDD.count()) //大概是5000*0.2 上下浮动
groupByKey()算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18)) /** * groupByKey,分组 * 建议groupByKey在实践中,能不用就不用,主要因为groupByKey的效率低, * 因为有大量的数据在网络中传输,而且还没有进行本地的预处理 * 可以使用reduceByKey或者aggregateByKey或者combineByKey代替这个groupByKey */ val stuRDD: RDD[(String, Int)] = sc.parallelize(list) //分组 val groupRDD: RDD[(String, Iterable[Int])] = stuRDD.groupByKey() //求平均值 val result: RDD[(String, Double)] = groupRDD.map { case (name, score) => { val avg = score.sum.toDouble / (score.size) (name, avg) } }
reduceByKey算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18)) /** * reduceByKey:在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据 * 集,key 相同的值,都被使用指定的 reduce 函数聚合 * 到一起。和 groupByKey 类似,任务的个数是可以通过 * 第二个可选参数来配置的。 */ val stuRDD: RDD[(String, Int)] = sc.parallelize(list) //分组,求总分 val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y) sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18)
sortByKey()算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18)) /** * sortByKey:在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口, * 返回一个按照 key 进行排序的(K,V)的 RDD */ //分组,求总分,排序 val stuRDD: RDD[(String, Int)] = sc.parallelize(list) val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y) sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18) val sortRDD: RDD[(String, Int)] = sumRDD.map(kv=>(kv._2,kv._1)).sortByKey().map(kv=>(kv._2,kv._1)) sortRDD.foreach(println)
sortBy算子:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18)) /** * sortBy(func,[ascending], [numTasks]) * 与 sortByKey 类似,但是更灵活 * 第一个参数是根据什么排序 * 第二个是怎么排序,true 正序,false 倒序 * 第三个排序后分区数,默认与原 RDD 一样 */ //分组,求总分,排序 val stuRDD: RDD[(String, Int)] = sc.parallelize(list) val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y) sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18) val sortRDD: RDD[(String, Int)] = sumRDD.sortBy(kv=>kv._2,false,2)
aggregateByKey()算子:
object SparktTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("SparktTest") conf.setMaster("local[2]") val sc: SparkContext = new SparkContext() /** * aggregateByKey(zeroValue)(seqOp,combOp, [numTasks]) * 先按分区聚合再总的聚合,每次要跟初始值交流 * zeroValue:初始值 * seqOp:迭代操作,拿RDD中的每一个元素跟初始值进行合并 * combOp:分区结果的最终合并 * numTasks:分区个数 * aggregate+groupByKey=aggregateByKey * aggregate对单个值进行RDD,aggregateByKey对(K,V)值进行RDD */ //aggregateval list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val listRDD: RDD[Int] = sc.parallelize(list) //求平均值 /** * seqOp: (U, T) => U * combOp: (U, U) => U * u:(Int,Int) 总和,总次数 */ val result: (Int, Int) = listRDD.aggregate(0, 0)((u: (Int, Int), x: Int) => { (u._1 + x, u._2 + 1) } , (u1: (Int, Int), u2: (Int, Int)) => { (u1._1 + u2._1, u1._2 + u2._2) }) println(result._1 / result._2) //aggregateByKey已经根据(k,v)k 进行分组,以下的操作,是对v进行操作 //以下操作时求平均值 val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18)) val stuRDD: RDD[(String, Int)] = sc.parallelize(list1) val reslutRDD2: RDD[(String, (Int, Int))] = stuRDD.aggregateByKey((0, 0))((x: (Int, Int), y: Int) => { (x._1 + y, x._2 + 1) }, (x: (Int, Int), y: (Int, Int)) => { (x._1 + y._1, x._2 + y._2) }) reslutRDD2.foreach(kv=>{ val name=kv._1 val avg=kv._2._1.toDouble/kv._2._2 }) }}
foldLeft()算子:(不是spark的算子,是scala的高级操作)
/** * foldLeft * (zeroValue: T) 初值值 * (B, A) => B B是一个元组,B._1 表示累加元素,B._2 表示个数, A 表示下一个元素 */ //aggregateval list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val result: (Int, Int) = list.foldLeft((0,0))((x, y)=>{(x._1+y,x._2+1)}) println(result._1.toDouble/result._2)
combineByKey()算子:
object SparktTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("SparktTest") conf.setMaster("local[2]") val sc: SparkContext = new SparkContext(conf) /** * combineByKey: * 合并相同的 key 的值 rdd1.combineByKey(x => x, (a: Int, * b: Int) => a + b, (m: Int, n: Int) => m + n) */ //求平均值 val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18)) val listRDD: RDD[(String, Int)] = sc.parallelize(list1) /** * createCombiner: V => C, * mergeValue: (C, V) => C, * mergeCombiners: (C, C) => C): RDD[(K, C)] */ val resultRDD: RDD[(String, (Int, Int))] = listRDD.combineByKey(x => { (x, 1) }, (x: (Int, Int), y: Int) => { (x._1 + y, x._2 + 1) }, (x: (Int, Int), y: (Int, Int)) => { (x._1 + y._1, x._2 + y._2) }) resultRDD.foreach{case (name,(sum,count))=>{ val avg=sum.toDouble/count println(s"${name}:${avg}") }} }}
连接操作:
object SparktTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("SparktTest") conf.setMaster("local[2]") val sc: SparkContext = new SparkContext(conf) val arr1 = Array(1, 2, 4, 5) val arr1RDD = sc.parallelize(arr1) val arr2 = Array(4, 5, 6, 7) val arr2RDD = sc.parallelize(arr2) //cartesian 笛卡尔积 val cartesianRDD: RDD[(Int, Int)] = arr1RDD.cartesian(arr2RDD) //union : 连接 val unionRDD: RDD[Int] = arr1RDD.union(arr2RDD) //subtract,求,差集 val sbutractRDD: RDD[Int] = arr1RDD.subtract(arr2RDD) //join val list1 = List(("a", 1), ("b", 2), ("c", 3)) val list1RDD = sc.parallelize(list1) val list2 = List(("a", "zs"), ("b", "sl")) val list2RDD = sc.parallelize(list2) /** * 根据元组中的key进行join 操作,相同的key向连接 * 返回的是RDD[(String, (Int, String))] (key,连接结果) */ val joinRDD: RDD[(String, (Int, String))] = list1RDD.join(list2RDD) //cogroup /** * (String key , * (Iterable[Int] arr1中的相应的key所有value的集合 * , Iterable[String])) arr2中的相应的key所有value的集合 */ val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = list1RDD.cogroup(list2RDD) }}
分区操作:
object SparktTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("SparktTest") conf.setMaster("local[2]") val sc: SparkContext = new SparkContext(conf) val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt") /** * 表示在执行了filter操作之后,由于大量的数据被过滤,导致之前设定的分区task个数, * 处理剩下的数据导致资源浪费,为了合理高效的利用资源, * 可以对task重新定义,在coalesce方法中的分区个数一定要小于之前设置的分区个数。 */ hdfsRDD.coalesce(2) //打乱数据,重新分区,分区规则为随机分区 hdfsRDD.repartition(3) //自定义分区规则(注意,只在有key-value的RDD中可以使用) var arr1 = Array(("a", 1), ("a", 2), ("c", 1), ("b", 2), ("d", 2) ("b", 2), ("e", 2) , ("b", 2) , ("f", 2), ("g", 2), ("h", 2)) val arrRDD: RDD[(String, Int)] = sc.parallelize(arr1,4) arrRDD.partitionBy(new MyPartitioner(3)) }}class MyPartitioner(val numPTN:Int) extends Partitioner{ //分区个数 override def numPartitions: Int = numPTN //分区规则 override def getPartition(key: Any): Int = { val num=key.hashCode()&Integer.MAX_VALUE%numPTN return num }}
总结:
- Transformation返回的仍然是一个RDD
- 它使用了链式调用的设计模式,对一个 RDD 进行计 算后,变换成另外一个 RDD,然后这个 RDD 又可以进行另外一次转换。这个过程是分布式的。
3)Action:
常见操作:
object SparktTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("SparktTest") conf.setMaster("local[2]") val sc: SparkContext = new SparkContext(conf) val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18)) val listRDD: RDD[(String, Int)] = sc.parallelize(list) //action rdd ---map listRDD.reduceByKeyLocally((x,y)=>x+y) //调用collect的目的是:触发所有的计算,最终收集当前这个调用者RDD的所有数据,返回到客户端,如果数据量比较大,谨慎使用 listRDD.collect() //统计RDD中有多少记录 listRDD.count() //取出RDD中的第一条记录 listRDD.first() //取出RDD前几条记录 listRDD.take(5) //随机采样 listRDD.takeSample(false,20) //按照某种格式,排序后的前几条 listRDD.top(50) //按照升序或者降序,取相应的条数的记录(其中的元素必须继承Ordered) listRDD.takeOrdered(3) //统计每一个key中的value有多少个 listRDD.countByKey() //统计有多少个元素 listRDD.countByValue() //遍历RDD中每一个元素 listRDD.foreach(kv=>{}) //分区遍历RDD中的元素 listRDD.foreachPartition(kv=>{}) //将RDD的结果,保存到相应的文件系统中(注意这个目录一定是不存在的目录) listRDD.saveAsTextFile("/data/output") }}
总结:Action返回值不是一个RDD。它要么是一个scala的集合,要么是一个值,要么是空。最终返回到Driver程序,或者把RDD写入到文件系统中。