千家信息网

如何进行delta lake 的curd操作

发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这篇文章给大家介绍如何进行delta lake 的curd操作,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。delta lake 的表支持删除和更新数据的语法,下面主要是从sql
千家信息网最后更新 2025年02月02日如何进行delta lake 的curd操作

这篇文章给大家介绍如何进行delta lake 的curd操作,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

delta lake 的表支持删除和更新数据的语法,下面主要是从sql和scala两个语法说起吧。

1. 删除delta 表数据

可以根据查询条件,从delta表中删除数据,比如删除日期在2017年之前的数据,sql和scala的表达语法如下。

sql


DELETE FROM events WHERE date < '2017-01-01'
DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'

scala


import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete("date < '2017-01-01'") // predicate using SQL formatted string
import org.apache.spark.sql.functions._import spark.implicits._
deltaTable.delete(col("date") < "2017-01-01") // predicate using Spark SQL functions and implicits

请注意,delete操作会将数据从delta 表的最新版本中删除,但其实只有到历史版本直接被vacuum清空的时候,才会从物理存储中删除数据。

2. 更新表

可以更新满足条件的表。比如想更新eventType的字段字符串的编写失误,可以使用下面的表达,sql和scala的表达分别如下:

sql

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'

scala


import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string "eventType = 'clck'", Map("eventType" -> "'click'")
import org.apache.spark.sql.functions._import spark.implicits._
deltaTable.update( // predicate using Spark SQL functions and implicits col("eventType") === "clck", Map("eventType" -> lit("click")));

3.merge算子实现upsert操作

使用merge操作可以将source表,view,dataframe中的数据upsert到目标的delta lake表中。该操作很像传统数据库的merge into操作,但是额外的支持删除操作,和更新,插入和删除的额外条件。

假设你计算过程中生成了一个dataframe,元素是events,包含eventId。而且该dataframe中数据部分数据的eventId已经在events表中存在了。这个时候就可以使用merge into实现,eventId存在的话就更新其对应的值,不存在就插入其对应的值。实现表达式如下:

sql

MERGE INTO eventsUSING updatesON events.eventId = updates.eventIdWHEN MATCHED THEN  UPDATE SET events.data = updates.dataWHEN NOT MATCHED  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

scala


import io.delta.tables._import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/") .as("events") .merge( updatesDF.as("updates"), "events.eventId = updates.eventId") .whenMatched .updateExpr( Map("data" -> "updates.data")) .whenNotMatched .insertExpr( Map( "date" -> "updates.date", "eventId" -> "updates.eventId", "data" -> "updates.data")) .execute()

关于如何进行delta lake 的curd操作就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0