千家信息网

Spark Core 的RDD

发表于:2024-10-25 作者:千家信息网编辑
千家信息网最后更新 2024年10月25日,(1)RDD的介绍     RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变(RDD中的数据,不能增删改),可分区
千家信息网最后更新 2024年10月25日Spark Core 的RDD

(1)RDD的介绍

   
  RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变(RDD中的数据,不能增删改),可分区、元素可并行计算的集合。
  具有数据流的模型的特点,自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显示的将工作集缓存在内存中。后续的查询能够重用工作集,这极大地提升了查询速度。
  RDD可以从 三方面理解:
    - 数据集:RDD是数据集合的抽象,是复杂物理介质上存在数据的一种逻辑视图。从外部看RDD的确可以被看待成经过封装,带扩展特性(如容错性)的数据集合。
    - 分布式:RDD的数据可能存储在多个节点的磁盘上或者内存中,也就是所谓的多级存储。
    - 弹性:虽然 RDD 内部存储的数据是只读的,但是,我们可以去修改(例如通 过 repartition 转换操作)并行计算计算单元的划分结构,也就是分区的数量。
  总之:RDD就是一个大集合,将所有的数据都加载到内存中,方便多次进行重用。它的数据可以在多个节点上,并且RDD可以保存在内存中,当如果某个阶段的RDD丢失,不需要重新计算,只需要提取上一次的RDD,在相应的计算即可。

(2)RDD的属性

  

 1)A list of partitions(一组分片,数据集的基本单位)

  一个分区通常与一个任务向关联,分区的个数决定了并行的粒度。分区的个数可以在创建RDD的时候指定,如果不指定,那么默认的由节点的cores个数决定。最终每一个分区会被映射成为BlockManager 中的一个Block,而这个Block会被下一个task使用进行计算。

 2)A function for computing each split(算子)

  每一个RDD都会实现compute,用于分区进行计算

 3)A list of dependencies on other RDDs(RDD之间的依赖)

  RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算。
宽依赖和窄依赖

窄依赖(完全依赖):一个父分区唯一对应一个子分区,例:map操作
宽依赖(部分依赖):一个父分区对应多个子分区,如:reduce、group操作
区分宽依赖和窄依赖:当前这个算子的执行过程中是否有shuffle操作。

 4)Optionally a Partitioner for key-value RDDs(分区函数)

  当前 Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于 key-value 的 RDD,才会有 Partitioner,非 key-value的 RDD 的 Parititioner 的值是 None。Partitioner 函数不但决定了 RDD 本身的分片数量,也决 定了 parent RDD Shuffle 输出时的分片数量。

 5)Optionally a list of preferred locations to compute each split on

  一个列表,存储存取每个 Partition 的优先位置(preferred location)。按照"移动数据不如移动计算"的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。而这个列表中就存放着每个分区的优先位置。

(3)RDD的API(相关算子)

  RDD编程中有两种中形式:Transformation(转换)和Action(行动)。
  Transformation:表示把一个RDD ---->RDD。
  Action:表示把RDD----集合或者scala对象。

 1)RDD的创建:

object SparktTest {    def main(args: Array[String]): Unit = {        val conf: SparkConf = new SparkConf()        conf.setAppName("SparktTest")        conf.setMaster("local[2]")        val sc: SparkContext = new SparkContext()        //由一个已经存在的 Scala 数据集合创建        val arr=Array(1,2,3,4)        val arr1RDD: RDD[Int] = sc.parallelize(arr)        val arr2RDD: RDD[Int] = sc.makeRDD(arr)        //由外部存储系统的数据创建(HDFS、HBase...)        val HDFSRDD: RDD[String] = sc.textFile("/data/input")    }}

 2)Transformation:

  官网:http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
  注意:RDD中所有的转换(Transformation)都是延迟加载,也就是说,他们并不是直接计算结果,相反的,他们只是记住这些应用到基础数据集,上的一个转换动作,只有当发生一个要求返回一个Driver动作的时候,这些转换才真正运行。

map()算子

