千家信息网

Spark分区并行度决定机制

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容主要讲解"Spark分区并行度决定机制",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Spark分区并行度决定机制"吧!大家都知道Spark job
千家信息网最后更新 2025年01月23日Spark分区并行度决定机制

本篇内容主要讲解"Spark分区并行度决定机制",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Spark分区并行度决定机制"吧!

大家都知道Spark job中最小执行单位为task,合理设置Spark job每个stage的task数是决定性能好坏的重要因素之一,但是Spark自己确定最佳并行度的能力有限,这就要求我们在了解其中内在机制的前提下,去各种测试、计算等来最终确定最佳参数配比。

Spark任务在执行时会将RDD划分为不同的stage,一个stage中task的数量跟最后一个RDD的分区数量相同。之前已经介绍过,stage划分的关键是宽依赖,而宽依赖往往伴随着shuffle操作。对于一个stage接收另一个stage的输入,这种操作通常都会有一个参数numPartitions来显示指定分区数。最典型的就是一些ByKey算子,比如groupByKey(numPartitions: Int),但是这个分区数需要多次测试来确定合适的值。首先确定父RDD中的分区数(通过rdd.partitions().size()可以确定RDD的分区数),然后在此基础上增加分区数,多次调试直至在确定的资源任务能够平稳、安全的运行。
对于没有父RDD的RDD,比如通过加载HDFS上的数据生成的RDD,它的分区数由InputFormat切分机制决定。通常就是一个HDFS block块对应一个分区,对于不可切分文件则一个文件对应一个分区。

对于通过SparkContext的parallelize方法或者makeRDD生成的RDD分区数可以直接在方法中指定,如果未指定,则参考spark.default.parallelism的参数配置。下面是默认情况下确定defaultParallelism的源码:
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

通常,RDD的分区数与其所依赖的RDD的分区数相同,除非shuffle。但有几个特殊的算子:

1.coalesce和repartition算子

笔者先放两张关于该coalesce算子分别在RDD和DataSet中的源码图:(DataSet是Spark SQL中的分布式数据集,后边说到Spark时再细讲)

通过coalesce源码分析,无论是在RDD中还是DataSet,默认情况下coalesce不会产生shuffle,此时通过coalesce创建的RDD分区数小于等于父RDD的分区数。

笔者这里就不放repartition算子的源码了,分析起来也比较简单,图中我有所提示。但笔者建议,如下两种情况,请使用repartition算子:

1)增加分区数
repartition触发shuffle,shuffle的情况下可以增加分区数。

coalesce默认不触发shuffle,即使调用该算子增加分区数,实际情况是分区数仍然是当前的分区数。

2)极端情况减少分区数,比如将分区数减少为1
调整分区数为1,此时数据处理上游stage并行度降,很影响性能。此时repartition的优势即不改变原来stage的并行度就体现出来了,在大数据量下,更为明显。
但需要注意,因为repartition会触发shuffle,而要衡量好shuffle产生的代价和因为用repartition增加并行度带来的效益。

2.union算子

还是直接看源码:

通过分析源码,RDD在调用union算子时,最终生成的RDD分区数分两种情况:
1)union的RDD分区器已定义并且它们的分区器相同

多个父RDD具有相同的分区器,union后产生的RDD的分区器与父RDD相同且分区数也相同。比如,n个RDD的分区器相同且是defined,分区数是m个。那么这n个RDD最终union生成的一个RDD的分区数仍是m,分区器也是相同的

2)不满足第一种情况,则通过union生成的RDD的分区数为父RDD的分区数之和
4.cartesian算子

通过上述coalesce、repartition、union算子介绍和源码分析,很容易分析cartesian算子的源码。通过cartesian得到RDD分区数是其父RDD分区数的乘积。

在Spark SQL中,任务并行度参数则要参考spark.sql.shuffle.partitions,笔者这里先放一张图,详细的后面讲到Spark SQL时再细说:

看下图在Spark流式计算中,通常将SparkStreaming和Kafka整合,这里又分两种情况:

1.Receiver方式生成的微批RDD即BlockRDD,分区数就是block数

2.Direct方式生成的微批RDD即kafkaRDD,分区数和kafka分区数一一对应

到此,相信大家对"Spark分区并行度决定机制"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

算子 情况 相同 源码 生成 机制 分析 参数 数据 笔者 任务 就是 方法 内容 实际 性能 数量 文件 方式 还是 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全手抄报图片边框 伊犁网络技术哪个好 前端软件开发培训排名 网络安全为人民的主题班会 网络安全课程讲课视频 互联网专业和网络技术有区别吗 一台主机做服务器需要啥配置 批量管理域控服务器 服务器硬件测试工具 生活中如何应对网络安全问题 山东商城软件开发企业 服务器如何查询电源型号 保电期间网络安全防护工作心得 新疆手机软件开发公司简介 qt 删除数据库表 数据 武汉科技大学互联网 数据库实训项目经历描述 农安智能化网络技术服务诚信合作 河北区有哪些服务器云主机 电子科技大学网络安全考研难吗 小森生活服务器已达上限怎么解决 适合金融软件开发外包公司 web怎么与数据库建立连接 属于4G网络技术的是 系统软件开发包和应用软件 南宁网络安全局电话 精达股份经营涉及软件开发吗 通信软件开发行业前景 林业一张图数据库代码gllx 微信游戏用什么软件开发
0