如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据
发表于:2024-11-15 作者:千家信息网编辑
千家信息网最后更新 2024年11月15日,这篇文章主要讲解了"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何使用Tb
千家信息网最后更新 2024年11月15日如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据
这篇文章主要讲解了"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据"吧!
使用Tbale&SQL与Flink JDBC连接器读取MYSQL数据,并用GROUP BY语句根据一个或多个列对结果集进行分组。
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
GroupToMysql.java
package com.flink.examples.mysql;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;import org.apache.flink.util.Collector;import static org.apache.flink.table.api.Expressions.$;/** * @Description 使用Tbale&SQL与Flink JDBC连接器读取MYSQL数据,并用GROUP BY语句根据一个或多个列对结果集进行分组。 */public class GroupToMysql { /** 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html 分区扫描 为了加速并行Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。 scan.partition.column:用于对输入进行分区的列名。 scan.partition.num:分区数。 scan.partition.lower-bound:第一个分区的最小值。 scan.partition.upper-bound:最后一个分区的最大值。 */ //flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory工厂类中定义 static String table_sql = "CREATE TABLE my_users (\n" + " id BIGINT,\n" + " name STRING,\n" + " age INT,\n" + " status INT,\n" + " PRIMARY KEY (id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector.type' = 'jdbc',\n" + " 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" + " 'connector.driver' = 'com.mysql.jdbc.Driver', \n" + " 'connector.table' = 'users', \n" + " 'connector.username' = 'root',\n" + " 'connector.password' = 'password' \n" +// " 'connector.read.fetch-size' = '10' \n" + ")"; public static void main(String[] args) throws Exception { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置setParallelism并行度 env.setParallelism(1); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册mysql数据维表 tEnv.executeSql(table_sql); //Table table = avg(tEnv); //Table table = count(tEnv); //Table table = min(tEnv); Table table = max(tEnv); //打印字段结构 table.printSchema(); //普通查询操作用toAppendStream //tEnv.toAppendStream(table, Row.class).print(); //group操作用toRetractStream //tEnv.toRetractStream(table, Row.class).print(); //table 转成 dataStream 流,Tuple2第一个参数flag是true表示add添加新的记录流,false表示retract表示旧的记录流 DataStream> behaviorStream = tEnv.toRetractStream(table, Row.class); behaviorStream.flatMap(new FlatMapFunction , Object>() { @Override public void flatMap(Tuple2 value, Collector
建表SQL
CREATE TABLE `users` ( `id` bigint(8) NOT NULL AUTO_INCREMENT, `name` varchar(40) DEFAULT NULL, `age` int(8) DEFAULT NULL, `status` tinyint(2) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
打印结果
root |-- status: INT |-- age1: INT0,160,181,211,282,31
感谢各位的阅读,以上就是"如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据"的内容了,经过本文的学习后,相信大家对如何使用Tbale SQL与Flink JDBC连接器读取MYSQL数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
语句
连接器
数据流
方法
分组
结果
学习
最大
最小
之和
内容
多个
数值
最大值
普通
任务
写法
功能
参数
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发需求申请单
网络安全与网络信息安全工作计划
数据库的大小怎么设置
同一个数据库怎么联合查询
工作面临哪些网络安全隐患
北京口碑好的软件开发平台
网络技术挑战赛奖金
几种软件开发模式
平板电脑服务器更新错误
服务器 压力测试
叁壹互联网科技有限公司
苹果电脑搭建本地服务器
图书馆网络安全小课堂
广告网络技术公司经营范围
国标林业数据库有哪几个字段
戈壁滩上的数据库
十八大以来我国网络安全成就
哪个服务器一键部署好用
三级的网络安全手抄报
郑州信合网络技术服务
花旗银行软件开发工资
pg数据库和oracle优势
简述数据库安全控制措施
数据库应用 average
思科网院网络技术一到十一章
公民保障网络安全的措施百度
智慧农场软件开发
ajkx无刷新 数据库
建立网络安全应急
数据库组织级别包括实体吗