        val HDFSRDD: RDD[String] = sc.textFile("/data/input")        /**          * map 算子,返回一个新的RDD,该RDD由每一个输入元素经过function函数转换后组成          */        val mapRDD: RDD[(String, Int)] = HDFSRDD.map(ele=>(ele,1))

flatMap()算子:

val arr=Array("hive hbase hadoop","spark hadoop","yarn hdfs")        val lineRDD: RDD[String] = sc.parallelize(arr)        /**          * flagMap:类似于map,但是每一个元素输入的元素可以被          * 映射成为0个或者多个输出的元素(返回的是一个序列,而不是单一的元素)          */        //返回一个集合hive hbase hadoop spark hadoop yarn hdfs        val wordRDD: RDD[String] = lineRDD.flatMap(line=>line.split("\\s+"))

filter()算子:

        val arr=Array(1,2,3,4,5)        val arrRDD: RDD[Int] = sc.parallelize(arr)        /**          * filter过滤:返回一个新的RDD,该RDD由经过func函数计算后返回          * 值为true的输入元素组成          */        val filterRDD: RDD[Int] = arrRDD.filter(num=>num%2==0)

mapPartitions()算子:

        val hdfsRDD: RDD[String] = sc.textFile("/data/input")        /**          * mapPartitions与map的唯一区别就是,mapPartitions迭代的是一个分区,          * 而map遍历的每一个元素,mapPartitions参数是一个迭代对象,返回的也是一个迭代对象          */        val partitionRDD: RDD[String] = hdfsRDD.mapPartitions((x: Iterator[String]) => {            val temp = x.toList.map(line => line + "!")            temp.toIterator        })

mapPartitionsWithIndex()算子:

        val hdfsRDD: RDD[String] = sc.textFile("/data/input")        /**          * 第一个参数是分区编号:分区编号是从0开始的不间断的连续编号          * 第二个参数和mapPartitions相同          */        val partitionRDD: RDD[String] = hdfsRDD.mapPartitionsWithIndex((parnum:Int,x: Iterator[String]) => {            println(parnum) //分区编号            val temp = x.toList.map(line => line + "!")            temp.toIterator        })

sample()算子:

        val list=1 to 5000        /**          * sample方法有三个参数:          * withReplacement:代表是否有放回的抽取(false 不放回,true:放回)          * fraction:抽取样本空间占总体的比例,(以分数的形式) 0<=fraction <=1          * seed:随机数生成器,new Random().nextInt(10),不传表示使用系统的          * 注意:我们使用的sample算子,不能保证提供集合大小就恰巧是rdd.size()*fraction,结果大小会上下浮动          * sample在做抽样调查的时候,特别受用          */        val listRDD: RDD[Int] = sc.parallelize(list)        val sampleRDD: RDD[Int] = listRDD.sample(false,0.2)        println(sampleRDD.count())  //大概是5000*0.2 上下浮动

groupByKey()算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))        /**          * groupByKey,分组          * 建议groupByKey在实践中,能不用就不用,主要因为groupByKey的效率低,          * 因为有大量的数据在网络中传输,而且还没有进行本地的预处理          * 可以使用reduceByKey或者aggregateByKey或者combineByKey代替这个groupByKey          */        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)        //分组        val groupRDD: RDD[(String, Iterable[Int])] = stuRDD.groupByKey()        //求平均值        val result: RDD[(String, Double)] = groupRDD.map { case (name, score) => {            val avg = score.sum.toDouble / (score.size)            (name, avg)        }        }

reduceByKey算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))        /**          * reduceByKey:在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据          * 集,key 相同的值,都被使用指定的 reduce 函数聚合          * 到一起。和 groupByKey 类似,任务的个数是可以通过          * 第二个可选参数来配置的。          */        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)        //分组,求总分        val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)        sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18)

sortByKey()算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))        /**          * sortByKey:在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,          * 返回一个按照 key 进行排序的(K,V)的 RDD          */        //分组,求总分,排序        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)        val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)        sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18)        val sortRDD: RDD[(String, Int)] = sumRDD.map(kv=>(kv._2,kv._1)).sortByKey().map(kv=>(kv._2,kv._1))        sortRDD.foreach(println)

sortBy算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))        /**          * sortBy(func,[ascending], [numTasks])          * 与 sortByKey 类似,但是更灵活          * 第一个参数是根据什么排序          * 第二个是怎么排序,true 正序,false 倒序          * 第三个排序后分区数,默认与原 RDD 一样          */        //分组,求总分,排序        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)        val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)        sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18)        val sortRDD: RDD[(String, Int)] = sumRDD.sortBy(kv=>kv._2,false,2)

