如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据
发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,这篇文章主要讲解了"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何使用Tb
千家信息网最后更新 2025年02月01日如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据
这篇文章主要讲解了"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据"吧!
使用Tbale&SQL与Flink JDBC连接器读取MYSQL数据,并用GROUP BY语句根据一个或多个列对结果集进行分组。
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
GroupToMysql.java
package com.flink.examples.mysql;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;import org.apache.flink.util.Collector;import static org.apache.flink.table.api.Expressions.$;/** * @Description 使用Tbale&SQL与Flink JDBC连接器读取MYSQL数据,并用GROUP BY语句根据一个或多个列对结果集进行分组。 */public class GroupToMysql { /** 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html 分区扫描 为了加速并行Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。 scan.partition.column:用于对输入进行分区的列名。 scan.partition.num:分区数。 scan.partition.lower-bound:第一个分区的最小值。 scan.partition.upper-bound:最后一个分区的最大值。 */ //flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory工厂类中定义 static String table_sql = "CREATE TABLE my_users (\n" + " id BIGINT,\n" + " name STRING,\n" + " age INT,\n" + " status INT,\n" + " PRIMARY KEY (id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" + " 'connector.driver' = 'com.mysql.jdbc.Driver', \n" + " 'connector.table' = 'users', \n" + " 'connector.username' = 'root',\n" + " 'connector.password' = 'password' \n" +// " 'connector.read.fetch-size' = '10' \n" + ")"; public static void main(String[] args) throws Exception { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置setParallelism并行度 env.setParallelism(1); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册mysql数据维表 tEnv.executeSql(table_sql); //Table table = avg(tEnv); //Table table = count(tEnv); //Table table = min(tEnv); Table table = max(tEnv); //打印字段结构 table.printSchema(); //普通查询操作用toAppendStream //tEnv.toAppendStream(table, Row.class).print(); //group操作用toRetractStream //tEnv.toRetractStream(table, Row.class).print(); //table 转成 dataStream 流,Tuple2第一个参数flag是true表示add添加新的记录流,false表示retract表示旧的记录流 DataStream> behaviorStream = tEnv.toRetractStream(table, Row.class); behaviorStream.flatMap(new FlatMapFunction , Object>() { @Override public void flatMap(Tuple2 value, Collector
建表SQL
CREATE TABLE `users` ( `id` bigint(8) NOT NULL AUTO_INCREMENT, `name` varchar(40) DEFAULT NULL, `age` int(8) DEFAULT NULL, `status` tinyint(2) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
打印结果
root |-- status: INT |-- age1: INT0,160,181,211,282,31
感谢各位的阅读,以上就是"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据"的内容了,经过本文的学习后,相信大家对如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
语句
连接器
数据流
方法
分组
结果
学习
最大
最小
之和
内容
多个
数值
最大值
普通
任务
写法
功能
参数
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
2020年服务器cpu二手
专业棋牌软件开发公司电话
网络安全法文库
湖北大学民族科技学院互联网
手机扫码提示服务器错误
走近互联网 感受科技生活
互联网金融科技汇
golang同时配置多数据库
图解网络安全技术
什么是软件开发平台上云
分家数据库
网络安全竞赛web安全
计算机网络技术自学考纲
网络安全招商
嵌入式软件开发平台的搭建
网络安全对外防护
派出所内部开展网络安全检查
无锡自动软件开发诚信合作
湖北电商网络安全维护怎么样
有网址怎么登录金蝶云服务器
软件开发pprom
南京学管家互联网科技
洛阳市公安局副局长网络安全
网络安全好还是程序猿好
互联网科技峰会
网络安全部门主管的岗位职责
戴尔服务器怎么看支持8个t不
服务器病毒攻击和软件攻击的区别
网络安全公益宣传
网络安全与对抗培养方向