千家信息网

Spark中的RDD到底是什么

发表于:2025-02-23 作者:千家信息网编辑
千家信息网最后更新 2025年02月23日,这篇文章主要讲解了"Spark中的RDD到底是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Spark中的RDD到底是什么"吧!Spark是开源的
千家信息网最后更新 2025年02月23日Spark中的RDD到底是什么

这篇文章主要讲解了"Spark中的RDD到底是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Spark中的RDD到底是什么"吧!

Spark是开源的分布式计算引擎,基于RDD来构造数据处理流程,并在集群间调度任务,通过分区数据管理机制来划分任务的并行度,并在任务之间交换分区数据,实现分布式的数据处理。

RDD是Spark中最重要的概念,理解了RDD是什么,基本也就理解了一半Spark的内部机密了。

1、RDD基类

RDD是Spark中表示数据集的基类,是可序列号的对象,因此RDD可在Spark节点中复制。RDD定义了数据迭代器来循环读取数据,以及在数据集上定义各类转换操作,生成新的RDD。

RDD的各种算子会触发生成新的RDD。如:

map操作生成MapPartitionsRDD。

filter操作也生成MapPartitionsRDD,filter操作其实是在之前的RDD迭代器上封装了一层filter操作,其实还是第一个迭代器,只不过这个迭代器会抛弃掉一些不满足的记录。

RDD的计算过程是通过compute方法来触发的。

1.1 RDD触发任务

submit过程是提交spark程序到集群,这时候会触发application事件和driver事件等,并通过master节点选择对应的node来创建app和driver,同时在node上执行spark jar包里的main方法。但task的真正执行要等到RDD的compute动作来触发的。

RDD通过compute触发任务,提交FinalStage给Dag执行。如collect(),count()等方法都会触发compute过程,间接提交任务。

RDD.compute()=> finalStage => dag.submitJob()=> submitMissingStage() .

dag.submitJob()=> scheduleImpl.launchTask()=>scheduleBackend => executorBackend=> executor.launchTask()=> executorBackend.taskComplete msg => scheduleBackend.taskCompleted=>dag.stageFinished()=> ...

上面是RDD提交任务的大致流程。Compute函数是触发函数,这会导致最后一个RDD被执行,也是finalStage;finalStage调用DAG的submitJob函数提交stage,这里的stage就是finalStage。

Stage是从源头到finalStage串起来的,执行的时候是反向寻找的,这句话要好好体会,这个过程其实就是RDD的秘密了。

我们先看下RDD的经典图例。图中中间的部分Transformation是RDD的计算过程,左边的HDFS示意数据源,右边的HDFS示意RDD的finalStage执行的操作(图中的操作是写入hdfs,当然也可以是print操作等等,就看你怎么写了)。

Stage1和stage2是窄依赖,map和union都是窄依赖;stage3是宽依赖,这里是join操作。窄依赖的意思就是操作只依赖一个stage的数据,宽依赖的意思是依赖于多个stage,对这多个stage的数据要做全连接操作。

1.2、RDD执行示例

RDD通过runJob调用来获得执行,如下:

def collect(): Array[T] = withScope {    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)    Array.concat(results: _*)  }

Sc是SparkContext。

对每个分区执行func操作,返回结果是一个长度等于分区数的Array。

Sc.runJob再调dagScheduler.runJob方法。具体可以看DagScheduler的作业执行步骤,这里先不说,看笔者的专门论述DagScheduler的文章。

1.3、迭代器

RDD实际执行是通过迭代器读取数据的。

RDD是抽象类,定义了几个接口:

分别是getPartitions、compute、getPreferredLocations。RDD数据是分区存储,每一个分区可能分布在申请spark资源的任何位置。这三个接口可以描述RDD的全部信息,其中getPreferredLocations这个方法是和计算本地化有关的,这里我们就先忽略它,不影响我们理解RDD的原理。

override protected def getPartitions: Array[Partition] = {}
override def compute(split: Partition, context: TaskContext): Iterator[java.lang.Integer] = new NextIterator[Integer] {}

getPartitions方法我们也不用太关注,它的作用是返回一个分区列表,表示这个RDD有几个分区,实际运行的时候RDD的每个分区会被安排到单独的节点上运行,这样来实现分布式计算的。

我们最关心的是compute的方法,这个方法返回一个迭代器,这个迭代器就是这个RDD的split这个分区的数据集。至于这个迭代器的数据是什么,是在compute方法体中写代码来生成的。我们可以定义自己的RDD,只要写代码实现这几个方法就可以了!

自定义RDD有什么好处呢?最大的好处就是可以把自己的数据集纳入到Spark的分布式计算体系中,帮助你实现数据分区,任务分配,和其他RDD执行全连接汇聚操作等。

言归正传,回到compute方法本身。

怎么获得Iterator[T],对ShuffleRDD来说是从BlockManager获取迭代器Iterator[T]。这种迭代器是blockResult,是ShuffleMapTask执行结果的保存格式;另一种就是直接获得iter,这种是ResultTask的执行结果的数据。

第一种情况,看BlockManager能否找到本RDD的partition的BlockResult。看看getOrElseUpdate方法还传递了一个函数作为最后一个入参,如果不存在指定的BlockResult,则返回入参函数来计算得到iter,方法体定义如下:

() => {  readCachedBlock = false  computeOrReadCheckpoint(partition, context)}

主要就是调用computeOrReadCheckpoint方法计算分区。

