千家信息网

如何进行delta lake 的curd操作

发表于:2024-10-21 作者:千家信息网编辑
千家信息网最后更新 2024年10月21日,这篇文章给大家介绍如何进行delta lake 的curd操作,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。delta lake 的表支持删除和更新数据的语法,下面主要是从sql
千家信息网最后更新 2024年10月21日如何进行delta lake 的curd操作

这篇文章给大家介绍如何进行delta lake 的curd操作,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

delta lake 的表支持删除和更新数据的语法,下面主要是从sql和scala两个语法说起吧。

1. 删除delta 表数据

可以根据查询条件,从delta表中删除数据,比如删除日期在2017年之前的数据,sql和scala的表达语法如下。

sql


DELETE FROM events WHERE date < '2017-01-01'
DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'

scala


import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete("date < '2017-01-01'") // predicate using SQL formatted string
import org.apache.spark.sql.functions._import spark.implicits._
deltaTable.delete(col("date") < "2017-01-01") // predicate using Spark SQL functions and implicits

请注意,delete操作会将数据从delta 表的最新版本中删除,但其实只有到历史版本直接被vacuum清空的时候,才会从物理存储中删除数据。

2. 更新表

可以更新满足条件的表。比如想更新eventType的字段字符串的编写失误,可以使用下面的表达,sql和scala的表达分别如下:

sql

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'

scala


import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.updateExpr( // predicate and update expressions using SQL formatted string "eventType = 'clck'", Map("eventType" -> "'click'")
import org.apache.spark.sql.functions._import spark.implicits._
deltaTable.update( // predicate using Spark SQL functions and implicits col("eventType") === "clck", Map("eventType" -> lit("click")));

3.merge算子实现upsert操作

使用merge操作可以将source表,view,dataframe中的数据upsert到目标的delta lake表中。该操作很像传统数据库的merge into操作,但是额外的支持删除操作,和更新,插入和删除的额外条件。

假设你计算过程中生成了一个dataframe,元素是events,包含eventId。而且该dataframe中数据部分数据的eventId已经在events表中存在了。这个时候就可以使用merge into实现,eventId存在的话就更新其对应的值,不存在就插入其对应的值。实现表达式如下:

sql

MERGE INTO eventsUSING updatesON events.eventId = updates.eventIdWHEN MATCHED THEN  UPDATE SET events.data = updates.dataWHEN NOT MATCHED  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

scala


import io.delta.tables._import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/") .as("events") .merge( updatesDF.as("updates"), "events.eventId = updates.eventId") .whenMatched .updateExpr( Map("data" -> "updates.data")) .whenNotMatched .insertExpr( Map( "date" -> "updates.date", "eventId" -> "updates.eventId", "data" -> "updates.data")) .execute()

关于如何进行delta lake 的curd操作就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

数据 更新 条件 语法 内容 时候 更多 版本 帮助 支持 不错 两个 传统 元素 兴趣 历史 只有 字段 字符 字符串 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 ios 数据库同步工具 电脑监控进入不了服务器 公众网络安全吗 中国人生理常数数据库 学校网络安全风险排查表 数据库的重命名怎么写 伪数据包在服务器中启什么作用 领科互联网科技有限公司 db数据库如何变成word 宿迁创新软件开发欢迎咨询 临沂服务器管理系统价格 邮件服务器管理是什么 软件开发商业策划书 闲置经济持续升温 互联网科技 网络安全注意要点十条 互联网时代网络安全包括 智慧央厨软件开发多少钱 腾讯云服务器释放后还能恢复吗 网络安全反间谍漫画 传奇私服服务器错误 共享数据库中没有你的身份证照片 网络安全手抄报红色主题 电商网站服务器架构 全国计算机网络技术四级 如何提高网络安全管控效率 腾讯服务器保存几年的聊天记录 电气的数据库 沈阳联联网络技术有限公司 绿盟工业网络安全评估 使用数据库管理系统的主要功能
0