千家信息网

基本的 RDD 操作——PySpark

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,基本的 RDD 转化操作map()语法:RDD.map(,preservesPartitoning=False)转化操作 map() 是所有转化操作中最基本的。它将一个具名函数或匿名函数对数据集内的所
千家信息网最后更新 2025年01月23日基本的 RDD 操作——PySpark

基本的 RDD 转化操作

  1. map()

语法:RDD.map(,preservesPartitoning=False)

转化操作 map() 是所有转化操作中最基本的。它将一个具名函数或匿名函数对数据集内的所有元素进行求值。map() 函数可以异步执行,也不会尝试与别的 map() 操作通信或同步。也就是说,这是无共享的操作。

参数 preserversPatitioning 是可选的,为 Boolean 类型的参数,用于定义了区分规则的 RDD,它们有定义好的键,并按照键的哈希值或范围进行了分组。如果这个参数被设为 True,这些分区会保存完整。这个参数可以被 Spark 调度器用于优化后续操作,比如,基于分区的键进行的连接操作。

转化操作 map() 对输入的每条记录计算同一个函数,并生成转化后的输出记录。

# map()

map_rdd=sc.textFile('file:///usr/local/spark/README.md')

print(map_rdd.take(5))

map_rdd_new=map_rdd.map(lambda x:x.split(' '))

print(map_rdd_new.take(5))

# 输出

