千家信息网

Delta Lake如何实现CDC实时入湖

发表于:2024-11-23 作者:千家信息网编辑
千家信息网最后更新 2024年11月23日,Delta Lake如何实现CDC实时入湖,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。什么是CDCChange Data Captur
千家信息网最后更新 2024年11月23日Delta Lake如何实现CDC实时入湖

Delta Lake如何实现CDC实时入湖,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

什么是CDC

Change Data Capture(CDC)用来跟踪捕获数据源的数据变化,并将这些变化同步到目标存储(如数据湖或数据仓库),用于数据备份或后续分析,同步过程可以是分钟/小时/天等粒度,也可以是实时同步。CDC方案分为侵入式(intrusive manner)和非倾入性(non-intrusive manner)两种。

侵入式

侵入式方案直接请求数据源系统(如通过JDBC读取数据),会给数据源系统带来性能压力。常见的方案如下:

  • 最后更新时间(Last Modified)

源表需要有修改时间列,同步作业需要指定最后修改时间参数,表明同步某个时间点之后变更的数据。该方法不能同步删除记录的变更,同一条记录多次变更只能记录最后一次。

  • 自增id列

源表需要有一个自增id列,同步作业需要指定上次同步的最大id值,同步上次之后新增的记录行。该方法也不能同步删除记录的变更,而且老记录的变更也无法感知。

非侵入式

非侵入性一般通过日志的方式记录数据源的数据变化(如数据库的binlog),源库需要开启binlog的功能。数据源的每次操作都会被记录到binlog中(如insert/update/delete等),能够实时跟踪数据插入/删除/数据多次更新/DDL操作等。

示例:

insert into table testdb.test values("hangzhou",1);update testdb.test set b=2 where a="hangzhou";update testdb.test set b=3 where a="hangzhou";delete from testdb.test where a="hangzhou";

通过将binlog日志有序的回放到目标存储中,从而实现对数据源的数据导出同步功能。

常见的CDC方案实现

开源常见的CDC方案实现主要有两种:

Sqoop离线同步

sqoop是一个开源的数据同步工具,它可以将数据库的数据同步到HDFS/Hive中,支持全量同步和增量同步,用户可以配置小时/天的调度作业来定时同步数据。

sqoop增量同步是一种侵入式的CDC方案,支持Last Modified和Append模式。

缺点:

  • 直接jdbc请求源库拉取数据,影响源库性能

  • 小时/天调度,实时性不高

  • 无法同步源库的删除操作,Append模式还不支持数据更新操作

binlog实时同步

binlog日志可以通过一些工具实时同步到kafka等消息中间件中,然后通过Spark/Flink等流引擎实时的回放binlog到目标存储(如Kudu/HBase等)。

缺点:

  • Kudu/HBase运维成本高

  • Kudu在数据量大的有稳定性问题, HBase不支持高吞吐的分析

  • Spark Streaming实现回放binlog逻辑复杂,使用java/scala代码具有一定门槛

Streaming SQL+Delta Lake实时入湖方案

前面介绍了两种常见的CDC方案,各自都有一些缺点。阿里云E-MapReduce团队提供了一种新的CDC解决方案,利用自研的Streaming SQL搭配Delta Lake可以轻松实现CDC实时入湖。这套解决方案同时通过阿里云最新发布的数据湖构建(Data Lake Formation,DLF)服务提供一站式的入湖体验。

Streaming SQL

Spark Streaming SQL在Spark Structured Streaming之上提供了SQL能力,降低了实时业务开发的门槛,使得离线业务实时化更简单方便。


下面以实时消费SLS为例:

# 创建loghub源表spark-sql> CREATE TABLE loghub_intput_tbl(content string)         > USING loghub         > OPTIONS         > (...) # 创建delta目标表spark-sql> CREATE TABLE delta_output_tbl(content string)         > USING delta         > OPTIONS         > (...);# 创建流式SCANspark-sql> CREATE SCAN loghub_table_intput_test_stream         > ON loghub_intput_tbl         > USING STREAM;# 将loghub源表数据插入delta目标表         spark-sql> INSERT INTO delta_output_tbl SELECT content FROM loghub_table_intput_test_stream;

Delta Lake

Delta Lake是Databricks开源的一种数据湖格式,它在parquet格式之上,提供了ACID事务/元数据管理等能力,同时相比parquet具有更好的性能,能够支持更丰富的数据应用场景(如数据更新/schema演化等)。

E-MapReduce团队在开源Delta Lake基础上做了很多功能和性能的优化,如小文件合并Optimize/DataSkipping/Zorder,SparkSQL/Streaming SQL/Hive/Presto深度集成Delta等。

Streaming SQL+Delta Lake CDC实时入湖

Spark Streaming SQL提供了Merge Into 的语法,搭配Delta Lake的实时写入能力,可以很方便的实现CDC实时入湖方案。

如上图所示,只需要SQL就能完成CDC实时入湖。

看完上述内容,你们掌握Delta Lake如何实现CDC实时入湖的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0