def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] ={  if (isCheckpointedAndMaterialized) {    firstParent[T].iterator(split, context)  } else {    compute(split, context)  }}

computeOrReadCheckpoint得到Iterator,如果是checkpoint的那么调用第一个父类的iterator方法得到Iterator,这里父类就是CheckpointRDD;否则就是调用compute方法得到Iterator。

所以,RDD的迭代器的实际获取分成两步:

首先,判断是否存在该RDD指定partition的BlockResult,如果存在则将BlockResult作为Iterator结果,此时表示该RDD是shuffleRDD之类。

然后如果上述不满足,则又分两种情况,第一种这是checkpoint的RDD,则调用父RDD的iterator方法(此时父RDD就是CheckpointRDD);否则调用compute方法来获得Iterator。

2、Stage划分

我们知道RDD的提交Spark集群执行是分阶段划分Stage提交的。从最后一个Stage开始,依次循环递归判断是否要调用依赖RDD的Stage,Stage的划分是根据是否要Shuffle作为分界点的。

如果某个RDD的依赖(dep)是ShuffleDependency,则次RDD作为ShuffleMapTask任务提交,否则最后一个RDD作为ResultTask提交。

递归提交Stage,对ShuffleMapTask类型的RDD,会一直递归判断该RDD是否存在前置的ShuffleDependency,如果存在则递归提交前依赖RDD。

整个Spark作业是RDD串接的,如果不存在Shuffle依赖,则提交最后一个RDD,并且只有这一个RDD被提交。在计算最后一个RDD的iterator时,被调用到父RDD的iterator方法,此时父RDD一般都是MapPartitionsRDD。在MapPartitionsRDD中有进一步叙述。

3、RDD子类

RDD含有多个子类,如MapPartitionRDD,HadoopRDD、CoGroupedRDD等等。笔者这里就找几个例子简单说明一下他们的内部逻辑。

3.1 MapPartitionsRDD

MapPartitionsRDD是RDD的子类,前面看到RDD的诸多算子都会生成新的MapPartitionRDD。

MapPartitionsRDD的构造函数需要入参f,它是一个函数抽象类或者叫做泛类。

f: (TaskContext, Int, Iterator[T]) => Iterator[U]

f的入参有三个:

(1) TaskContext:是任务上下文

(2) Int:是分区编码

(3) Iterator[T]是分区迭代器

f的输出也是一个Iterator迭代器。可以看出,f是一个抽象的从一个迭代器生成另一个迭代器的自定义函数。对数据的处理逻辑就是体现在f上。

MapPartitionRDD中触发计算的compute方法定义如下:

override def compute(split: Partition, context: TaskContext): Iterator[U] =    f(context, split.index, firstParent[T].iterator(split, context))

这里的f是MapPartitionRDD的构造函数中传进入的入参,是用户自定义的map函数。这样,通过RDD的map、flatmap等算子和MapPartitionRDD,可以将RDD上的一系列操作不停的串联下去。

3.2 CoalescedRDD

CoalescedRDD将M个分区的RDD重新分成N个分区,形成新的RDD。在计算过程中,会引起Shuffle工程。

首先CoalescedRDD需要一个重新分区算法,将M个分区如何划分到N个分区,这里M>N。重新分区的结果是N的每个分区对应了M的多个分区,用List来表示,List中每个Int表示父RDD中M个分区之一的编号。

如果CoalescedRDD没有指定自己的重新分区算法,则用DefaultPartitionCoalescer来做重新分区计算。

CoalescedRDD的compute过程如下:

override def compute(partition: Partition, context: TaskContext): Iterator[T] = {  partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap {     parentPartition => firstParent[T].iterator(parentPartition, context)  }}

partition.parents是指CoalescedRDD的第partition分区所对应的父RDD的分区列表,对分区列表的每个分区,执行:

firstParent[T].iterator(parentPartition, context)

然后得到最终的Iterator[T]。这段应该不难理解。

需要留意的是,这里得到的Iterator[T]最终是要写到Shuffle的,因为CoalescedRDD对应的ShuffleMapTask而不是ResultTask。

对于理解Spark计算流程来说,理解了Shuffle的过程,也就解决了一半的疑惑了。

感谢各位的阅读,以上就是"Spark中的RDD到底是什么"的内容了,经过本文的学习后,相信大家对Spark中的RDD到底是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

方法 数据 迭代 就是 任务 函数 过程 生成 结果 分布式 多个 递归 子类 实际 情况 流程 算子 节点 集群 处理 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 洪山专业的软件开发 桂阳计算机软件开发培训 我的世界梦想曙光服务器安装不了 珠海服务软件开发哪家好 嵌入式软件开发工程师学校 网络安全意识和反思 信息化的快速发展与网络安全 数据库平台建设项目党史馆 软件开发产品代办列表模板 嘉定区无线网络技术展示 游戏服务器教程 cpu和高速缓存数据库 网络安全对电子商务的影响有哪些 oa系统需要服务器吗 网络安全贴纸怎么贴 镇政府网络安全工作责任制细则 分区数据库导入ixf 数据库信息越多越好吗 南京光学作图软件开发 如何让一个电脑充当服务器 怎么创建ftp服务器的快捷方式 泗阳巨型网络技术批发价 java生成数据存入数据库 三调耕地资源质量分类数据库 破坏计算机网络安全法 苹果水母服务器多少钱一套 服务器 路数 聚合支付软件开发多少钱 服务器主板一定要插服务器内存吗 雨花区软件开发专业
0