如何理解Apache Flink CDC原理与使用
发表于:2025-02-24 作者:千家信息网编辑
千家信息网最后更新 2025年02月24日,如何理解Apache Flink CDC原理与使用,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。CDC (Change Data Cap
千家信息网最后更新 2025年02月24日如何理解Apache Flink CDC原理与使用
如何理解Apache Flink CDC原理与使用,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
CDC (Change Data Capture)
Flink在1.11版本中新增了CDC的特性,简称 改变数据捕获。名称来看有点乱,我们先从之前的数据架构来看CDC的内容。
以上是之前的mysql binlog日志处理流程,例如canal监听binlog把日志写入到kafka中。而Apache Flink实时消费Kakfa的数据实现mysql数据的同步或其他内容等。拆分来说整体上可以分为以下几个阶段。
mysql开启binlog canal同步binlog数据写入到kafka flink读取kakfa中的binlog数据进行相关的业务处理。
整体的处理链路较长,需要用到的组件也比较多。Apache Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析。简单来说链路会变成这样也就是说数据不再通过canal与kafka进行同步,而flink直接进行处理mysql的数据。节省了canal与kafka的过程。
Flink 1.11中实现了mysql-cdc与postgre-CDC,也就是说在Flink 1.11中我们可以直接通过Flink来直接消费mysql,postgresql的数据进行业务的处理。
使用场景
数据库数据的增量同步 数据库表之上的物理化视图 维表join 其他业务处理 ...
MySQL CDC 操作实践
首先需要保证mysql数据库开启了binlog。未开启请查阅相关资料进行binlog的启用。自建默认是不开启binlog的。
源表
DROP TABLE IF EXISTS `t_test`;
CREATE TABLE `t_test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`ip` varchar(255) DEFAULT NULL,
`size` bigint(20) DEFAULT NULL
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=183 DEFAULT CHARSET=utf8mb4;
添加mysql-cdc相关依赖
com.alibaba.ververica
flink-connector-mysql-cdc
1.1.0
compile
相关代码实现
def main(args: Array[String]): Unit = {
val envSetting = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, envSetting)
val sourceDDL =
"CREATE TABLE test_binlog (" +
" id INT NOT NULl," +
" ip STRING," +
" size INT" +
") WITH (" +
"'connector' = 'mysql-cdc'," +
"'hostname' = 'localhost'," +
"'port' = '3306'," +
"'username' = 'root'," +
"'password' = 'cain'," +
"'database-name' = 'test'," +
"'table-name' = 't_test'" +
")"
// 输出目标表
val sinkDDL =
"CREATE TABLE test_sink (\n" +
" ip STRING,\n" +
" countSum BIGINT,\n" +
" PRIMARY KEY (ip) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")"
val exeSQL =
"INSERT INTO test_sink " +
"SELECT ip, COUNT(1) " +
"FROM test_binlog " +
"GROUP BY ip"
tableEnv.executeSql(sourceDDL)
tableEnv.executeSql(sinkDDL)
val result = tableEnv.executeSql(exeSQL)
result.print()
}
启动flink job,并且插入数据
INSERT INTO `test`.`t_test`( `ip`, `size`) VALUES (UUID(), 1231231);
INSERT INTO `test`.`t_test`( `ip`, `size`) VALUES (UUID(), 1231231);
INSERT INTO `test`.`t_test`( `ip`, `size`) VALUES (UUID(), 1231231);
...
插入数据可直接在console中看到flink处理的结果

Apache Flink CDC的方式替代了之前的canal+kafka节点.直接通过sql的方式来实现对mysql数据的同步。
看完上述内容,你们掌握如何理解Apache Flink CDC原理与使用的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
数据
处理
同步
业务
内容
数据库
原理
也就是
也就是说
整体
方式
方法
日志
更多
链路
问题
消费
束手无策
为此
代码
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库考试怎么准备什么
思科网络技术课程
表单图文混合存到数据库
电子政务网络安全的重要性
网络安全进企业活动
开展网络安全专项检查报告
绩溪现代软件开发服务销售厂
有数据库的课程吗
小米个人云服务器
图数据库neo4j
超算服务器维修网点
dubbo 服务器
宁波银河服务器
做软件开发公司有电脑的吧
东营app软件开发招聘
手抄报 网络安全文明行
百信服务器价钱
蓝牙设备需要服务器
网络安全三根线
dnf韩服不能连接服务器
黄岛遨游网络技术有限公司
阳泉商场大屏导航软件开发公司
手抄报 网络安全文明行
数据库原理及应用重庆大学出版社
战雷网络技术有限公司
虚拟机数据库怎么看网站
福建省机关网络安全
dw配置服务器
夏林聪网络安全
资讯软件开发设计说明书