千家信息网

结构化处理之Spark Session的示例分析

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,小编给大家分享一下结构化处理之Spark Session的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!创建Dat
千家信息网最后更新 2025年01月23日结构化处理之Spark Session的示例分析

小编给大家分享一下结构化处理之Spark Session的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

创建DataFrame,有三种模式,一种是sql()主要是访问Hive表;一种是从RDD生成DataFrame,主要从ExistingRDD开始创建;还有一种是read/format格式,从json/txt/csv等数据源格式创建。

先看看第三种方式的创建流程。

1、read/format

def read: DataFrameReader = new DataFrameReader(self)

SparkSession.read()方法直接创建DataFrameReader,然后再DataFrameReader的load()方法来导入外部数据源。load()方法主要逻辑如下:

def load(paths: String*): DataFrame = {    sparkSession.baseRelationToDataFrame(      DataSource.apply(        sparkSession,        paths = paths,        userSpecifiedSchema = userSpecifiedSchema,        className = source,        options = extraOptions.toMap).resolveRelation())  }

创建对应数据源类型的DataSource,DataSource解析成BaseRelation,然后通过SparkSession的baseRelationToDataFrame方法从BaseRelation映射生成DataFrame。从BaseRelation创建LogicalRelation,然后调用Dataset.ofRows方法从LogicalRelation创建DataFrame。DataFrame实际就是Dataset。

type DataFrame = Dataset[Row]

baseRelationToDataFrame的定义:

def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {    Dataset.ofRows(self, LogicalRelation(baseRelation))  }

Dataset.ofRows方法主要是将逻辑计划转换成物理计划,然后生成新的Dataset

2、执行

SparkSession的执行关键是如何从LogicalPlan生成物理计划。我们试试跟踪这部分逻辑。

def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>

plan.executeCollect().head.getLong(0)

}

Dataset的count()动作触发物理计划的执行,调用物理计划plan的executeCollect方法,该方法实际上会调用doExecute()方法生成Array[InternalRow]格式。executeCollect方法在SparkPlan中定义。

3、HadoopFsRelation

需要跟踪下如何从HadoopFsRelation生成物理计划(也就是SparkPlan)

通过FileSourceStrategy来解析。它在FileSourceScanExec上叠加Filter和Projection等操作,看看FileSourceScanExec的定义:

case class FileSourceScanExec(    @transient relation: HadoopFsRelation,    output: Seq[Attribute],    requiredSchema: StructType,    partitionFilters: Seq[Expression],    dataFilters: Seq[Expression],    overrideval metastoreTableIdentifier: Option[TableIdentifier])  extends DataSourceScanExec with ColumnarBatchScan  {。。。}

它的主要执行代码doExecute()的功能逻辑如下:

protected override def doExecute(): RDD[InternalRow] = {    if (supportsBatch) {      // in the case of fallback, this batched scan should never fail because of:      // 1) only primitive types are supported      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields      WholeStageCodegenExec(this).execute()    } else {      val unsafeRows = {        val scan = inputRDD        if (needsUnsafeRowConversion) {          scan.mapPartitionsWithIndexInternal { (index, iter) =>            val proj = UnsafeProjection.create(schema)            proj.initialize(index)            iter.map(proj)          }        } else {          scan        }      }      val numOutputRows = longMetric("numOutputRows")      unsafeRows.map { r =>        numOutputRows += 1        r      }    }  }

inputRDD有两种方式创建,一是createBucketedReadRDD,二是createNonBucketedReadRDD。两者没有本质的区别,仅仅是文件分区规则的不同。

private lazy val inputRDD: RDD[InternalRow] = {    val readFile: (PartitionedFile) => Iterator[InternalRow] =      relation.fileFormat.buildReaderWithPartitionValues(        sparkSession = relation.sparkSession,        dataSchema = relation.dataSchema,        partitionSchema = relation.partitionSchema,        requiredSchema = requiredSchema,        filters = pushedDownFilters,        options = relation.options,        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))    relation.bucketSpec match {      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)      case _ =>        createNonBucketedReadRDD(readFile, selectedPartitions, relation)    }  }createNonBucketedReadRDD调用FileScanRDD :new FileScanRDD(fsRelation.sparkSession, readFile, partitions)

以上是"结构化处理之Spark Session的示例分析"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0