
Spark 3.0.1 with Delta 0.7.0: how Delta performs DDL operations

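Before 0.7.0, Delta could not save data as a table; it could only write to files. In other words, its metadata was kept apart from the rest of Spark's metadata: Delta tables existed on their own and could not be joined with other tables. Only from Delta 0.7.0 onward is Delta truly integrated with Spark, thanks to the Catalog plugin API of Spark 3.x.
Let's start from how Delta configures the SparkSession: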

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

For the second setting, config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"), the Spark configuration documentation describes spark.sql.catalog.spark_catalog as follows:

A catalog implementation that will be used as the v2 interface to Spark's built-in v1 catalog: spark_catalog. This catalog shares its identifier namespace with the spark_catalog and must be consistent with it; for example, if a table can be loaded by the spark_catalog, this catalog must also return the table metadata. To delegate operations to the spark_catalog, implementations can extend 'CatalogExtension'.
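
With a session configured this way, table-level DDL and DML can go through the catalog instead of file paths. The following is a minimal sketch, not taken from the original post. It assumes spark-sql 3.0.1 and io.delta:delta-core_2.12:0.7.0 are on the classpath and that the job runs locally; the application name and the table names events and ids are made up for illustration.

```scala
// Assumption: spark-sql 3.0.1 and delta-core 0.7.0 are on the classpath.
import org.apache.spark.sql.SparkSession

object DeltaDdlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("delta-ddl-sketch") // hypothetical application name
      .master("local[*]")          // assumption: local run
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // DDL: the table is registered in the session catalog by name.
    spark.sql("CREATE TABLE IF NOT EXISTS events (id BIGINT, data STRING) USING delta")

    // DML addressed by table name rather than by path.
    spark.sql("INSERT INTO events VALUES (1, 'a'), (2, 'b')")

    // saveAsTable registers a second Delta table in the same catalog.
    spark.range(1, 3).toDF("id")
      .write.format("delta").mode("overwrite").saveAsTable("ids")

    // Both tables resolve by name, so they can be joined in one query.
    spark.sql("SELECT e.id, e.data FROM events e JOIN ids i ON e.id = i.id").show()

    spark.stop()
  }
}
```

Without Hive support enabled, the tables are only tracked by the in-memory session catalog for the lifetime of the session, while the data lands under the local spark-warehouse directory.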


The Spark 3.x Catalog plugin API

To understand why Delta can perform DDL and DML operations, we first need to understand the Catalog plugin mechanism of Spark 3.x (SPARK-31121).

First comes the interface CatalogPlugin, the top-level interface for catalog plugins. As its comment says:

/**
 * A marker interface to provide a catalog implementation for Spark.
 *
 * Implementations can provide catalog functions by implementing additional interfaces for tables,
 * views, and functions.
 *
 * Catalog implementations must implement this marker interface to be loaded by
 * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
 * required public no-arg constructor. After creating an instance, it will be configured by calling
 * {@link #initialize(String, CaseInsensitiveStringMap)}.
 *
 * Catalog implementations are registered to a name by adding a configuration option to Spark:
 * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
 * in the Spark configuration that share the catalog name prefix,
 * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive
 * string map of options in initialization with the prefix removed.
 * {@code name}, is also passed and is the catalog's name; in this case, "catalog-name".
 */
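
The prefix rule in that comment can be illustrated with a small, Spark-free sketch. It only models the behaviour the comment describes and is not Spark's actual loader code; the helper catalogOptions, the catalog name my_cat, and the option keys are hypothetical.

```scala
object CatalogOptionsSketch {
  // Hypothetical helper: collect the options that belong to one catalog from a flat
  // configuration map, dropping the "spark.sql.catalog.<name>." prefix from each key.
  def catalogOptions(conf: Map[String, String], name: String): Map[String, String] = {
    val prefix = s"spark.sql.catalog.$name."
    conf.collect {
      case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = Map(
      "spark.sql.catalog.my_cat" -> "com.example.YourCatalogClass", // registers the class
      "spark.sql.catalog.my_cat.uri" -> "thrift://host:9083",       // option for my_cat
      "spark.sql.catalog.my_cat.warehouse" -> "/tmp/wh",            // option for my_cat
      "spark.sql.catalog.other.uri" -> "belongs-to-another-catalog"
    )
    // Only the two my_cat options remain, keyed as "uri" and "warehouse".
    println(catalogOptions(conf, "my_cat"))
  }
}
```

The entry that registers the class itself has no trailing key segment, so it is not part of the options map; the catalog name reaches the plugin separately as the first argument of initialize.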

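An implementation can additionally implement other interfaces for tables, views, and functions. This brings us to the interface TableCatalog, which provides the table-related methods: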

/**
 * List the tables in a namespace from the catalog.
 *
 * If the catalog supports views, this must return identifiers for only tables and not views.
 *
 * @param namespace a multi-part namespace
 * @return an array of Identifiers for tables
 * @throws NoSuchNamespaceException If the namespace does not exist (optional).
 */
Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;

/**
 * Load table metadata by {@link Identifier identifier} from the catalog.
 *
 * If the catalog supports views and contains a view for the identifier and not a table, this
 * must throw {@link NoSuchTableException}.
 *
 * @param ident a table identifier
 * @return the table's metadata
 * @throws NoSuchTableException If the table doesn't exist or is a view
 */
Table loadTable(Identifier ident) throws NoSuchTableException;

This makes it possible to build your own catalog on top of TableCatalog and thereby get multi-catalog support.

There is also DelegatingCatalogExtension, an abstract class that implements the CatalogExtension interface, which in turn extends TableCatalog and SupportsNamespaces. DeltaCatalog extends DelegatingCatalogExtension; that part is analyzed later.
Finally, there is the class CatalogManager, which manages CatalogPlugins and is thread-safe:

/**
 * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow
 * the caller to look up a catalog by name.
 *
 * There are still many commands (e.g. ANALYZE TABLE) that do not support v2 catalog API. They
 * ignore the current catalog and blindly go to the v1 `SessionCatalog`. To avoid tracking current
 * namespace in both `SessionCatalog` and `CatalogManger`, we let `CatalogManager` to set/get
 * current database of `SessionCatalog` when the current catalog is the session catalog.
 */
// TODO: all commands should look up table from the current catalog. The `SessionCatalog` doesn't
//       need to track current database at all.
private[sql]
class CatalogManager(
    conf: SQLConf,
    defaultSessionCatalog: CatalogPlugin,
    val v1SessionCatalog: SessionCatalog) extends Logging {

CatalogManager manages both the v2 CatalogPlugin and the v1 SessionCatalog; for historical reasons, the v1 API still has to be supported.

So where is CatalogManager used? Looking at BaseSessionStateBuilder, we can see that this is where it is actually created:

/**
 * Catalog for managing table and database states. If there is a pre-existing catalog, the state
 * of that catalog (temp tables & current database) will be copied into the new catalog.
 *
 * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields.
 */
protected lazy val catalog: SessionCatalog = {
  val catalog = new SessionCatalog(
    () => session.sharedState.externalCatalog,
    () => session.sharedState.globalTempViewManager,
    functionRegistry,
    conf,
    SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
    sqlParser,
    resourceLoader)
  parentState.foreach(_.catalog.copyStateTo(catalog))
  catalog
}

protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf)

protected lazy val catalogManager = new CatalogManager(conf, v2SessionCatalog, catalog)

SessionCatalog is the v1 catalog. It mainly talks to the underlying metadata store and manages temporary views and UDFs. We skip it for now and focus on the v2 session catalog, V2SessionCatalog:

/**
 * A [[TableCatalog]] that translates calls to the v1 SessionCatalog.
 */
class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
  extends TableCatalog with SupportsNamespaces {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
  import V2SessionCatalog._

  override val defaultNamespace: Array[String] = Array("default")

  override def name: String = CatalogManager.SESSION_CATALOG_NAME

  // This class is instantiated by Spark, so `initialize` method will not be called.
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}

  override def listTables(namespace: Array[String]): Array[Identifier] = {
    namespace match {
      case Array(db) =>
        catalog
          .listTables(db)
          .map(ident => Identifier.of(Array(ident.database.getOrElse("")), ident.table))
          .toArray
      case _ =>
        throw new NoSuchNamespaceException(namespace)
    }
  }
  ...

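From listTables we can see that the v2 session catalog delegates its work to the v1 SessionCatalog, and the other methods do the same. Note also that name is CatalogManager.SESSION_CATALOG_NAME, i.e. spark_catalog; this comes up again later. CatalogManager is also used by the analyzer and the optimizer of the logical plan, because they need the metadata it provides: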

protected def analyzer: Analyzer = new Analyzer(catalogManager, conf) {
  ...

protected def optimizer: Optimizer = {
  new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
    override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
      super.earlyScanPushDownRules ++ customEarlyScanPushDownRules

    override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
      super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
  }
}

The analyzer and the optimizer are at the very core of how Spark SQL processes a query, along with physical plan generation. So where are they called from?

Take Dataset.filter as an example. It reaches the parser through sparkSession.sessionState:

def filter(conditionExpr: String): Dataset[T] = {
  filter(Column(sparkSession.sessionState.sqlParser.parseExpression(conditionExpr)))
}


The sqlParser behind sessionState is built in BaseSessionStateBuilder as well:

protected lazy val sqlParser: ParserInterface = {
  extensions.buildParser(session, new SparkSqlParser(conf))
}

Once the whole path from SQL parsing to the use of metadata is in view, we can roughly see how everything fits together.


Now let's come back to how Delta's DeltaCatalog plugs into Spark 3.x. Here is the DeltaCatalog source:

class DeltaCatalog(val spark: SparkSession) extends DelegatingCatalogExtension
  with StagingTableCatalog
  with SupportsPathIdentifier {

  def this() = {
    this(SparkSession.active)
  }
  ...


DelegatingCatalogExtension holds a delegate catalog, which is set through setDelegateCatalog:

public abstract class DelegatingCatalogExtension implements CatalogExtension {

  private CatalogPlugin delegate;

  public final void setDelegateCatalog(CatalogPlugin delegate) {
    this.delegate = delegate;
  }
  ...


CatalogManager sets that delegate when it loads the session catalog:

private def loadV2SessionCatalog(): CatalogPlugin = {
  Catalogs.load(SESSION_CATALOG_NAME, conf) match {
    case extension: CatalogExtension =>
      extension.setDelegateCatalog(defaultSessionCatalog)
      extension
    case other => other
  }
}


private[sql] def v2SessionCatalog: CatalogPlugin = {
  conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { customV2SessionCatalog =>
    try {
      catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
    } catch {
      case NonFatal(_) =>
        logError(
          "Fail to instantiate the custom v2 session catalog: " + customV2SessionCatalog)
        defaultSessionCatalog
    }
  }.getOrElse(defaultSessionCatalog)
}


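This first reads the configuration entry SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION, i.e. spark.sql.catalog.spark_catalog. If it is set, loadV2SessionCatalog is called to load the configured class; otherwise the default v2 session catalog, the V2SessionCatalog instance, is used. If loading the configured class fails, the error is logged and the default is used as well.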
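
The resolution logic described here can be modeled without Spark. The sketch below uses simplified stand-in types (Plugin, Extension, Manager, and the rest are hypothetical names, not Spark's real signatures) to show the three outcomes: a configured extension receives the built-in catalog as its delegate, a missing setting yields the built-in catalog, and a failing load falls back to it.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object SessionCatalogResolutionSketch {
  // Simplified stand-ins for CatalogPlugin and CatalogExtension (not the real signatures).
  trait Plugin { def name: String }
  trait Extension extends Plugin { def setDelegate(delegate: Plugin): Unit }

  // Plays the role of the built-in V2SessionCatalog.
  object BuiltIn extends Plugin { val name = "built-in" }

  // Plays the role of DeltaCatalog: it remembers the catalog it delegates to.
  class CustomCatalog extends Extension {
    var delegate: Option[Plugin] = None
    val name = "custom"
    def setDelegate(d: Plugin): Unit = delegate = Some(d)
  }

  // `configured` models the optional spark.sql.catalog.spark_catalog setting
  // as a loader function for the configured class.
  class Manager(configured: Option[() => Plugin], default: Plugin) {
    private val catalogs = mutable.HashMap.empty[String, Plugin]

    def v2SessionCatalog: Plugin = configured.map { load =>
      try {
        catalogs.getOrElseUpdate("spark_catalog", load() match {
          case ext: Extension =>
            ext.setDelegate(default) // hand the built-in catalog to the extension
            ext
          case other => other
        })
      } catch {
        case NonFatal(_) => default // loading failed: fall back to the built-in catalog
      }
    }.getOrElse(default) // nothing configured: use the built-in catalog
  }
}
```

Because the extension is cached under the session catalog name, later lookups return the same instance with its delegate already set, which is what lets DeltaCatalog call super methods and end up in V2SessionCatalog.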


Let's look at DeltaCatalog's createTable method in detail; the other methods are similar:

override def createTable(
    ident: Identifier,
    schema: StructType,
    partitions: Array[Transform],
    properties: util.Map[String, String]): Table = {
  if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {
    createDeltaTable(
      ident, schema, partitions, properties, sourceQuery = None, TableCreationModes.Create)
  } else {
    super.createTable(ident, schema, partitions, properties)
  }
}
...
private def createDeltaTable(
    ident: Identifier,
    schema: StructType,
    partitions: Array[Transform],
    properties: util.Map[String, String],
    sourceQuery: Option[LogicalPlan],
    operation: TableCreationModes.CreationMode): Table = {
  ...
  val tableDesc = new CatalogTable(
    identifier = TableIdentifier(ident.name(), ident.namespace().lastOption),
    tableType = tableType,
    storage = storage,
    schema = schema,
    provider = Some("delta"),
    partitionColumnNames = partitionColumns,
    bucketSpec = maybeBucketSpec,
    properties = tableProperties.toMap,
    comment = Option(properties.get("comment")))
  // END: copy-paste from the super method finished.

  val withDb = verifyTableAndSolidify(tableDesc, None)
  ParquetSchemaConverter.checkFieldNames(tableDesc.schema.fieldNames)
  CreateDeltaTableCommand(
    withDb,
    getExistingTableIfExists(tableDesc),
    operation.mode,
    sourceQuery,
    operation,
    tableByPath = isByPath).run(spark)

  loadTable(ident)
}

override def loadTable(ident: Identifier): Table = {
  try {
    super.loadTable(ident) match {
      case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.catalogTable) =>
        DeltaTableV2(
          spark,
          new Path(v1.catalogTable.location),
          catalogTable = Some(v1.catalogTable),
          tableIdentifier = Some(ident.toString))
      case o => o
    }
  }
  ...
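  • createTable first checks whether the provider is the Delta data source. If so, it goes to createDeltaTable; otherwise it calls super.createTable directly.

  • createDeltaTable first runs the Delta-specific CreateDeltaTableCommand.run() to write the Delta data, and then calls loadTable.

  • loadTable calls super.loadTable, which calls V2SessionCatalog.loadTable, which in turn calls getTableMetadata on the v1 SessionCatalog and wraps the result as V1Table(catalogTable). In this way the Delta table's metadata is kept in, and read back from, the metastore managed by the v1 SessionCatalog.

  • If the provider is not Delta, super.createTable is called, which calls V2SessionCatalog.createTable and ultimately the createTable method of the v1 SessionCatalog.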
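
The dispatch described in these bullets can be condensed into a small Spark-free model. This is only a sketch under simplifying assumptions: every type here is a hypothetical stand-in, table names and providers are plain strings, inheritance replaces the real delegate field, and the call that stands in for CreateDeltaTableCommand only records metadata.

```scala
object CreateTableDispatchSketch {
  // Simplified stand-ins; the real methods take Identifier, StructType, and so on.
  sealed trait Table
  case class V1Table(name: String, provider: String) extends Table
  case class DeltaTableV2(name: String) extends Table

  // Plays the role of V2SessionCatalog backed by the v1 SessionCatalog metastore.
  class SessionCatalogStandIn {
    private val metastore = scala.collection.mutable.Map.empty[String, V1Table]

    def createTable(name: String, provider: String): Table = {
      val table = V1Table(name, provider)
      metastore(name) = table // metadata ends up in the v1 metastore
      table
    }

    def loadTable(name: String): Table = metastore(name)
  }

  // Plays the role of DeltaCatalog.
  class DeltaCatalogStandIn extends SessionCatalogStandIn {
    override def createTable(name: String, provider: String): Table =
      if (provider.equalsIgnoreCase("delta")) {
        // Stand-in for CreateDeltaTableCommand(...).run(spark).
        super.createTable(name, "delta")
        loadTable(name)
      } else {
        super.createTable(name, provider) // non-Delta tables take the default path
      }

    // Delta tables loaded from the metastore are wrapped as DeltaTableV2.
    override def loadTable(name: String): Table = super.loadTable(name) match {
      case V1Table(n, p) if p == "delta" => DeltaTableV2(n)
      case other => other
    }
  }
}
```

Creating a table with provider "delta" through DeltaCatalogStandIn returns a DeltaTableV2, while any other provider comes back as the plain V1Table, mirroring the two branches of createTable.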