aggregateByKey()算子:

object SparktTest {    def main(args: Array[String]): Unit = {        val conf: SparkConf = new SparkConf()        conf.setAppName("SparktTest")        conf.setMaster("local[2]")        val sc: SparkContext = new SparkContext()        /**          * aggregateByKey(zeroValue)(seqOp,combOp, [numTasks])          * 先按分区聚合再总的聚合,每次要跟初始值交流          * zeroValue:初始值          * seqOp:迭代操作,拿RDD中的每一个元素跟初始值进行合并          * combOp:分区结果的最终合并          * numTasks:分区个数          * aggregate+groupByKey=aggregateByKey          * aggregate对单个值进行RDD,aggregateByKey对(K,V)值进行RDD          */        //aggregateval list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)        val listRDD: RDD[Int] = sc.parallelize(list)        //求平均值        /**          * seqOp: (U, T) => U          * combOp: (U, U) => U          * u:(Int,Int)   总和,总次数          */        val result: (Int, Int) = listRDD.aggregate(0, 0)((u: (Int, Int), x: Int) => {            (u._1 + x, u._2 + 1)        }            , (u1: (Int, Int), u2: (Int, Int)) => {                (u1._1 + u2._1, u1._2 + u2._2)            })        println(result._1 / result._2)        //aggregateByKey已经根据(k,v)k 进行分组,以下的操作,是对v进行操作        //以下操作时求平均值        val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))        val stuRDD: RDD[(String, Int)] = sc.parallelize(list1)        val reslutRDD2: RDD[(String, (Int, Int))] = stuRDD.aggregateByKey((0, 0))((x: (Int, Int), y: Int) => {            (x._1 + y, x._2 + 1)        }, (x: (Int, Int), y: (Int, Int)) => {            (x._1 + y._1, x._2 + y._2)        })        reslutRDD2.foreach(kv=>{            val name=kv._1            val avg=kv._2._1.toDouble/kv._2._2        })    }}

foldLeft()算子:(不是spark的算子,是scala的高级操作)

