Flink Connectors怎么连接MySql
发表于:2025-02-09 作者:千家信息网编辑
千家信息网最后更新 2025年02月09日,这篇文章主要讲解了"Flink Connectors怎么连接MySql",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Connectors怎
千家信息网最后更新 2025年02月09日Flink Connectors怎么连接MySql
这篇文章主要讲解了"Flink Connectors怎么连接MySql",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Connectors怎么连接MySql"吧!
通过使用Flink DataStream Connectors 数据流连接器连接到Mysql数据源,并基于JDBC提供数据流输入与输出操作
示例环境
java.version: 1.8.xflink.version: 1.11.1mysql:5.7.x
数据流输入
DataStreamSource.java
package com.flink.examples.mysql;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.flink.api.java.io.jdbc.JDBCOptions;import org.apache.flink.api.java.io.jdbc.JDBCTableSource;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @Description 将mysql表中数据查询输出到DataStream流中 */public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //查询sql String sql = "SELECT id,name,age,sex,address,createTimeSeries FROM t_user"; //设置表视图字段与类型 TableSchema tableSchema = TableSchema.builder() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) .field("age", DataTypes.INT()) .field("sex", DataTypes.INT()) .field("address", DataTypes.STRING()) //.field("createTime", DataTypes.TIMESTAMP()) .field("createTimeSeries", DataTypes.BIGINT()) .build(); //配置jdbc数据源选项 JDBCOptions jdbcOptions = JDBCOptions.builder() .setDriverName(MysqlConfig.DRIVER_CLASS) .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL) .setUsername(MysqlConfig.SOURCE_USER) .setPassword(MysqlConfig.SOURCE_PASSWORD) .setTableName("t_user") .build(); JDBCTableSource jdbcTableSource = JDBCTableSource.builder().setOptions(jdbcOptions).setSchema(tableSchema).build(); //将数据源注册到tableEnv视图student中 tEnv.registerTableSource("t_user", jdbcTableSource); Table table = tEnv.sqlQuery(sql); DataStreamsourceStream = tEnv.toAppendStream(table, TUser.class); sourceStream.map((t)->new Gson().toJson(t)).print(); env.execute("flink mysql source"); }}
数据流输出
DataStreamSink.java
package com.flink.examples.mysql;import com.flink.examples.TUser;import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.sinks.TableSink;import static org.apache.flink.table.api.Expressions.$;/** * @Description 将DataStream数据流插入到mysql表中 */public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(2000); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //查询sql String sql = "insert into t_user (id,name,age,sex,address,createTimeSeries) values (?,?,?,?,?,?)"; //封装数据 TUser user = new TUser(); user.setId(0); user.setName("zhao1"); user.setAge(22); user.setSex(1); user.setAddress("CN"); user.setCreateTimeSeries(System.currentTimeMillis()); DataStreamsourceStream = env.fromElements(user); //从DataStream获取数据// Expression id = ExpressionParser.parse_Expression("id");// Expression name = ExpressionParser.parse_Expression("name");// Expression age = ExpressionParser.parse_Expression("age");// Expression sex = ExpressionParser.parse_Expression("sex");// Expression address = ExpressionParser.parse_Expression("address");// Expression createTimeSeries = ExpressionParser.parse_Expression("createTimeSeries");// Table table = tEnv.fromDataStream(sourceStream, id, name, age, sex, address, createTimeSeries ); Table table = tEnv.fromDataStream(sourceStream,$("id"),$("name"),$("age"),$("sex"),$("address"),$("createTimeSeries")); //输出到mysql //设置表视图字段与类型 TableSchema tableSchema = TableSchema.builder() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) .field("age", DataTypes.INT()) .field("sex", DataTypes.INT()) .field("address", DataTypes.STRING()) //.field("createTime", DataTypes.TIMESTAMP()) .field("createTimeSeries", DataTypes.BIGINT()) .build(); //设置sink输出jdbc TableSink tableSink = JDBCAppendTableSink.builder() .setDrivername(MysqlConfig.DRIVER_CLASS) .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL) .setUsername(MysqlConfig.SOURCE_USER) .setPassword(MysqlConfig.SOURCE_PASSWORD) .setQuery(sql) .setParameterTypes(tableSchema.getFieldTypes()) .setBatchSize(100) .build(); //将数据源注册到tableEnv视图result中 tEnv.registerTableSink("result", tableSchema.getFieldNames(), tableSchema.getFieldTypes(), tableSink); //在指定的路径下注册,然后执行插入操作 table.executeInsert("result"); }}
数据源配置类
MysqlConfig.java
package com.flink.examples.mysql;/** * @Description Mysql数据库连接配置 */public class MysqlConfig { public final static String DRIVER_CLASS="com.mysql.jdbc.Driver"; public final static String SOURCE_DRIVER_URL="jdbc:mysql://127.0.0.1:3306/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false"; public final static String SOURCE_USER="root"; public final static String SOURCE_PASSWORD="root";}
数据展示
感谢各位的阅读,以上就是"Flink Connectors怎么连接MySql"的内容了,经过本文的学习后,相信大家对Flink Connectors怎么连接MySql这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
数据流
数据源
输出
视图
学习
查询
配置
内容
字段
官方
文档
类型
输入
就是
思路
情况
数据库
数据查询
文章
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
厦门网络安全培训免费试学
余干天气预报软件开发
软件开发工程师多大
丽水云软件开发工具
滨州陶瓷软件开发咨询
视频综合服务器
终端机升级数据库
数据库连接和三次握手有关吗
nas服务器有多大
网络安全工程师培训学习资料
什么是1901网络安全
网络安全微课优秀作品图片
两会期间加强网络安全
姿态球软件开发
福建智能软件开发成交价
数据库优化点
艾比利网络技术有限公司工作
身高体重数据库分析
绘制功能结构图和数据库
java编写服务器端
数据库三范式转化题目
汕头软件开发订制
夸克怎么取消代理服务器
手机app软件开发教
开发智能公厕系统软件开发
软件开发成本因素
联通手厅免流地址服务器
深圳智软软件开发
网络安全应急处置工作制度
魔兽赛季服联盟选哪个服务器