千家信息网

Spark生产作业容错能力的负面影响有哪些

发表于:2024-10-03 作者:千家信息网编辑
千家信息网最后更新 2024年10月03日,这篇文章主要讲解了"Spark生产作业容错能力的负面影响有哪些",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Spark生产作业容错能力的负面影响有哪些
千家信息网最后更新 2024年10月03日Spark生产作业容错能力的负面影响有哪些

这篇文章主要讲解了"Spark生产作业容错能力的负面影响有哪些",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Spark生产作业容错能力的负面影响有哪些"吧!

1. Spark TaskLocality

在 Spark 中数据本地性通过 TaskLocality 来表示,有如下几个级别,

  • PROCESS_LOCAL

  • NODE_LOCAL

  • NO_PREF

  • RACK_LOCAL

  • ANY

从上到下数据本地性依次递减。

Spark 在执行前通过数据的分区信息进行计算 Task 的 Locality,Task 总是会被优先分配到它要计算的数据所在节点以尽可能地减少网络 IO。这个计算的过程通过 spark.locality.wait 默认为3s,控制这个计算的过程。

2. Spark 内部容错

原理这里不细讲,简而言之就是重试。Spark 规定了同一个 Job 中同一个 Stage 连续失败重试的上限(spark.stage.maxConsecutiveAttempts),默认为4,也规定了一个 Stage 中 同一个 Task 可以失败重试的次数(spark.task.maxFailures),默认为4。当其中任何一个阈值达到上限,Spark 都会使整个 Job 失败,停止可能的"无意义"的重试。

3. 数据本地性和容错的冲突

我们首先来看一个例子,如图所示,图为 Spark Stage 页面下 Task Page 的详细视图。

  • 第一列表示该 Task 进行了4次重试,所以这个 Task 对应的 Job 也因此失败了。

  • 第三列表示该 Task 的数据本地性,都是 NODE_LOCAL 级别,对于一个从HDFS读取数据的任务,显然获得了最优的数据本地性

  • 第四列表示的是 Executor ID,我们可以看到我们任务的重试被分配到ID 为5和6两个 Executor 上

  • 第五列表示我们运行这些重试的 Task 所在的 Executor 所在的物理机地址,我们可以看到他们都被调度到了同一个

  • 最后列表示每次重试失败的错误栈

3.1 问题一:单个 Task 重试为什么失败?

结合硬件层面的排查,发现是 NodeManager 物理节点上挂在的 /mnt/dfs/4,出现硬件故障导致盘只读,ShuffleMapTask 在即将完成时,将index文件和data文件commit时,获取index的临时文件时候发生FileNotFoundException

java.io.FileNotFoundException: /mnt/dfs/4/yarn/local/usercache/da_haitao/appcache/application_1568691584183_1953115/blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/0a/shuffle_96_2685_0.index.82594412-1f46-465e-a067-2c5e386a978e (No such file or directory)    at java.io.FileOutputStream.open0(Native Method)    at java.io.FileOutputStream.open(FileOutputStream.java:270)    at java.io.FileOutputStream.(FileOutputStream.java:213)    at java.io.FileOutputStream.(FileOutputStream.java:162)    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:245)    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190)    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)    at org.apache.spark.scheduler.Task.run(Task.scala:109)

3.2 问题二:为什么该 Task 的4次重试都在同一个物理节点?

这是由于 Driver 在调度该 Task 的时候进行了数据本地性的运算,而且在 spark.locality.wait 默认为3s的时间约束内成功获得了NODE_LOCAL级别的数据本地性,故而都调度到了同一个 NodeManger 物理节点。

3.3 问题三:为什么总是"本地重试",不是"异地重试"?
这个过程从逻辑上讲,其实已经不是"本地重试",而恰恰是"异地重试"了。这我们可以从4次的重试的 Executor ID 上进行判断,第0、1和3次是在 ID 6上进行的,而第2次是在 ID 5上发生的。但由于ID 5和6都在同一个 NodeManger 节点,所以我们看起来像是"本地重试"。另一个原因就是上面所说的数据本地性的成功解析,所以这些 Task 的每次重试都高概率的来到这个节点。
所有 Spark Task 级别的重试从逻辑上都应该属于"异地重试",他们都需要通过 Driver 重新调度到新的 Executor 进行重试。我们所观测到的"本地"和"异地"是属于"现象"而非"本质",影响这种现象的条件有比如下面几个(不一定全面):1. 数据本地性 2. Executor 由于 NodeLabel 限制,只在若干有限的物理机上分配 3. ResourceManager 调度时刚好把所有的 Executor 都分配到某个节点上。
3.4 问题5:为什么4次失败都操作同一个坏的盘?
该 NodeManger 实际上有/mnt/dfs/{0-11}, 一共12块盘,从物理检查上看,整个过程中也只有/mnt/dfs/4有异常告警,那为啥 Spark 这么傻?这么多好盘不用,专挑一块坏的盘死磕?
我们可以先看下出错的文件,我们包这个文件分成5个部分来看,
      1. /mnt/dfs/4/yarn/local/2. usercache/da_haitao/appcache/application_1568691584183_1953115/ blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/3. 0a/4. shuffle_96_2685_0.index5. .82594412-1f46-465e-a067-2c5e386a978e
  • 第一行,是 Yarn NodeManger 所配置的LOCAL_DIR的一部分,完整的应该包括12块盘
  • 第二行,是 Spark 生成的 BlockManger 的根目录之一,其他盘符下也有类似的一个目录
  • 第三行,是一个根目录下的一级子目录,数量由 spark.diskStore.subDirectories 默认为64控制
  • 第四行,Spark Shuffle 过程产生的两个重要的文件之一,一个是数据文件 .data 结尾,另一个就是这个与之对应的 .index 文件。96是 ShuffleID 表标识是哪个Shuffle 过程,2685是 MapID 对应的是 一个RDD 所以有分区中其中一个的顺序号, 而0是一个固定值,原本表示是ReduceID,Spark Sort Based Shuffle 的实现不需要依赖这个值,所以被固定为了0。通过Shuffle ID和 MapId,Shufle Write 阶段就可以生成类似shuffle_96_2685_0.index这样的文件,而Shuffle Read 阶段也可以通过两个ID 定位到这个文件。
  • 第五行, 是Index文件的对应临时文件的UUID标识。
