Bug analysis in Java Spark

This article walks through "Bug analysis in Java Spark". Many people run into questions about this bug in day-to-day work, so we have gone through the available material and distilled it into a simple, workable walkthrough. Hopefully it helps clear up your doubts — follow along!

Spark has a bug whose details are as follows:

java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:463)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:133)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3200)
    at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3198)

Using the stack trace to locate the failure in FileSourceScanExec, we land on the following line:

 SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
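Note what this line assumes: getActiveSession returns an Option[SparkSession], and calling .get on a None raises exactly the exception at the top of the trace. A tiny standalone sketch (names here are illustrative, not from Spark):

object NoneGetDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for SparkSession.getActiveSession on a thread with no active session.
    val maybeSession: Option[String] = None

    // Safe: handle the empty case explicitly.
    println(maybeSession.getOrElse("fallback"))

    // Unsafe: .get on None throws java.util.NoSuchElementException: None.get.
    maybeSession.get
  }
}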

The implementation of SparkSession.getActiveSession is as follows:

/**
 * Returns the active SparkSession for the current thread, returned by the builder.
 *
 * @note Return None, when calling this function on executors
 *
 * @since 2.2.0
 */
def getActiveSession: Option[SparkSession] = {
  if (TaskContext.get != null) {
    // Return None when running on executors.
    None
  } else {
    Option(activeThreadSession.get)
  }
}

Exactly as the comment says: when this is called on an executor, it returns None outright. For the rationale behind returning None there, see spark-pr-21436.
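The executor check works because TaskContext.get is non-null only while a task is running. A quick sketch of this (the object name and app name are ours), runnable in local mode with a standard spark-sql dependency:

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object TaskContextDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()

    // Driver side: no task is running on this thread.
    assert(TaskContext.get == null)

    // Task side: every task sees a non-null TaskContext, which is the signal
    // getActiveSession uses to return None there.
    spark.sparkContext.parallelize(1 to 4).foreach { _ =>
      assert(TaskContext.get != null)
    }

    spark.stop()
  }
}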
This problem has already been reported and fixed upstream in pr-29667, so you can take the commit ID (37a660866342f2d64ad2990a5596e67cfdf044c0) and cherry-pick it straight onto your branch.
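The gist of the fix, paraphrased here as a pattern rather than the literal patch (see pr-29667 for the exact change): stop resolving thread-local state lazily at execution time, and instead read the configuration through state the plan node already holds. The class and field names below are ours:

import org.apache.spark.sql.SparkSession

// Fragile: resolved lazily, on whichever thread happens to execute the plan.
class ScanNodeBefore {
  lazy val vectorized: Boolean =
    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
}

// Robust: read from a session reference the node carries, so no thread-local
// lookup is needed at execution time.
class ScanNodeAfter(session: SparkSession) {
  val vectorized: Boolean =
    session.sessionState.conf.parquetVectorizedReaderEnabled
}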

Now for the root cause: within a single JVM, two different threads take part in the same query — one thread builds the SparkSession and the DataFrame, while a second thread triggers execution. The active session is tracked per thread, so the executing thread simply has none to find. A minimal sketch of this follows, and after it the unit test that ships with the fix.
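Concretely, the companion object stores the active session in an InheritableThreadLocal, so it propagates only from a thread to threads it spawns later, never sideways between existing threads. A minimal sketch, assuming a standard spark-sql dependency (object and variable names are ours):

import org.apache.spark.sql.SparkSession

object ActiveSessionDemo {
  def main(args: Array[String]): Unit = {
    var session: SparkSession = null

    // Build the session on a separate thread: getOrCreate marks it active
    // only for that thread (and threads it spawns afterwards).
    val builder = new Thread(() => {
      session = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    })
    builder.start()
    builder.join()

    // The main thread never activated the session, so its thread-local is empty.
    println(SparkSession.getActiveSession)  // None
    println(SparkSession.getDefaultSession) // Some(...) -- the global fallback

    session.stop()
  }
}

The unit test attached to pr-29667 exercises the same scenario end to end: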

test("SPARK-32813: Table scan should work in different thread") {
  val executor1 = Executors.newSingleThreadExecutor()
  val executor2 = Executors.newSingleThreadExecutor()
  var session: SparkSession = null
  SparkSession.cleanupAnyExistingSession()
  withTempDir { tempDir =>
    try {
      val tablePath = tempDir.toString + "/table"
      val df = ThreadUtils.awaitResult(Future {
        session = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
        session.createDataFrame(
          session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
          StructType(Seq(
            StructField("a", ArrayType(IntegerType, containsNull = false), nullable = false))))
          .write.parquet(tablePath)
        session.read.parquet(tablePath)
      }(ExecutionContext.fromExecutorService(executor1)), 1.minute)

      ThreadUtils.awaitResult(Future {
        assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
      }(ExecutionContext.fromExecutorService(executor2)), 1.minute)
    } finally {
      executor1.shutdown()
      executor2.shutdown()
      session.stop()
    }
  }
}
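If you cannot take the patch right away, a pragmatic workaround is to bind the session to the executing thread yourself before triggering any action; SparkSession.setActiveSession is a public API. A sketch (the helper object and method are ours):

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object CollectHelper {
  // Call this from the "other" thread, before any action that plans a file scan.
  def collectWithSession(session: SparkSession, df: DataFrame): Array[Row] = {
    SparkSession.setActiveSession(session) // populate this thread's thread-local
    df.rdd.collect()
  }
}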

That wraps up our look at "Bug analysis in Java Spark"; hopefully it has resolved your questions. Theory works best when paired with practice, so go try it out! For more practical articles like this one, keep following the site.