['# Apache Spark', '', 'Spark is a fast and general cluster computing system for Big Data. It provides', 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', 'supports general computation graphs for data analysis. It also supports a']

[['#', 'Apache', 'Spark'], [''], ['Spark', 'is', 'a', 'fast', 'and', 'general', 'cluster', 'computing', 'system', 'for', 'Big', 'Data.', 'It', 'provides'], ['high-level', 'APIs', 'in', 'Scala,', 'Java,', 'Python,', 'and', 'R,', 'and', 'an', 'optimized', 'engine', 'that'], ['supports', 'general', 'computation', 'graphs', 'for', 'data', 'analysis.', 'It', 'also', 'supports', 'a']]

在这个例子中,split 函数接收一个字符串,生成一个列表,输入数据中的每个字符串元素都被映射为输出数据中的一个列表元素。产生的结果为一个列表的列表。

2.flatMap()

语法::RDD.flatMap(,preservesPartitioning=False)

转化操作 flatMap() 和转化操作 map() 类似,都将函数作用于输入数据集的每条记录。但是,flatMap() 还会"拍平"输出数据,这表示它会去掉一层嵌套。比如,给定一个包含字符串列表的列表,"拍平"操作会产生一个由字符串组成的列表,也就是"拍平"了所有嵌套的列表。

# flatMap()

flat_map_rdd=sc.textFile('file:///usr/local/spark/README.md')

print(flat_map_rdd.take(5))

map_rdd_new=flat_map_rdd.flatMap(lambda x:x.split(' '))

print(map_rdd_new.take(5))

# 输出

['# Apache Spark', '', 'Spark is a fast and general cluster computing system for Big Data. It provides', 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', 'supports general computation graphs for data analysis. It also supports a']

['#', 'Apache', 'Spark', '', 'Spark']

在这个例子中,flatMap() 使用的匿名函数和 map() 操作所使用的相同。注意,每个字符串并没有产生一个对应的列表对象,所有的元素拍平到一个列表中。换句话说,这个例子里的 flatMap() 产生了一个组合的列表作为输出,而不是 map() 中那个列表的列表。

3.filter()

语法:RDD.filter()

转化操作 filter 讲一个 Boolean 类型的表达式对数据集里的每个元素进行求值,这个表达式通常用匿名函数来表示。返回的布尔值决定了该记录是否被包含在产生的输出 RDD 里。这是一种常用的转化操作,用于从 RDD 中移除不需要的记录作为中间结果,或者移除不需要放在最终输出里的记录。

# filter()

licenses = sc.textFile('file:///usr/local/spark/README.md')

words = licenses.flatMap(lambda x:x.spilt(' '))

print(words.take(5))

lowercase = words.map(lambda x:x.lower())

print(lowercase.take(5))

longwords = lowercase.filter(lambda x:len(x) > 12)

print(longwords.take(5))

# 输出

['#', 'Apache', 'Spark', '', 'Spark']

['#', 'apache', 'spark', '', 'spark']

['', 'documentation', 'documentation,', 'page](http://spark.apache.org/documentation.html).', 'instructions.']

4.distinct()

语法:RDD.distinct(numPartitions=None)

转化操作 distinct() 返回一个新的 RDD,其中仅包含输入 RDD 中去重后的元素。它可以用来去除重复的值。参数 numPartitions 可以把数据重新分区为给定的分区数量。如果没有提供这个参数或是使用了默认值,那么转化操作 distinct() 返回的分区数和输入的 RDD 的 分区数保持一致。

# distinct()

licenses = sc.textFile('file:///usr/local/spark/README.md')

words = licenses.flatMap(lambda x : x.split(' '))

lowercase = words.map(lambda x : x.lower())

allwords = lowercase.count()

diswords = lowercase.distinct().count()

print ("Total words : {} ,Distinct words: {}".format(allwords,diswords))

# 输出

Total words : 579 ,Distinct words: 276

5.groupBy()

语法:RDD.groupBy(,numPartitons=None)

转化操作 groupBy() 返回一个按指定函数对元素进行分组的 RDD。参数 可以是具名函数,也可以是匿名函数,用来确定对所有元素进行分组的键,或者指定用于对元素进行求值以确定其所属分组的表达式。参数 numPartitions,通过计算分组函数输出的键空间的哈希值,以自动创建指定数量的分区。要注意的是,groupBy() 返回的是一个可迭代对象。

# groupBy()

licenses = sc.textFile('file:///usr/local/spark/README.md')

words = licenses.flatMap(lambda x: x.split(' ')).filter(lambda x:len(x) > 0)

groupbyfirstletter = words.groupBy(lambda x: x[0].lower)

print(groupbyfirstletter.take(1))

# 输出

[(, )]

6.sortBy()

语法:RDD.sortBy(,ascending=True,numPartitions=None)

转化操作 sortBy() 将 RDD 按照 参数选出的指定数据集的键进行排序。它根据键对象的类型的顺序进行排序。参数 ascending 是布尔类型的参数,默认为 True,指定所使用的排序顺序。如果要使用降序,需要设置 ascending=False。

# sortBy()

readme = sc.textFile('file:///usr/local/spark/README.md')

words = readme.flatMap(lambda x:x.split(' ')).filter(lambda x:len(x) > 0)

sortbyfirstletter = words.sortBy(lambda x:x[0].lower(),ascending=False)

print(sortbyfirstletter.take(5))

# 输出

['You', 'you', 'You', 'you', 'you']

基本的 RDD 行动操作

Spark 中的行动操作要么返回值,比如 count();要么返回数据,比如 collect();要么保存数据到外部,比如 saveAsTextFile()。在所有情况中,行动操作都会对 RDD 及其所有父 RDD 强制进行计算。一些行动操作返回计数,或是数据的聚合值,或是 RDD 中全部或部分数据。与这些不同的是,行动操作 foreach() 会对 RDD 中的每个元素执行一个函数。

1.count()

语法:RDD.count()

行动操作 count() 不接收参数,返回一个 long 类型的值,代表 RDD 中元素的个数。

# count()

licenses = sc.textFile('file:///usr/local/spark/licenses')

words = licenses.flatMap(lambda x: x.split(' '))

print(words.count())

# 输出

22997

注意,对于不接收参数的行动操作,需要在行动操作名带上空的括号 ()。

2.collect()

语法:RDD.collect()

行动操作 collect() 向 Spark 驱动器进程返回一个由 RDD 中所有元素组成的列表。collect() 没有限制输出,可能导致输出量相当大。一般只用在小规模 RDD 或开发中。

# collect()

licenses = sc.textFile('file:///usr/local/spark/licenses')

words = licenses.flatMap(lambda x: x.split(' '))

print(words.collect())

# 输出

['', '

3.take()

语法:RDD.take(n)

行动操作 take() 返回 RDD 的前 n 个元素。选取的元素没有特定的顺序。事实上,行动操作 take() 返回的元素是不确定的,这意味着再次运行同一个行动操作时,返回的元素可能会不同,尤其是在完全分布式的环境中。

对于横跨超过一个分区的 RDD,take() 会扫描一个分区,并使用该分区的结果来预估还需扫描多少分区才能满足获取所要求数量的全部的值。

# take()

licenses = sc.textFile('file:///usr/local/spark/licenses')

words = licenses.flatMap(lambda x: x.split(' '))

print(words.take(5))

# 输出

['', '

4.top()

语法:RDD.top(n,key=None)

行动操作 top() 返回一个 RDD 中的前 n 个元素,但是和 take() 不同的是,如果使用 top(),元素会排序并按照降序输出。参数 key 指定了按照什么对结果进行排序以返回前 n 个元素。如果没有提供,会使用根据 RDD 的元素所推断出来的键。

# top()

licenses = sc.textFile('file:///usr/local/spark/licenses')

words = licenses.flatMap(lambda x: x.split(' '))

print(words.distinct().top(5))

# 输出

['·', '©', '}', 'your', 'you.']

5.first()

语法:RDD.first()

行动操作 first() 返回 RDD 的第一个元素。first() 不考虑元素的顺序,是一个非确定性的操作,尤其是在完全分布式的环境中。

# first()

readme = sc.textFile('file:///usr/local/spark/README.md')

words = readme.flatMap(lambda x:x.split(' ')).filter(lambda x:len(x) > 0)

print(words.distinct().first())

print(words.distinct().take(1))

# 输出

#

['#']

first() 和 take(1) 最主要的区别在于 first() 返回一个原子的数据元素,而 take() (即使 n=1)返回的是由数据元素组成的列表。

6.reduce() 和 fold()

行动操作 reduce() 和 fold() 是执行聚合的行动操作,它们都执行满足交换律或结合律的操作,比如对 RDD 里的一系列值求和。这里的交换律和结合律表示操作与执行的顺序无关。这是分布式处理所要求的,因为在分布式处理中,顺序无法保证。

语法:RDD.reduce()

RDD.fold(zeroValue,)

行动操作 reduce() 使用指定的满足交换律或结合律的运算符来归约 RDD 中的所有元素。参数 指定接收两个输入的匿名函数(lambda x,y:....),它表示来自指定 RDD 的序列中的值。

行动操作 fold() 使用给定的 function 和 zeroValue 把 RDD 中每个分区的元素聚合,然后把每个分区的聚合结果再聚合。尽管 reduce() 和 fold() 的功能相似,但还是有区别的,fold() 不满足交换律,因此需要给定第一个值和最后一个值(zeroValue)。

# reduce(),fold()

numbers = sc.parallelize([1,2,3,4,5,6,7,8,9])

print(numbers.reduce(lambda x,y: x+y)) #输出:45

print(numbers.fold(0,lambda x,y: x+y)) #输出:45

empty = sc.parallelize([])

# print(empty.reduce(lambda x,y: x+y)) #输出:ValueError: Can not reduce() empty RDD

print(empty.getNumPartitions()) #查看rdd分区数,输出为 2

print(empty.fold(1,lambda x,y: x+y)) #输出:3

fold中zeroValue除了在每个分区计算中作为初始值使用之后,在最后的reduce操作仍然需要使用一次,所以fold()在zeroValue不为0是计算结果为reduce()+(分区数+1)*zeroValue(以加法为例),参考链接:Pyspark学习笔记,fold()官网源码

7.foreach()

语法:RDD.foreach()

行动操作 foreach() 把 <> 参数指定的具名或匿名函数应用到 RDD 中的所有元素上。因为 foreach() 是行动操作而不是转化操作,可以使用在转化操作中无法使用或不该使用的函数。

# foreach()

def printfunc(x):

print(x)

licenses = sc.textFile('file:///usr/local/spark/licenses')

longwords = licenses.flatMap(lambda x: x.split(' ')).filter(lambda x: len(x) > 12)

longwords.foreach(lambda x: printfunc(x))

# 输出

...

means

href="#exhibit-a">Exhibit

id="section-1.10.1">1.10.1.

...

键值对 RDD 的转化操作

键值对 RDD,也就是 PairRDD,它的记录由键和值组成。键可以是整型或者字符串对象,也可以是元组这样的复杂对象。而值可以是标量值,也可以是列表、元组、字典或集合等数据结构。这是在读时系统和 NoSQL 系统上进行各种结构化数据分析时常用的数据表示形式。PairRDD 及其成员函数大致被分为如下四类:

  • 字典函数

  • 函数式转化操作

  • 分组操作、聚合操作与排序操作

  • 连接操作

字典函数返回键值对 RDD 的键的集合或值的集合,比如 keys() 和 values()。

1.keys()

语法:RDD.keys()

keys() 函数返回键值对 RDD 中所有键组成的 RDD,或者说是由键值对 RDD 中每个二元组的第一个元素组成的 RDD。

# keys()

kvpairs = sc.parallelize([('city','Beijing'),('state','SHIGEZHUANG'),('zip','000000'),('country','China')])

print(kvpairs.keys().collect())

# 输出

['city', 'state', 'zip', 'country']

2.values()

语法:RDD.values()

values() 函数返回键值对 RDD 中所有值组成的 RDD,或者说是由键值对 RDD 中每个二元组的第二个元素组成的 RDD。

# values()

kvpairs = sc.parallelize([('city','Beijing'),('state','SHIGEZHUANG'),('zip','000000'),('country','China')])

print(kvpairs.values().collect())

# 输出

['Beijing', 'SHIGEZHUANG', '000000', 'China']

3.keyBy()

语法:RDD.keyBy()

转化操作 keyBy() 创建出由从 RDD 中的元素里提取的键与值组成的元组,其中 参数给定的函数将原元素转为输出元素的键,而原来的整个元组是输出的值

# keyBy()

locations = sc.parallelize([('city','Beijing',1),('state','SHIGEZHUANG',2),('zip','000000',3),('country','China',4)])

bylocno = locations.keyBy(lambda x: x[2])

print(bylocno.collect())

# 输出

[(1, ('city', 'Beijing', 1)), (2, ('state', 'SHIGEZHUANG', 2)), (3, ('zip', '000000', 3)), (4, ('country', 'China', 4))]

4.mapValues()

语法:RDD.mapValues()

转化操作 mapValues() 把键值对 RDD 的每个值都传给一个函数(通过 参数指定的具名函数或匿名函数),而键保持不变。mapValues() 对于每个输入元素输出一个元素。原 RDD 的分区保持不变。

5.flatMapValues()

语法:RDD.flatMapValues()

转化操作 flatMapValues() 把键值对 RDD 的每个值都传给一个函数处理,而键保持不变,并生成拍平的列表。对于每个输入元素,返回 0 个乃至多个输出元素。使用 flatMapValues() 是会保留原 RDD 的分区情况。

# mapValues(),flatMapValues()

locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])

kvpairs = locwtemps.map(lambda x: x.split(','))

print('kvpairs: ',kvpairs.take(4))

locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])