基于这样的逻辑,对于某次Shuffle 过程的某个分区(Partition)的最终输出文件名其实是可以预测的也是固定的,比如我们这个 case 中,第96次shuffle的第2685分区的 index 文件的文件名即为shuffle_96_2685_0.index。
Spark 在写和读这个文件的时候,基于相同的定位逻辑(算法)来保证依赖关系,
第一步确定根目录,Spark 通过文件名的hash绝对值与盘符数的模,作为索引却确定根目录
  scala> math.abs("shuffle_96_2685_0.index".hashCode) % 12res0: Int = 6
而根目录的数组对于一个 Executor 的这个生命周期内而言是确定的,它是一个由简单随机算法将所有路径打散的一个固定数组。所以一旦文件名称确定,Executor 不换的话,根目录一定是确定的。所以都固定的去访问/mnt/dfs/4这个坏盘。
但这只解释了一个 Executor 所被分配 Task 失败的原因,我们的 Task 还在不同的 executor 上进行过尝试。
3.5 问题5:为什么两个 Executor 上的重试都失败了?
其实这个问题只是概率的问题, Spark 用类似下面算法打乱所有LOCAL_DIRS的配置,如下面的的简单测试,这种碰撞的概率还是极高的,我们ID 5,6,的 Executor 下 DiskBlockManager 包含的 localDirs(6)应该都对应于 /mnt/dfs/4 这个坏盘。
scala> def randomizeInPlace[T](arr: Array[Int], rand: java.util.Random = new java.util.Random): Array[Int] = {     |     for (i <- (arr.length - 1) to 1 by -1) {     |       val j = rand.nextInt(i + 1)     |       val tmp = arr(j)     |       arr(j) = arr(i)     |       arr(i) = tmp     |     }     |     arr     |   }randomizeInPlace: [T](arr: Array[Int], rand: java.util.Random)Array[Int]scala> randomizeInPlace(res11)res23: Array[Int] = Array(3, 2, 4, 1)
scala> randomizeInPlace(res11)res24: Array[Int] = Array(2, 3, 4, 1)
scala> randomizeInPlace(res11)res25: Array[Int] = Array(2, 1, 3, 4)
scala> randomizeInPlace(res11)res26: Array[Int] = Array(4, 2, 1, 3)
scala> randomizeInPlace(res11)res27: Array[Int] = Array(2, 3, 4, 1)

感谢各位的阅读,以上就是"Spark生产作业容错能力的负面影响有哪些"的内容了,经过本文的学习后,相信大家对Spark生产作业容错能力的负面影响有哪些这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

文件 数据 问题 节点 过程 容错 根目录 物理 影响 分配 调度 能力 作业 生产 两个 就是 异地 级别 逻辑 所在 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 电脑网络技术怎样收费 青少年网络安全探究 服务器安装软件提示要管理员权限 lol登陆服务器连接异常 公安部网络技术研究中心 成都中国网络安全大会 服务器和客户电脑添加用户 idm设置代理服务器 华宇软件开发有限公司招聘 网络安全行业会议 中国的互联网科技有哪些产品 数据库的结构文件类型 阴阳师有网络但是无法连接服务器 成都 服务器托管 软件开发难还是维护难 无线路由器访问服务器设置 合作百度推广的软件开发行业 数据库之间的联系 微软服务器应用程序服务去哪儿了 sis系统用什么软件开发的 搭建流媒体点播服务器 敏捷软件开发思想 服务器网站安全软件有哪些 网络安全监测和态势感知技术 汕头数据链软件开发供应商 服务器虚拟化基础架构 戴尔服务器自带系统还原教程 谷歌dns解析服务器 中皮魔兽数据库 服务器为何都用centos
0