Java spark中的bug分析
发表于:2025-01-25 作者:千家信息网编辑
千家信息网最后更新 2025年01月25日,这篇文章主要介绍"Java spark中的bug分析",在日常操作中,相信很多人在Java spark中的bug分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Jav
千家信息网最后更新 2025年01月25日Java spark中的bug分析
这篇文章主要介绍"Java spark中的bug分析",在日常操作中,相信很多人在Java spark中的bug分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Java spark中的bug分析"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
在spark 中存在一个bug,该bug的详细信息如下:
None.getjava.util.NoSuchElementException: None.getscala.None$.get(Option.scala:529)scala.None$.get(Option.scala:527)org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:463)org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:133)org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3200)org.apache.spark.sql.Dataset.rdd(Dataset.scala:3198)
根据源码定位FileSourceScanExec,定位到如下位置:
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
SparkSession.getActiveSession.get的内容如下:
/** * Returns the active SparkSession for the current thread, returned by the builder. * * @note Return None, when calling this function on executors * * @since 2.2.0 */ def getActiveSession: Option[SparkSession] = { if (TaskContext.get != null) { // Return None when running on executors. None } else { Option(activeThreadSession.get) } }
正如注释所写的一样,当在executors端获取SparkSession的时候,直接返回None。 为什么直接返回none,可以参考spark-pr-21436
当然这个问题,已经有人发现了并且提交了pr-29667,所以拿到commitID(37a660866342f2d64ad2990a5596e67cfdf044c0)直接cherry-pick就ok了,
分析一下原因: 其实该原因就是同一个jvm中,两个不同的线程同步调用,就如unit test所示:
test("SPARK-32813: Table scan should work in different thread") { val executor1 = Executors.newSingleThreadExecutor() val executor2 = Executors.newSingleThreadExecutor() var session: SparkSession = null SparkSession.cleanupAnyExistingSession() withTempDir { tempDir => try { val tablePath = tempDir.toString + "/table" val df = ThreadUtils.awaitResult(Future { session = SparkSession.builder().appName("test").master("local[*]").getOrCreate() session.createDataFrame( session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil), StructType(Seq( StructField("a", ArrayType(IntegerType, containsNull = false), nullable = false)))) .write.parquet(tablePath) session.read.parquet(tablePath) }(ExecutionContext.fromExecutorService(executor1)), 1.minute) ThreadUtils.awaitResult(Future { assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3))) }(ExecutionContext.fromExecutorService(executor2)), 1.minute) } finally { executor1.shutdown() executor2.shutdown() session.stop() } } }
到此,关于"Java spark中的bug分析"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
分析
学习
原因
更多
问题
定位
帮助
不同
实用
接下来
两个
位置
信息
内容
就是
文章
方法
时候
正如
注释
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
污染源普查数据库报价
华为上网行为管理认证服务器
河北士凯互联网科技有限公司
服务器cpu散热器哪家便宜
获取本地指定的db2数据库名
数据库索引读写效率
怎么打游戏服务器ip
常用的下载外文的数据库
武汉丿aVa软件开发招聘
服务器可以开网站吗
核恐怖主义网络安全
服务器安全策略的制定
山东存储服务器怎么收费
卵巢肿瘤标志物数据库
山西智游臻龙互联网科技
网站网络安全事件应急预案
网络技术贴
dba初级数据库管理员待遇
在vs里控件链接数据库
二本软件开发
阿里巴巴云服务器登录密码
沈阳直销系统软件开发
做软件开发需要哪些设备
安装hp服务器
终端拷贝文件夹到服务器
网络安全宣传画小报
怎么看数据库主机内存
那时花开下载软件开发
正当防卫无法与服务器连接
在vs里控件链接数据库