print('locwtemplist: ',locwtemplist.take(3))

locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))

print('locwtemps: ',locwtemps.take(4))

# 输出

kvpairs: [['Beijing', '71|72|73|72|70'], ['Shanghai', '46|42|40|37|39'], ['Tianjin', '50|48|51|43|44']]

locwtemplist: [('Beijing', [71, 72, 73, 72, 70]), ('Shanghai', [46, 42, 40, 37, 39]), ('Tianjin', [50, 48, 51, 43, 44])]

locwtemps: [('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72)]

6.groupByKey()

语法:RDD.groupByKey(numPartitions=None,partitionFunc=)

转化操作 groupByKey() 将键值对RDD 按各个键对值进行分组,把同组的值整合成一个序列。参数 numPartitions 指定要创建多少个分区(也就是多少个分组)。分区使用 partitionFunc 参数的值创建,默认值为 Spark 内置的哈希分区函数。如果 numPartitions 为默认值 None,就使用系统默认的分区数( spark.default.parallelism )。

# groupByKey()

locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])

kvpairs = locwtemps.map(lambda x: x.split(','))

print('kvpairs: ',kvpairs.collect())

locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])

print('locwtemplist: ',locwtemplist.collect())

locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))

print('locwtemps: ',locwtemps.collect())

