如何理解Apache Flink CDC原理与使用
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,如何理解Apache Flink CDC原理与使用,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。CDC (Change Data Cap
千家信息网最后更新 2025年01月23日如何理解Apache Flink CDC原理与使用
如何理解Apache Flink CDC原理与使用,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
CDC (Change Data Capture)
Flink在1.11版本中新增了CDC的特性,简称 改变数据捕获。名称来看有点乱,我们先从之前的数据架构来看CDC的内容。
以上是之前的mysql binlog日志处理流程,例如canal监听binlog把日志写入到kafka中。而Apache Flink实时消费Kakfa的数据实现mysql数据的同步或其他内容等。拆分来说整体上可以分为以下几个阶段。
mysql开启binlog canal同步binlog数据写入到kafka flink读取kakfa中的binlog数据进行相关的业务处理。
整体的处理链路较长,需要用到的组件也比较多。Apache Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析。简单来说链路会变成这样也就是说数据不再通过canal与kafka进行同步,而flink直接进行处理mysql的数据。节省了canal与kafka的过程。
Flink 1.11中实现了mysql-cdc与postgre-CDC,也就是说在Flink 1.11中我们可以直接通过Flink来直接消费mysql,postgresql的数据进行业务的处理。
使用场景
数据库数据的增量同步 数据库表之上的物理化视图 维表join 其他业务处理 ...
MySQL CDC 操作实践
首先需要保证mysql数据库开启了binlog。未开启请查阅相关资料进行binlog的启用。自建默认是不开启binlog的。
源表
DROP TABLE IF EXISTS `t_test`;
CREATE TABLE `t_test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`ip` varchar(255) DEFAULT NULL,
`size` bigint(20) DEFAULT NULL
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=183 DEFAULT CHARSET=utf8mb4;
添加mysql-cdc相关依赖
com.alibaba.ververica
flink-connector-mysql-cdc
1.1.0
compile
相关代码实现
def main(args: Array[String]): Unit = {
val envSetting = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, envSetting)
val sourceDDL =
"CREATE TABLE test_binlog (" +
" id INT NOT NULl," +
" ip STRING," +
" size INT" +
") WITH (" +
"'connector' = 'mysql-cdc'," +
"'hostname' = 'localhost'," +
"'port' = '3306'," +
"'username' = 'root'," +
"'password' = 'cain'," +
"'database-name' = 'test'," +
"'table-name' = 't_test'" +
")"
// 输出目标表
val sinkDDL =
"CREATE TABLE test_sink (\n" +
" ip STRING,\n" +
" countSum BIGINT,\n" +
" PRIMARY KEY (ip) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")"
val exeSQL =
"INSERT INTO test_sink " +
"SELECT ip, COUNT(1) " +
"FROM test_binlog " +
"GROUP BY ip"
tableEnv.executeSql(sourceDDL)
tableEnv.executeSql(sinkDDL)
val result = tableEnv.executeSql(exeSQL)
result.print()
}
启动flink job,并且插入数据
INSERT INTO `test`.`t_test`( `ip`, `size`) VALUES (UUID(), 1231231);
INSERT INTO `test`.`t_test`( `ip`, `size`) VALUES (UUID(), 1231231);
INSERT INTO `test`.`t_test`( `ip`, `size`) VALUES (UUID(), 1231231);
...
插入数据可直接在console中看到flink处理的结果
Apache Flink CDC的方式替代了之前的canal+kafka节点.直接通过sql的方式来实现对mysql数据的同步。
看完上述内容,你们掌握如何理解Apache Flink CDC原理与使用的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
数据
处理
同步
业务
内容
数据库
原理
也就是
也就是说
整体
方式
方法
日志
更多
链路
问题
消费
束手无策
为此
代码
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库中关系表是什么
徐汇区互联网络技术服务介绍
数据库怎么看祖先
关于网络安全知识文字
奶块会开新的服务器吗
石首软件开发
软件开发副总裁岗位职责
服务器 管理进程
如何攻击网站服务器
给小朋友讲网络安全
酒泉多媒体软件开发公司
湖南代理商管理软件开发平台
拜登政府 网络安全
网络技术副总面试题
自己攻击自己的云服务器违法么
苏州系统软件开发费用是多少
数据库的数据内容个人基本信息表
陕西定制化服务器什么价位
河北正规软件开发项目信息
我国网络安全当前任务是什么
互联网新科技有哪些
服务器io口配置
网络安全和信息化工作汇总结
oracle数据库倒序语录
阜新直销软件开发制作
重庆服务器机柜公司虚拟主机
数据库评价
网络技术的知识储备情况
广东常见软件开发价格大全
所学专业介绍网络安全