        /**          *  foldLeft          * (zeroValue: T)  初值值          * (B, A) => B  B是一个元组,B._1 表示累加元素,B._2 表示个数, A 表示下一个元素          */        //aggregateval list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)        val result: (Int, Int) = list.foldLeft((0,0))((x, y)=>{(x._1+y,x._2+1)})        println(result._1.toDouble/result._2)

combineByKey()算子:

object SparktTest {    def main(args: Array[String]): Unit = {        val conf: SparkConf = new SparkConf()        conf.setAppName("SparktTest")        conf.setMaster("local[2]")        val sc: SparkContext = new SparkContext(conf)        /**          * combineByKey:          * 合并相同的 key 的值 rdd1.combineByKey(x => x, (a: Int,          * b: Int) => a + b, (m: Int, n: Int) => m + n)          */        //求平均值        val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))        val listRDD: RDD[(String, Int)] = sc.parallelize(list1)        /**          * createCombiner: V => C,          * mergeValue: (C, V) => C,          * mergeCombiners: (C, C) => C): RDD[(K, C)]          */        val resultRDD: RDD[(String, (Int, Int))] = listRDD.combineByKey(x => {            (x, 1)        },            (x: (Int, Int), y: Int) => {                (x._1 + y, x._2 + 1)            },            (x: (Int, Int), y: (Int, Int)) => {                (x._1 + y._1, x._2 + y._2)            })        resultRDD.foreach{case (name,(sum,count))=>{            val avg=sum.toDouble/count            println(s"${name}:${avg}")        }}    }}

连接操作

object SparktTest {    def main(args: Array[String]): Unit = {        val conf: SparkConf = new SparkConf()        conf.setAppName("SparktTest")        conf.setMaster("local[2]")        val sc: SparkContext = new SparkContext(conf)        val arr1 = Array(1, 2, 4, 5)        val arr1RDD = sc.parallelize(arr1)        val arr2 = Array(4, 5, 6, 7)        val arr2RDD = sc.parallelize(arr2)        //cartesian  笛卡尔积        val cartesianRDD: RDD[(Int, Int)] = arr1RDD.cartesian(arr2RDD)        //union : 连接        val unionRDD: RDD[Int] = arr1RDD.union(arr2RDD)        //subtract,求,差集        val sbutractRDD: RDD[Int] = arr1RDD.subtract(arr2RDD)        //join        val list1 = List(("a", 1), ("b", 2), ("c", 3))        val list1RDD = sc.parallelize(list1)        val list2 = List(("a", "zs"), ("b", "sl"))        val list2RDD = sc.parallelize(list2)        /**          * 根据元组中的key进行join 操作,相同的key向连接          * 返回的是RDD[(String, (Int, String))] (key,连接结果)          */        val joinRDD: RDD[(String, (Int, String))] = list1RDD.join(list2RDD)        //cogroup        /**          * (String key   ,          * (Iterable[Int] arr1中的相应的key所有value的集合          * , Iterable[String]))  arr2中的相应的key所有value的集合          */        val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = list1RDD.cogroup(list2RDD)    }}

分区操作

object SparktTest {    def main(args: Array[String]): Unit = {        val conf: SparkConf = new SparkConf()        conf.setAppName("SparktTest")        conf.setMaster("local[2]")        val sc: SparkContext = new SparkContext(conf)        val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")        /**          * 表示在执行了filter操作之后,由于大量的数据被过滤,导致之前设定的分区task个数,          * 处理剩下的数据导致资源浪费,为了合理高效的利用资源,          * 可以对task重新定义,在coalesce方法中的分区个数一定要小于之前设置的分区个数。          */        hdfsRDD.coalesce(2)        //打乱数据,重新分区,分区规则为随机分区        hdfsRDD.repartition(3)        //自定义分区规则(注意,只在有key-value的RDD中可以使用)        var arr1 = Array(("a", 1), ("a", 2), ("c", 1), ("b", 2), ("d", 2)            ("b", 2), ("e", 2)            , ("b", 2)            , ("f", 2), ("g", 2), ("h", 2))        val arrRDD: RDD[(String, Int)] = sc.parallelize(arr1,4)        arrRDD.partitionBy(new MyPartitioner(3))    }}class MyPartitioner(val numPTN:Int) extends Partitioner{    //分区个数    override def numPartitions: Int = numPTN    //分区规则    override def getPartition(key: Any): Int = {        val num=key.hashCode()&Integer.MAX_VALUE%numPTN        return num    }}

总结
   - Transformation返回的仍然是一个RDD
   - 它使用了链式调用的设计模式,对一个 RDD 进行计 算后,变换成另外一个 RDD,然后这个 RDD 又可以进行另外一次转换。这个过程是分布式的。

 3)Action:

常见操作

object SparktTest {    def main(args: Array[String]): Unit = {        val conf: SparkConf = new SparkConf()        conf.setAppName("SparktTest")        conf.setMaster("local[2]")        val sc: SparkContext = new SparkContext(conf)        val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))        val listRDD: RDD[(String, Int)] = sc.parallelize(list)        //action  rdd ---map        listRDD.reduceByKeyLocally((x,y)=>x+y)        //调用collect的目的是:触发所有的计算,最终收集当前这个调用者RDD的所有数据,返回到客户端,如果数据量比较大,谨慎使用        listRDD.collect()        //统计RDD中有多少记录        listRDD.count()        //取出RDD中的第一条记录        listRDD.first()        //取出RDD前几条记录        listRDD.take(5)        //随机采样        listRDD.takeSample(false,20)        //按照某种格式,排序后的前几条        listRDD.top(50)        //按照升序或者降序,取相应的条数的记录(其中的元素必须继承Ordered)        listRDD.takeOrdered(3)        //统计每一个key中的value有多少个        listRDD.countByKey()        //统计有多少个元素        listRDD.countByValue()        //遍历RDD中每一个元素        listRDD.foreach(kv=>{})        //分区遍历RDD中的元素        listRDD.foreachPartition(kv=>{})        //将RDD的结果,保存到相应的文件系统中(注意这个目录一定是不存在的目录)        listRDD.saveAsTextFile("/data/output")    }}

总结:Action返回值不是一个RDD。它要么是一个scala的集合,要么是一个值,要么是空。最终返回到Driver程序,或者把RDD写入到文件系统中。

0