grouped = locwtemps.groupByKey()

print('grouped: ',grouped.collect())

avgtemps = grouped.mapValues(lambda x: sum(x)/len(x))

print('avgtemps: ',avgtemps.collect())

# 输出

kvpairs: [['Beijing', '71|72|73|72|70'], ['Shanghai', '46|42|40|37|39'], ['Tianjin', '50|48|51|43|44']]

locwtemplist: [('Beijing', [71, 72, 73, 72, 70]), ('Shanghai', [46, 42, 40, 37, 39]), ('Tianjin', [50, 48, 51, 43, 44])]

locwtemps: [('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72), ('Beijing', 70), ('Shanghai', 46), ('Shanghai', 42), ('Shanghai', 40), ('Shanghai', 37), ('Shanghai', 39), ('Tianjin', 50), ('Tianjin', 48), ('Tianjin', 51), ('Tianjin', 43), ('Tianjin', 44)]

grouped: [('Beijing', ), ('Shanghai', ), ('Tianjin', )]

avgtemps: [('Beijing', 71.6), ('Shanghai', 40.8), ('Tianjin', 47.2)]

注意 groupByKey() 返回的分组后的值是一个 resultiterable 对象。Python 中的 iterable 对象是可以循环遍历的序列对象。Python 中的许多函数接受可迭代对象作为输入,比如 sum() 和 len() 函数。

7.reduceByKey()

语法:RDD.reduceByKey(,numPartitions=None,partitionFunc=)

转化操作 reduceByKey() 使用满足结合律的函数合并键对应的值。调用键值对数据集的 reduceByKey() 方法,返回的是键值对的数据集,其数据按照键聚合了对应的值。参数 numPartitions 和 partitionFunc 与使用 groupByKey() 函数时的用法一模一样。numPartitions 的值还影响 saveAsTextFile() 或是其他产生文件的行动操作所产生的文件数量。例如,numPartitions = 2 会把 RDD 保存在硬盘时共生成两个输出文件。

# reduceByKey()

locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])

