Spark笔记整理(四):Spark RDD算子实战
[TOC]
Spark算子概述
RDD:弹性分布式数据集,是一种特殊集合、支持多种来源、有容错机制、可以被缓存、支持并行操作,一个RDD代表多个分区里的数据集。
RDD有两种操作算子:
- Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作
- Action(执行):触发Spark作业的运行,真正触发转换算子的计算
需要说明的是,下面写的scala代码,其实都是可以简写的,但是为了方便理解,我都没有简写,因为要简写的话对于scala来说真的就是一句话的事情了。
另外如果是在本地环境进行开发,那么需要添加相关依赖:
org.scala-lang scala-library 2.10.5 org.apache.spark spark-core_2.10 1.6.2
Transformation算子
概述
需要操作的Transformation算子说明如下:
map(func)
返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
filter(func)
返回一个新的数据集,由经过func函数后返回值为true的原元素组成
flatMap(func)
类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
sample(withReplacement, frac, seed)
根据给定的随机种子seed,随机抽样出数量为frac的数据
union(otherDataset)
返回一个新的数据集,由原数据集和参数联合而成
groupByKey([numTasks])
在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task
reduceByKey(func, [numTasks])
在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。
join(otherDataset, [numTasks])
在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集
map
测试代码如下:
object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps1(sc) sc.stop() } /** * 1、map:将集合中每个元素乘以7 * map(func):返回一个新的分布式数据集,由每个原元素经过func函数转换后组成 */ def transformationOps1(sc:SparkContext): Unit = { val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val listRDD = sc.parallelize(list) val retRDD = listRDD.map(num => num * 7) retRDD.foreach(num => println(num)) }}
执行结果如下:
4274914562163287035
filter
测试代码如下:
object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps2(sc) sc.stop() } /** * 2、filter:过滤出集合中的奇数 * filter(func): 返回一个新的数据集,由经过func函数后返回值为true的原元素组成 * * 一般在filter操作之后都要做重新分区(因为可能数据量减少了很多) */ def transformationOps2(sc:SparkContext): Unit = { val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val listRDD = sc.parallelize(list) val retRDD = listRDD.filter(num => num % 2 == 0) retRDD.foreach(println) }}
输出结果如下:
628410
flatMap
测试代码如下:
object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps3(sc) sc.stop() } /** * 3、flatMap:将行拆分为单词 * flatMap(func):类似于map,但是每一个输入元素, * 会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素) */ def transformationOps3(sc:SparkContext): Unit = { val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val wordsRDD = listRDD.flatMap(line => line.split(" ")) wordsRDD.foreach(println) }}
输出结果如下:
hellohelloheyouhellome
sample
测试代码如下:
object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps4(sc) sc.stop() } /** * 4、sample:根据给定的随机种子seed,随机抽样出数量为frac的数据 * sample(withReplacement, frac, seed): 根据给定的随机种子seed,随机抽样出数量为frac的数据 * 抽样的目的:就是以样本评估整体 * withReplacement: * true:有放回的抽样 * false:无放回的抽样 * frac:就是样本空间的大小,以百分比小数的形式出现,比如20%,就是0.2 * * 使用sample算子计算出来的结果可能不是很准确,1000个数,20%,样本数量在200个左右,不一定为200 * * 一般情况下,使用sample算子在做spark优化(数据倾斜)的方面应用最广泛 */ def transformationOps4(sc:SparkContext): Unit = { val list = 1 to 1000 val listRDD = sc.parallelize(list) val sampleRDD = listRDD.sample(false, 0.2) sampleRDD.foreach(num => print(num + " ")) println println("sampleRDD count: " + sampleRDD.count()) println("Another sampleRDD count: " + sc.parallelize(list).sample(false, 0.2).count()) }}
输出结果如下:
sampleRDD count: 219Another sampleRDD count: 203
union
测试代码如下:
object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps5(sc) sc.stop() } /** * 5、union:返回一个新的数据集,由原数据集和参数联合而成 * union(otherDataset): 返回一个新的数据集,由原数据集和参数联合而成 * 类似数学中的并集,就是sql中的union操作,将两个集合的所有元素整合在一块,包括重复元素 */ def transformationOps5(sc:SparkContext): Unit = { val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val list2 = List(7, 8, 9, 10, 11, 12) val listRDD1 = sc.parallelize(list1) val listRDD2 = sc.parallelize(list2) val unionRDD = listRDD1.union(listRDD2) unionRDD.foreach(println) }}
输出结果如下:
16273849510789101112
groupByKey
测试代码如下:
object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps6(sc) sc.stop() } /** * 6、groupByKey:对数组进行 group by key操作 * groupByKey([numTasks]): 在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。 * 注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task * mr中: * --->map操作--->--->shuffle--->---> * groupByKey类似于shuffle操作 * * 和reduceByKey有点类似,但是有区别,reduceByKey有本地的规约,而groupByKey没有本地规约,所以一般情况下, * 尽量慎用groupByKey,如果一定要用的话,可以自定义一个groupByKey,在自定义的gbk中添加本地预聚合操作 */ def transformationOps6(sc:SparkContext): Unit = { val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val wordsRDD = listRDD.flatMap(line => line.split(" ")) val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1)) pairsRDD.foreach(println) val gbkRDD:RDD[(String, Iterable[Int])] = pairsRDD.groupByKey() println("=============================================") gbkRDD.foreach(t => println(t._1 + "..." + t._2)) }}
输出结果如下:
(hello,1)(hello,1)(you,1)(he,1)(hello,1)(me,1)=============================================you...CompactBuffer(1)hello...CompactBuffer(1, 1, 1)he...CompactBuffer(1)me...CompactBuffer(1)
reduceByKey
测试代码如下:
object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps7(sc) sc.stop() } /** * 7、reduceByKey:统计每个班级的人数 * reduceByKey(func, [numTasks]): 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集, * key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。 * * 需要注意的是还有一个reduce的操作,其为action算子,并且其返回的结果只有一个,而不是一个数据集 * 而reduceByKey是一个transformation算子,其返回的结果是一个数据集 */ def transformationOps7(sc:SparkContext): Unit = { val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val wordsRDD = listRDD.flatMap(line => line.split(" ")) val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1)) val retRDD:RDD[(String, Int)] = pairsRDD.reduceByKey((v1, v2) => v1 + v2) retRDD.foreach(t => println(t._1 + "..." + t._2)) }}
输出结果如下:
you...1hello...3he...1me...1
join
测试代码如下:
object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps8(sc) sc.stop() } /** * 8、join:打印关联的组合信息 * join(otherDataset, [numTasks]): 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集 * 学生基础信息表和学生考试成绩表 * stu_info(sid ,name, birthday, class) * stu_score(sid, chinese, english, math) * * * Serialization stack: - object not serializable 这种分布式计算的过程,一个非常重要的点,传递的数据必须要序列化 通过代码测试,该join是等值连接(inner join) A.leftOuterJoin(B) A表所有的数据都包涵,B表中在A表没有关联的数据,显示为null 之后执行一次filter就是join的结果 */ def transformationOps8(sc: SparkContext): Unit = { val infoList = List( "1,钟 潇,1988-02-04,bigdata", "2,刘向前,1989-03-24,linux", "3,包维宁,1984-06-16,oracle") val scoreList = List( "1,50,21,61", "2,60,60,61", "3,62,90,81", "4,72,80,81" ) val infoRDD:RDD[String] = sc.parallelize(infoList) val scoreRDD:RDD[String] = sc.parallelize(scoreList) val infoPairRDD:RDD[(String, Student)] = infoRDD.map(line => { val fields = line.split(",") val student = new Student(fields(0), fields(1), fields(2), fields(3)) (fields(0), student) }) val scorePairRDD:RDD[(String, Score)] = scoreRDD.map(line => { val fields = line.split(",") val score = new Score(fields(0), fields(1).toFloat, fields(2).toFloat, fields(3).toFloat) (fields(0), score) }) val joinedRDD:RDD[(String, (Student, Score))] = infoPairRDD.join(scorePairRDD) joinedRDD.foreach(t => { val sid = t._1 val student = t._2._1 val score = t._2._2 println(sid + "\t" + student + "\t" + score) }) println("=========================================") val leftOuterRDD:RDD[(String, (Score, Option[Student]))] = scorePairRDD.leftOuterJoin(infoPairRDD) leftOuterRDD.foreach(println) }}
输出结果如下:
3 3 包维宁 1984-06-16 oracle 3 62.0 90.0 81.02 2 刘向前 1989-03-24 linux 2 60.0 60.0 61.01 1 钟 潇 1988-02-04 bigdata 1 50.0 21.0 61.0=========================================(4,(4 72.0 80.0 81.0,None))(3,(3 62.0 90.0 81.0,Some(3 包维宁 1984-06-16 oracle)))(2,(2 60.0 60.0 61.0,Some(2 刘向前 1989-03-24 linux)))(1,(1 50.0 21.0 61.0,Some(1 钟 潇 1988-02-04 bigdata)))
为了更好进行操作和理解,下面提供一个Spark-shell的经典例子:
scala> val infoList = List("1,zhongxiang","2,liuxiangqian","3,baweining")infoList: List[String] = List(1,zhongxiang, 2,liuxiangqian, 3,baweining)scala> val infoRDD = sc.parallelize(infoList)infoRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[31] at parallelize at :29scala> val infoPairRDD = infoRDD.map(line => (line.split(",")(0),line.split(",")(1)))infoPairRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[32] at map at :31scala> val scoreList = List("1,50-21-61","2,60-60-61","3,62-90-81","4,72-80-81")scoreList: List[String] = List(1,50-21-61, 2,60-60-61, 3,62-90-81, 4,72-80-81)scala> val scoreRDD = sc.parallelize(scoreList)scoreRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[33] at parallelize at :29scala> val scorePairRDD = scoreRDD.map(line => (line.split(",")(0),line.split(",")(1)))scorePairRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[34] at map at :31scala>scala> val joinedRDD = infoPairRDD.join(scorePairRDD)joinedRDD: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[37] at join at :39scala> joinedRDD.foreach(t => println(t._1 + "\t" + "name:" + t._2._1 + "\t" + "score:" + t._2._2))1 name:zhongxiang score:50-21-613 name:baweining score:62-90-812 name:liuxiangqian score:60-60-61
有读者反应上面的案例还是过于复杂化,于是又写了下面这个demo,相信就很好理解了:
scala> val infoPairRDD = sc.parallelize(Seq((1,"leaf"),(2,"xpleaf"),(3,"yyh")))infoPairRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[41] at parallelize at :27scala> infoPairRDD.foreach(println)(2,xpleaf)(1,leaf)(3,yyh)scala> val scorePairRDD = sc.parallelize(Seq((1, 93), (2, 91), (3, 86), (4, 97)))scorePairRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[44] at parallelize at :27scala> scorePairRDD.foreach(println)(1,93)(3,86)(2,91)(4,97)scala> val joinedRDD = infoPairRDD.join(scorePairRDD)joinedRDD: org.apache.spark.rdd.RDD[(Int, (String, Int))] = MapPartitionsRDD[53] at join at :31scala> joinedRDD.foreach(println)(2,(xpleaf,91))(1,(leaf,93))(3,(yyh,86))
1.应该很清楚地理解到,spark中的join其实跟sql中的join是类似的,infoPairRDD和scorePairRDD就可以理解为两张表,而RDD中的每一条数据就可以理解为表中的一条数据,上面的盒子,相当于两个表中都有相同的id,需要将两张表中的数据根据id来进行连接,因此,在上面演示的等值连接中,左表的每一条数据,只要左表有出现的id,在右表也有相同的id,那么就会进行连接操作,当然,这是等值连接的情况,对于左外连接,则是不管右表有没有该id出现,左边的数据都会显示出来。
2.spark在进行开发级别的调优时,要尽可能避免出现shuffle操作,对于join操作,尤其需要注意的是大小表join问题,如果采用大表.join(小表)的join操作,实际上,在网络上或者节点之间传输的是小表的数据,这不会有太大的性能问题,但是如果是采用小表.join(大表),那么在网络上或者节点之间就会传输大量的数据,这会造成很严重的性能问题。所以,当需要执行join操作时,请一定要警惕大小表的问题。
3.看下面的两份RDD数据,显然是从infoRDD的分区传输到scoreRDD的分区成本更低:
infoRDD:(1,"info")(2,"info")(3,"info")(4,"info")scoreRDD:(1,"score1")(1,"score2")(1,"score3")(1,"score4")(1,"score5")(2,"score1")(2,"score2")(2,"score3")(2,"score4")(2,"score5")(3,"score1")(3,"score2")(3,"score3")(3,"score4")(3,"score5")(4,"score1")(4,"score2")(4,"score3")(4,"score4")(4,"score5")
sortByKey
测试代码如下:
object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps7(sc) sc.stop() } /** * sortByKey:将学生身高进行(降序)排序 * 身高相等,按照年龄排(升序) */ def transformationOps9(sc: SparkContext): Unit = { val list = List( "1,李 磊,22,175", "2,刘银鹏,23,175", "3,齐彦鹏,22,180", "4,杨 柳,22,168", "5,敦 鹏,20,175" ) val listRDD:RDD[String] = sc.parallelize(list) /* // 使用sortBy操作完成排序 val retRDD:RDD[String] = listRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] { override def compare(x: String, y: String): Int = { val xFields = x.split(",") val yFields = y.split(",") val xHgiht = xFields(3).toFloat val yHgiht = yFields(3).toFloat val xAge = xFields(2).toFloat val yAge = yFields(2).toFloat var ret = yHgiht.compareTo(xHgiht) if (ret == 0) { ret = xAge.compareTo(yAge) } ret } } ,ClassTag.Object.asInstanceOf[ClassTag[String]]) */ // 使用sortByKey完成操作,只做身高降序排序 val heightRDD:RDD[(String, String)] = listRDD.map(line => { val fields = line.split(",") (fields(3), line) }) val retRDD:RDD[(String, String)] = heightRDD.sortByKey(ascending = false, numPartitions = 1) // 需要设置1个分区,否则只是各分区内有序 retRDD.foreach(println) // 使用sortByKey如何实现sortBy的二次排序?将上面的信息写成一个java对象,然后重写compareTo方法,在做map时,key就为该对象本身,而value可以为null }}
输出结果如下:
(180,3,齐彦鹏,22,180)(175,1,李 磊,22,175)(175,2,刘银鹏,23,175)(175,5,敦 鹏,20,175)(168,4,杨 柳,22,168)
下面是一个快速入门的demo:
scala> val rdd = sc.parallelize(Seq((1,"one"),(2,"two"),(3,"three")))rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[10] at parallelize at :21scala> rdd.sortByKey(true, 1).foreach(println)(1,one)(2,two)(3,three)
combineByKey与aggregateByKey
下面的代码分别使用combineByKey和aggregateByKey来模拟groupByKey和reduceBykey,所以是有4个操作,只要把combineByKey模拟groupByKey的例子掌握了,其它三个相对就容易许多了。
整体来说理解不太容易,但是非常重要,所以一定是要掌握的!
/** * spark的transformation操作: * aggregateByKey * combineByKey * * 使用combineByKey和aggregateByKey模拟groupByKey和reduceByKey * * 通过查看源码,我们发现aggregateByKey底层,还是combineByKey * * 问题:combineByKey和aggregateByKey的区别? * aggregateByKey是柯里化形式的,目前底层源码还没时间去分析,所知道的区别是这个 */object _03SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_03SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf)// combineByKey2GroupByKey(sc)// combineByKey2ReduceByKey(sc)// aggregateByKey2ReduceByKey(sc) aggregateByKey2GroupByKey(sc) sc.stop() } /** * 使用aggregateByKey模拟groupByKey */ def aggregateByKey2GroupByKey(sc: SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) val retRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.aggregateByKey(ArrayBuffer[Int]()) ( // 这里需要指定value的类型为ArrayBuffer[Int]() (part, num) => { part.append(num) part }, (part1, part2) => { part1.++=(part2) part1 } ) retRDD.foreach(println) } /** * 使用aggregateByKey模拟reduceByKey * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] (zeroValue: U)就对应的是combineByKey中的第一个函数的返回值 seqOp 就对应的是combineByKey中的第二个函数,也就是mergeValue combOp 就对应的是combineByKey中的第三个函数,也就是mergeCombiners */ def aggregateByKey2ReduceByKey(sc:SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) val retRDD:RDD[(String, Int)] = pairsRDD.aggregateByKey(0) ( (partNum, num) => partNum + num, // 也就是mergeValue (partNum1, partNum2) => partNum1 + partNum2 // 也就是mergeCombiners ) retRDD.foreach(println) } /** * 使用reduceByKey模拟groupByKey */ def combineByKey2ReduceByKey(sc:SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) /** * 对于createCombiner1 mergeValue1 mergeCombiners1 * 代码的参数已经体现得很清楚了,其实只要理解了combineByKey模拟groupByKey的例子,这个就非常容易了 */ var retRDD:RDD[(String, Int)] = pairsRDD.combineByKey(createCombiner1, mergeValue1, mergeCombiners1) retRDD.foreach(println) } /** * reduceByKey操作,value就是该数值本身,则上面的数据会产生: * (hello, 1) (bo, 1) (bo, 1) * (zhou, 1) (xin, 1) (xin, 1) * (hello, 1) (song, 1) (bo, 1) * 注意有别于groupByKey的操作,它是创建一个容器 */ def createCombiner1(num:Int):Int = { num } /** * 同一partition内,对于有相同key的,这里的mergeValue直接将其value相加 * 注意有别于groupByKey的操作,它是添加到value到一个容器中 */ def mergeValue1(localNum1:Int, localNum2:Int): Int = { localNum1 + localNum2 } /** * 将两个不同partition中的key相同的value值相加起来 * 注意有别于groupByKey的操作,它是合并两个容器 */ def mergeCombiners1(thisPartitionNum1:Int, anotherPartitionNum2:Int):Int = { thisPartitionNum1 + anotherPartitionNum2 } /** * 使用combineByKey模拟groupByKey */ def combineByKey2GroupByKey(sc:SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) // 输出每个partition中的map对 pairsRDD.foreachPartition( partition => { println("<=========partition-start=========>") partition.foreach(println) println("<=========partition-end=========>") }) val gbkRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners) gbkRDD.foreach(println) // 如果要测试最后groupByKey的结果是在几个分区,可以使用下面的代码进行测试 /*gbkRDD.foreachPartition(partition => { println("~~~~~~~~~~~~~~~~~~~~~~~~~~~") partition.foreach(println) })*/ } /** * 初始化,将value转变成为标准的格式数据 * 是在每个分区中进行的操作,去重后的key有几个,就调用次, * 因为对于每个key,其容器创建一次就ok了,之后有key相同的,只需要执行mergeValue到已经创建的容器中即可 */ def createCombiner(num:Int):ArrayBuffer[Int] = { println("----------createCombiner----------") ArrayBuffer[Int](num) } /** * 将key相同的value,添加到createCombiner函数创建的ArrayBuffer容器中 * 一个分区内的聚合操作,将一个分区内key相同的数据,合并 */ def mergeValue(ab:ArrayBuffer[Int], num:Int):ArrayBuffer[Int] = { println("----------mergeValue----------") ab.append(num) ab } /** * 将key相同的多个value数组,进行整合 * 分区间的合并操作 */ def mergeCombiners(ab1:ArrayBuffer[Int], ab2:ArrayBuffer[Int]):ArrayBuffer[Int] = { println("----------mergeCombiners----------") ab1 ++= ab2 ab1 }}/*combineByKey模拟groupByKey的一个输出效果,可以很好地说明createCombiner、mergeValue和mergeCombiners各个阶段的执行时机:<=========partition-start=========><=========partition-start=========>(hello,1)(zhou,1)(bo,1)(xin,1)(bo,1)(xin,1)<=========partition-end=========>(hello,1)(song,1)(bo,1)<=========partition-end=========>----------createCombiner--------------------createCombiner--------------------createCombiner--------------------createCombiner--------------------mergeValue--------------------mergeValue--------------------createCombiner--------------------createCombiner--------------------createCombiner--------------------mergeCombiners--------------------mergeCombiners----------(song,ArrayBuffer(1))(hello,ArrayBuffer(1, 1))(bo,ArrayBuffer(1, 1, 1))(zhou,ArrayBuffer(1))(xin,ArrayBuffer(1, 1)) */
Actions算子
概述
前面Transformationt算子的测试都是在本地开发环境中直接跑代码,这里Actions算子的测试主要在spark-shell中进行操作,因为会方便很多。
需要说明的Actions算子如下:
reduce(func)
通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行
collect()
在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM
count()
返回数据集的元素个数
take(n)
返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)
first()
返回数据集的第一个元素(类似于take(1))
saveAsTextFile(path)
将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本
saveAsSequenceFile(path)
将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)
foreach(func)
在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互
reduce
通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行。
关于reduce的执行过程,可以对比scala中类似的reduce函数,相关说明可以参考我的scala整理的知识点。
scala> val list = List(1, 2, 3, 4, 5, 6)list: List[Int] = List(1, 2, 3, 4, 5, 6)scala> val listRDD = sc.parallelize(list)listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :29scala> val ret = listRDD.reduce((v1, v2) => v1 + v2)...ret: Int = 21
需要注意的是,不同于Transformation算子,其结果仍然是RDD,但是执行Actions算子之后,其结果不再是RDD,而是一个标量。
collect
在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM,这点尤其需要注意。
scala> val list = List(1, 2, 3, 4, 5, 6)list: List[Int] = List(1, 2, 3, 4, 5, 6)scala> val listRDD = sc.parallelize(list)listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at :29scala> val ret = listRDD.collect()...ret: Array[Int] = Array(1, 2, 3, 4, 5, 6)
count
返回数据集的元素个数。
scala> val list = List(1, 2, 3, 4, 5, 6)list: List[Int] = List(1, 2, 3, 4, 5, 6)scala> val listRDD = sc.parallelize(list)listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at :29scala> val ret = listRDD.count()...ret: Long = 6
take
返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)。
scala> val list = List(1, 2, 3, 4, 5, 6)list: List[Int] = List(1, 2, 3, 4, 5, 6)scala> val listRDD = sc.parallelize(list)listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at :29scala> listRDD.take(3)...res7: Array[Int] = Array(1, 2, 3)
first
返回数据集的第一个元素(类似于take(1))。
scala> val list = List(1, 2, 3, 4, 5, 6)list: List[Int] = List(1, 2, 3, 4, 5, 6)scala> val listRDD = sc.parallelize(list)listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :29scala> listRDD.first()...res8: Int = 1
saveAsTextFile
将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本。
scala> val list = List(1, 2, 3, 4, 5, 6)list: List[Int] = List(1, 2, 3, 4, 5, 6)scala> val listRDD = sc.parallelize(list)listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at :29scala> listRDD.saveAsTextFile("file:///home/uplooking/data/spark/action")...
可以在文件系统中查看到保存的文件:
[uplooking@uplooking01 action]$ pwd/home/uplooking/data/spark/action[uplooking@uplooking01 action]$ lspart-00000 part-00001 part-00002 part-00003 _SUCCESS
其实可以看到,保存的跟Hadoop的格式是一样的。
当然因为我的spark集群中已经做了跟hadoop相关的配置,所以也可以把文件保存到hdfs中:
scala> listRDD.saveAsTextFile("hdfs://ns1/output/spark/action")...
然后就可以在hdfs中查看到保存的文件:
[uplooking@uplooking01 action]$ hdfs dfs -ls /output/spark/action18/04/27 10:27:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicableFound 5 items-rw-r--r-- 3 uplooking supergroup 0 2018-04-27 10:25 /output/spark/action/_SUCCESS-rw-r--r-- 3 uplooking supergroup 2 2018-04-27 10:25 /output/spark/action/part-00000-rw-r--r-- 3 uplooking supergroup 4 2018-04-27 10:25 /output/spark/action/part-00001-rw-r--r-- 3 uplooking supergroup 2 2018-04-27 10:25 /output/spark/action/part-00002-rw-r--r-- 3 uplooking supergroup 4 2018-04-27 10:25 /output/spark/action/part-00003
可以看到,保存的格式跟保存到本地文件系统是一样的。
foreach
在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互。
scala> val list = List(1, 2, 3, 4, 5, 6)list: List[Int] = List(1, 2, 3, 4, 5, 6)scala> val listRDD = sc.parallelize(list)listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at :29scala> listRDD.foreach(println)...
saveAsNewAPIHadoopFile
也就是将数据保存到Hadoop HDFS中,但是需要注意的是,前面使用saveAsTextFile也可以进行相关操作,其使用的就是saveAsNewAPIHadoopFile或者saveAsHadoopFile这两个API,而其两者的区别是:
- saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的类
- saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的类。但不管使用哪一个,都是可以完成工作的。
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.core.p2import org.apache.hadoop.io.{IntWritable, Text}import org.apache.hadoop.mapreduce.lib.output.TextOutputFormatimport org.apache.spark.{SparkConf, SparkContext}/** * Spark算子操作之Action * saveAsNewAPIHAdoopFile * * saveAsHadoopFile * 和saveAsNewAPIHadoopFile的唯一区别就在于OutputFormat的不同 * saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的类 * saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的类 * 使用哪一个都可以完成工作 * * 前面在使用saveAsTextFile时也可以保存到hadoop文件系统中,注意其源代码也是使用上面的操作的 * * Caused by: java.net.UnknownHostException: ns1 ... 35 more 找不到ns1,因为我们在本地没有配置,无法正常解析,就需要将hadoop的配置文件信息给我们加载进来 hdfs-site.xml.heihei,core-site.xml.heihei */object _05SparkActionOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkActionOps.getClass.getSimpleName) val sc = new SparkContext(conf) val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val pairsRDD = listRDD.map(word => (word, 1)) val retRDD = pairsRDD.reduceByKey((v1, v2) => v1 + v2) retRDD.saveAsNewAPIHadoopFile( "hdfs://ns1/spark/action", // 保存的路径 classOf[Text], // 相当于mr中的k3 classOf[IntWritable], // 相当于mr中的v3 classOf[TextOutputFormat[Text, IntWritable]] // 设置(k3, v3)的outputFormatClass ) }}
之后我们可以在hdfs中查看到相应的文件输出:
[uplooking@uplooking01 ~]$ hdfs dfs -ls /spark/action 18/04/27 12:07:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicableFound 3 items-rw-r--r-- 3 Administrator supergroup 0 2018-04-27 12:07 /spark/action/_SUCCESS-rw-r--r-- 3 Administrator supergroup 13 2018-04-27 12:07 /spark/action/part-r-00000-rw-r--r-- 3 Administrator supergroup 11 2018-04-27 12:07 /spark/action/part-r-00001[uplooking@uplooking01 ~]$ hdfs dfs -text /spark/action/part-r-0000018/04/27 12:08:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicablehello 3me 1[uplooking@uplooking01 ~]$ hdfs dfs -text /spark/action/part-r-0000118/04/27 12:08:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicableyou 1he 1
宽依赖和窄依赖
窄依赖(narrow dependencies)
子RDD的每个分区依赖于常数个父分区(与数据规模无关)输入输出一对一的算子,且结果RDD的分区结构不变。主要是map/flatmap输入输出一对一的算子,但结果RDD的分区结构发生了变化,如union/coalesce从输入中选择部分元素的算子,如filter、distinct、substract、sample
宽依赖(wide dependencies)
子RDD的每个分区依赖于所有的父RDD分区对单个RDD基于key进行重组和reduce,如groupByKey,reduceByKey对两个RDD基于key进行join和重组,如join经过大量shuffle生成的RDD,建议进行缓存。这样避免失败后重新计算带来的开销。
注意:reduce是一个action,和reduceByKey完全不同。
关于宽依赖和窄依赖,《Hadoop与大数据挖掘》书本上的说明非常精简,但是理解起来也是不错的,可以参考一下,当然,这本书的Spark内容就写得非常少了。