mapPartitions的简单介绍及使用方法
发表于:2024-11-14 作者:千家信息网编辑
千家信息网最后更新 2024年11月14日,本篇内容介绍了"mapPartitions的简单介绍及使用方法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学
千家信息网最后更新 2024年11月14日mapPartitions的简单介绍及使用方法
首先,说到mapPartitions大家肯定想到的是map和MapPartitions的对比。大家都知道mapPartition算子是使用一个函数针对分区计算的,函数参数是一个迭代器。而map只针对每条数据调用的,所以存在访问外部数据库等情况时mapParititons更加高效。
mapPartitions函数: 有代码可知mapPartitions的函数参数是传入一个迭代器,返回值是另一个迭代器。 map函数:
map函数就是将rdd的元素由T类型转化为U类型。 综上可知,map和foreach这类的是针对一个元素调用一次我们的函数,也即是我们的函数参数是单个元素,假如函数内部存在数据库链接、文件等的创建及关闭,那么会导致处理每个元素时创建一次链接或者句柄,导致性能底下,很多初学者犯过这种毛病。 而foreachpartition/mapPartitions是针对每个分区调用一次我们的函数,也即是我们函数传入的参数是整个分区数据的迭代器,这样避免了创建过多的临时链接等,提升了性能。 下面的例子都是1-20这20个数字,经过map完成a*3的转换: 结果
大家通常的做法都是申请一个迭代器buffer,将处理后的数据加入迭代器buffer,然后返回迭代器。如下面的demo。 结果乱序了,因为我的list是无序的,可以使用LinkList:
注意,3中的例子,会在mappartition执行期间,在内存中定义一个数组并且将缓存所有的数据。假如数据集比较大,内存不足,会导致内存溢出,任务失败。对于这样的案例,Spark的RDD不支持像mapreduce那些有上下文的写方法。其实,浪尖有个方法是无需缓存数据的,那就是自定义一个迭代器类。如下例:
结果:
本篇内容介绍了"mapPartitions的简单介绍及使用方法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
1. mappartition简介
/** * Return a new RDD by applying a function to each partition of this RDD. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning) }
/** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF)) }
val a = sc.parallelize(1 to 20, 2)
def mapTerFunc(a : Int) : Int = {a*3}
val mapResult = a.map(mapTerFunc)
println(mapResult.collect().mkString(","))
3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
3. mappartitions低效用法
val a = sc.parallelize(1 to 20, 2)
def terFunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext)
{
val cur = iter.next;
res.::= (cur*3) ;
}
res.iterator
}
val result = a.mapPartitions(terFunc)
println(result.collect().mkString(","))
30,27,24,21,18,15,12,9,6,3,60,57,54,51,48,45,42,39,36,33
4. mappartitions高效用法
class CustomIterator(iter: Iterator[Int]) extends Iterator[Int] {
def hasNext : Boolean = {
iter.hasNext
}
def next : Int= {
val cur = iter.next
cur*3
}
}
val result = a.mapPartitions(v => new CustomIterator(v))
println(result.collect().mkString(","))
3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
"mapPartitions的简单介绍及使用方法"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
函数
数据
迭代
方法
元素
参数
内存
结果
链接
使用方法
例子
内容
就是
性能
情况
数据库
更多
案例
知识
类型
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全组织架构体系
英雄大作战不能连接服务器
2008年9月网络技术
互联网金融 服务器
瑞庭网络技术是外包吗
文明重启创建服务器要多少钱
数据库读一致性场景
互联网财税科技
php sql数据库
原油关注哪些数据库
皮书数据库阅读器
工业网络技术有多垃圾
魔兽世界各服务器联网时间
杭州杉饶网络技术有限公司
软件开发最火的领域
中国网络安全问题真的很严重吗
数据库中怎么求两个数的和
学生网络技术咨询信息推荐
个人电脑如何搭建成服务器
网络安全法 人大
乡镇 网络安全应急预案
服务器cpu主频
数据库读一致性场景
小程序服务器端
网络安全知识竞赛怎么找答案
值得相信的网络安全公司
事业单位计算机网络技术类
眼科软件开发
管家婆怎么倒销售数据库
diy男装数据库