kvpairs = locwtemps.map(lambda x: x.split(','))

locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])

locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))

temptups = locwtemps.mapValues(lambda x: (x,1))

print('temptups: ',temptups.collect())

inputstoavg = temptups.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))

print('inputstoavg: ',inputstoavg.collect())

averages = inputstoavg.map(lambda x:(x[0],x[1][0]/x[1][1]))

print('averages: ',averages.collect())

# 输出

temptups: [('Beijing', (71, 1)), ('Beijing', (72, 1)), ('Beijing', (73, 1)), ('Beijing', (72, 1)), ('Beijing', (70, 1)), ('Shanghai', (46, 1)), ('Shanghai', (42, 1)), ('Shanghai', (40, 1)), ('Shanghai', (37, 1)), ('Shanghai', (39, 1)), ('Tianjin', (50, 1)), ('Tianjin', (48, 1)), ('Tianjin', (51, 1)), ('Tianjin', (43, 1)), ('Tianjin', (44, 1))]

inputstoavg: [('Beijing', (358, 5)), ('Shanghai', (204, 5)), ('Tianjin', (236, 5))]

averages: [('Beijing', 71.6), ('Shanghai', 40.8), ('Tianjin', 47.2)]

求平均值不是满足结合律的操作,可以通过创建元组来绕过去,元组中包含每个键对应的值和与每个键对应的计数,这两个都满足交换律和结合律,然后在最后一步计算平均值。

注意 reduceByKey() 比较高效,是因为它在每个执行器本地对值进行了先行组合,然后把组合后的列表发送到远程的执行器来执行最后的阶段。这是一个会产生数据混洗的操作。

以求和函数为例,可以当作是累加一个由和组成的列表,而不是对单个值组成的更大的列表进行求和。因为在数据混洗时发送的数据更少,使用 reduceByKey() 进行求和一般要比使用 groupByKey() 并指定 sum() 函数的性能更好。

8.foldByKey()

语法:RDD.foldByKey(zeroValue,,numPartitions=None,partitionFunc=)

转化操作 foldByKey() 在功能上和行动操作 fold() 类似,但是 foldByKey() 是转化操作,操作预先定义的键值对元素。foldByKey() 和 fold() 都提供了相同数据类型的 zeroValue 参数供 RDD 为空时使用。参数 numPartitions 和 partitionFunc 与转化操作 groupByKey() 和 reduceByKey() 中的作用一样。

# foldByKey()

locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])

kvpairs = locwtemps.map(lambda x: x.split(','))

locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])

locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))

maxbycity = locwtemps.foldByKey(0,lambda x,y: x if x > y else y)

print('maxbycity: ',maxbycity.collect())

# 输出

