如何进行delta lake 的curd操作
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这篇文章给大家介绍如何进行delta lake 的curd操作,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。delta lake 的表支持删除和更新数据的语法,下面主要是从sql
千家信息网最后更新 2025年02月02日如何进行delta lake 的curd操作
这篇文章给大家介绍如何进行delta lake 的curd操作,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
delta lake 的表支持删除和更新数据的语法,下面主要是从sql和scala两个语法说起吧。
1. 删除delta 表数据
可以根据查询条件,从delta表中删除数据,比如删除日期在2017年之前的数据,sql和scala的表达语法如下。
sql
DELETE FROM events WHERE date < '2017-01-01'
DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'
scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete("date < '2017-01-01'") // predicate using SQL formatted string
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.delete(col("date") < "2017-01-01") // predicate using Spark SQL functions and implicits
请注意,delete操作会将数据从delta 表的最新版本中删除,但其实只有到历史版本直接被vacuum清空的时候,才会从物理存储中删除数据。
2. 更新表
可以更新满足条件的表。比如想更新eventType的字段字符串的编写失误,可以使用下面的表达,sql和scala的表达分别如下:
sql
UPDATE events SET eventType = 'click' WHERE eventType = 'clck'UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'
scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string
"eventType = 'clck'",
Map("eventType" -> "'click'")
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.update( // predicate using Spark SQL functions and implicits
col("eventType") === "clck",
Map("eventType" -> lit("click")));
3.merge算子实现upsert操作
使用merge操作可以将source表,view,dataframe中的数据upsert到目标的delta lake表中。该操作很像传统数据库的merge into操作,但是额外的支持删除操作,和更新,插入和删除的额外条件。
假设你计算过程中生成了一个dataframe,元素是events,包含eventId。而且该dataframe中数据部分数据的eventId已经在events表中存在了。这个时候就可以使用merge into实现,eventId存在的话就更新其对应的值,不存在就插入其对应的值。实现表达式如下:
sql
MERGE INTO eventsUSING updatesON events.eventId = updates.eventIdWHEN MATCHED THEN UPDATE SET events.data = updates.dataWHEN NOT MATCHED THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
关于如何进行delta lake 的curd操作就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
数据
更新
条件
语法
内容
时候
更多
版本
帮助
支持
不错
两个
传统
元素
兴趣
历史
只有
字段
字符
字符串
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
单位网络安全总结范文
什么存储引擎无法修改数据库名称
三级网络技术软件新思路
网络技术技能考核
服务器2008断电后无法进系统
云服务器的搭建
如何查询RRP数据库
苹果手机发件服务器在哪里
sql数据库中文件有哪些
网络技术项目可行报告
沧州科生网络技术有限公司
南京软件开发公司招聘
华龙重庆网络安全宣传
数据库迁移的技术
小海豚数据库使用
国际服什么时候能更新到服务器
霍尼韦尔服务器故障
创建启动数据库
统信系统安装神舟数据库
航空运输业网络安全洞察报告
中小型企业网的网络安全部署
重邮的网络安全专业如何
昭通计算机网络技术费用
国家网络安全宣传周刊
视频在线播放软件开发
王牌服务器管理员能修改战局吗
智联网络技术公司
护苗网络安全公益宣传片5集
南京阿里巴巴网络技术有限公司
医院系统服务器算啥科目