maxbycity: [('Beijing', 73), ('Shanghai', 46), ('Tianjin', 51)]

9.sortByKey()

语法:RDD.sortByKey(ascending=True,numPartitions=None,keyfunc=)

转化操作 sortByKey() 把键值对 RDD 根据键进行排序。排序依据取决于键对象的类型。该操作与 sort() 的区别之处在于 sort() 要求指定排序依据的键,而 sortByKey() 的键是键值对 RDD 里定义的。

键按照 ascending 参数提供的顺序进行排序,该参数默认值为 True,表示升序。参数 numPartitions 指定了输出多少分区,分区函数为范围分区函数。参数 keyfunc 是一个可选参数,可以通过对原键使用另一个函数而修改原键。

# sortByKey()

locwtemps = sc.parallelize(['Beijing,71|72|73|72|70','Shanghai,46|42|40|37|39','Tianjin,50|48|51|43|44'])

kvpairs = locwtemps.map(lambda x: x.split(','))

locwtemplist = kvpairs.mapValues(lambda x: x.split('|')).mapValues(lambda x: [int(s) for s in x])

locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int(x[1])))

sortedbykey = locwtemps.sortByKey()

print('sortedbykey: ',sortedbykey.collect())

sortedbyval = locwtemps.map(lambda x: (x[1],x[0])).sortByKey(ascending=False)

print('sortedbyval: ',sortedbyval.collect())

# 输出

sortedbykey: [('Beijing', 71), ('Beijing', 72), ('Beijing', 73), ('Beijing', 72), ('Beijing', 70), ('Shanghai', 46), ('Shanghai', 42), ('Shanghai', 40), ('Shanghai', 37), ('Shanghai', 39), ('Tianjin', 50), ('Tianjin', 48), ('Tianjin', 51), ('Tianjin', 43), ('Tianjin', 44)]

sortedbyval: [(73, 'Beijing'), (72, 'Beijing'), (72, 'Beijing'), (71, 'Beijing'), (70, 'Beijing'), (51, 'Tianjin'), (50, 'Tianjin'), (48, 'Tianjin'), (46, 'Shanghai'), (44, 'Tianjin'), (43, 'Tianjin'), (42, 'Shanghai'), (40, 'Shanghai'), (39, 'Shanghai'), (37, 'Shanghai')]

连接操作

1.join()

语法:RDD.join(,numPartitions=None)

转化操作 join() 是内连接的一个实现,根据键来匹配两个键值对 RDD。可选参数 numPartitions 决定生成的数据集要创建多少分区。如果不指明这个参数,缺省值为 spark.default.parallelism 配置参数对应的值。返回的 RDD 是一个列表,其结构包含匹配键,以及一个二元组。这个二元组包含来自两个 RDD 的一组匹配记录。

# 连接操作

stores = sc.parallelize([(100,'Beijing'),(101,'Shanghai'),(102,'Tianjin'),(103,'Taiyuan')])

salespeople = sc.parallelize([(1,'Tom',100),(2,'Karen',100),(3,'Paul',101),(4,'Jimmy',102),(5,'Jack',None)])

# join()

print(salespeople.keyBy(lambda x: x[2]).join(stores).collect())

# 输出

[(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin'))]

2.leftOuterJoin()

语法:RDD.leftOuterJoin(,numPartitions=None)

转化操作 leftOuterJoin() 返回第一个 RDD 中包含的所有记录或元素。如果第一个 RDD(左 RDD)中的键在右 RDD 中存在,那么右 RDD 中匹配的记录会和左 RDD 的记录一起返回。否则,右 RDD 的记录为空。

# leftOuterJoin()

leftjoin = salespeople.keyBy(lambda x: x[2]).leftOuterJoin(stores)

print("leftjoin: ",leftjoin.collect())

print(leftjoin.filter(lambda x: x[1][1] is None).map(lambda x: "salesperson " + x[1][0][1] + " has no store").collect())

# 输出

leftjoin: [(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (None, ((5, 'Jack', None), None)), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin'))]

['salesperson Jack has no store']

3.rightOuterJoin()

语法:RDD.rightOuterJoin(,numPartitions=None)

转化操作 rightOuterJoin() 返回第二个 RDD 中包含的所有元素或记录。如果第二个 RDD(右 RDD)中的键在左 RDD 中存在,则左 RDD 中匹配的记录会和右 RDD 中的记录一起返回。否则,左 RDD 的记录为 None(空)。

# rightOuterJoin()

print(

salespeople.keyBy(lambda x: x[2])\

.rightOuterJoin(stores)\

.filter(lambda x: x[1][0] is None)\

.map(lambda x: x[1][1] + " store has no salespeople")\

.collect()

)

# 输出

['Taiyuan store has no salespeople']

4.fullOuterJoin()

语法:RDD.fullOuterJoin(,numPartitions=None)

fullOuterJoin() 无论是否有匹配的键,都会返回两个 RDD 中的所有元素。左数据集或右数据集中没有匹配的元素都用 None(空)来表示。

# fullOuterJoin

print(

salespeople.keyBy(lambda x: x[2]).fullOuterJoin(stores).filter(lambda x: x[1][0] is None or x[1][1] is None).collect()

)

# 输出

[(None, ((5, 'Jack', None), None)), (103, (None, 'Taiyuan'))]

print(

salespeople.keyBy(lambda x: x[2]).fullOuterJoin(stores).collect()

)

# 输出

[(100, ((1, 'Tom', 100), 'Beijing')), (100, ((2, 'Karen', 100), 'Beijing')), (None, ((5, 'Jack', None), None)), (101, ((3, 'Paul', 101), 'Shanghai')), (102, ((4, 'Jimmy', 102), 'Tianjin')), (103, (None, 'Taiyuan'))]

5.cogroup()

语法:RDD.cogroup(,numPartitions=None)

转化操作 cogroup() 将多个键值对数据集按键进行分组。在概念上和 fullOuterJoin() 有些类似,但在实现上有以下关键区别:

  • 转化操作 cogroup() 返回可迭代对象,类似 groupByKey() 函数。

  • 转化操作 cogroup() 将两个 RDD 中的多个元素进行分组,而 fullOuterJoin() 则对同一个键创建出多个分开的输出元素。

  • 转化操作 cogroup() 可以通过 Scala API 或者函数别名 groupWith() 对三个以上的 RDD 进行分组。

对 A、B 两个 RDD 按照键 K 进行 cogroup() 操作生成的 RDD 输出具有下面的结构:

[K, Iterable(K,VA,...), Iterable(K,VB,...) ]

如果一个 RDD 中没有另一个 RDD 中包含的给定键的值,相应的可迭代对象则为空。

# cogroup()

print(salespeople.keyBy(lambda x: x[2]).cogroup(stores).take(1))

print('----------------')

print(salespeople.keyBy(lambda x: x[2]).cogroup(stores).mapValues(lambda x: [item for sublist in x for item in sublist]).collect())

# 输出

[(100, (, ))]

----------------

[(100, [(1, 'Tom', 100), (2, 'Karen', 100), 'Beijing']), (None, [(5, 'Jack', None)]), (101, [(3, 'Paul', 101), 'Shanghai']), (102, [(4, 'Jimmy', 102), 'Tianjin']), (103, ['Taiyuan'])]

6.cartesian()

语法:RDD.cartesian()

转化操作 cartesian() 即笛卡尔集,有时也被称为交叉连接,会根据两个 RDD 的记录生成所有可能的组合。该操作生成的记录条数等于第一个 RDD 的记录条数乘以第二个 RDD 的记录条数。

# cartesian()

print(salespeople.keyBy(lambda x: x[2]).cartesian(stores).collect())

print('----------------')

print(salespeople.keyBy(lambda x: x[2]).cartesian(stores).count())

# 输出

[((100, (1, 'Tom', 100)), (100, 'Beijing')), ((100, (1, 'Tom', 100)), (101, 'Shanghai')), ((100, (2, 'Karen', 100)), (100, 'Beijing')), ((100, (2, 'Karen', 100)), (101, 'Shanghai')), ((100, (1, 'Tom', 100)), (102, 'Tianjin')), ((100, (1, 'Tom', 100)), (103, 'Taiyuan')), ((100, (2, 'Karen', 100)), (102, 'Tianjin')), ((100, (2, 'Karen', 100)), (103, 'Taiyuan')), ((101, (3, 'Paul', 101)), (100, 'Beijing')), ((101, (3, 'Paul', 101)), (101, 'Shanghai')), ((102, (4, 'Jimmy', 102)), (100, 'Beijing')), ((102, (4, 'Jimmy', 102)), (101, 'Shanghai')), ((None, (5, 'Jack', None)), (100, 'Beijing')), ((None, (5, 'Jack', None)), (101, 'Shanghai')), ((101, (3, 'Paul', 101)), (102, 'Tianjin')), ((101, (3, 'Paul', 101)), (103, 'Taiyuan')), ((102, (4, 'Jimmy', 102)), (102, 'Tianjin')), ((102, (4, 'Jimmy', 102)), (103, 'Taiyuan')), ((None, (5, 'Jack', None)), (102, 'Tianjin')), ((None, (5, 'Jack', None)), (103, 'Taiyuan'))]

----------------

20

集合操作

1.union()

语法:RDD.union()

转化操作 union() 将另一个 RDD 追加到 RDD 的后面,组合成一个输出 RDD。两个 RDD 不一定要有相同的结构。如果两个输入 RDD 有相同的记录,转化操作 union() 不会从输出 RDD 中过滤这些重复的数据。

>>> fibonacci = sc.parallelize([0,1,2,3,5,8])

>>> odds = sc.parallelize([1,3,5,7,9])

>>> odds.union(fibonacci).collect()

[1, 3, 5, 7, 9, 0, 1, 2, 3, 5, 8]

2.intersection()

语法:RDD.intersection()

转化操作 intersection() 返回两个 RDD 中共有的元素。也就是该操作会返回两个集合中共有的元素。返回的元素或者记录必须在两个集合中是一模一样的,需要记录的数据结构和每个字段都对的上。

>>> fibonacci = sc.parallelize([0,1,2,3,5,8])

>>> odds = sc.parallelize([1,3,5,7,9])

>>> odds.intersection(fibonacci).collect()

[1, 3, 5]

3.subtract()

语法:RDD.subtract(,numPartitions=None)

转化操作 subtract() 会返回第一个 RDD 中所有没有出现在第二个 RDD 中的元素。这是数学上的集合减法的一个实现。

>>> fibonacci = sc.parallelize([0,1,2,3,5,8])

>>> odds = sc.parallelize([1,3,5,7,9])

>>> odds.subtract(fibonacci).collect()

[7, 9]

4.subtractByKey()

语法:RDD.subtractByKey(,numPartitions=None)

转化操作 subtractByKey() 是一个和 subtract 类似的集合操作。subtractByKey() 操作返回一个键值对 RDD 中所有在另一个键值对 RDD 中没有对应键的元素。参数 numPartitions 可以指定生成的结果 RDD 包含多少个分区,缺省值为配置项 spark.default.parallelism 的值。

>>> cities1 = sc.parallelize([('Hayward',(37.668819,-122.080795)),

... ('Baumholder',(49.6489,7.3975)),

... ('Alexandria',(38.820450,-77.050552)),

... ('Melbourne',(37.663712,144.844788))])

>>> cities2 = sc.parallelize([('Boulder Creek',(64.0708333,-148.2236111)),

... ('Hayward',(37.668819,-122.080795)),

... ('Alexandria',(38.820450,-77.050552)),

... ('Arlington',(38.878337,-77.100703))])

>>> cities1.subtractByKey(cities2).collect()

[('Melbourne', (37.663712, 144.844788)), ('Baumholder', (49.6489, 7.3975))]

>>> cities2.subtractByKey(cities1).collect()

[('Boulder Creek', (64.0708333, -148.2236111)), ('Arlington', (38.878337, -77.100703))]

数值型 RDD 的操作

数值型 RDD 仅由数值组成,常用于统计分析。

>>> numbers = sc.parallelize([0,1,0,1,2,3,4,5,6,7,8,9])

>>> numbers.min() #最小值

0

>>> numbers.max() #最大值

9

>>> numbers.mean() #算术平均数

3.8333333333333335

>>> numbers.sum() #求和

46

>>> numbers.stdev() #标准差

3.0230595245361753

>>> numbers.variance() #方差

9.138888888888888

>>> numbers.stats() #返回 StatCounter 对象,一次调用获得一个包括 count()、mean()、stdev()、max()、min() 的结构

(count: 12, mean: 3.83333333333, stdev: 3.02305952454, max: 9.0, min: 0.0)


0