Flink Connectors怎么连接MySql
发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,这篇文章主要讲解了"Flink Connectors怎么连接MySql",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Connectors怎
千家信息网最后更新 2024年09月22日Flink Connectors怎么连接MySql
这篇文章主要讲解了"Flink Connectors怎么连接MySql",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Connectors怎么连接MySql"吧!
通过使用Flink DataStream Connectors 数据流连接器连接到Mysql数据源,并基于JDBC提供数据流输入与输出操作
示例环境
java.version: 1.8.xflink.version: 1.11.1mysql:5.7.x
数据流输入
DataStreamSource.java
package com.flink.examples.mysql;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.flink.api.java.io.jdbc.JDBCOptions;import org.apache.flink.api.java.io.jdbc.JDBCTableSource;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @Description 将mysql表中数据查询输出到DataStream流中 */public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //查询sql String sql = "SELECT id,name,age,sex,address,createTimeSeries FROM t_user"; //设置表视图字段与类型 TableSchema tableSchema = TableSchema.builder() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) .field("age", DataTypes.INT()) .field("sex", DataTypes.INT()) .field("address", DataTypes.STRING()) //.field("createTime", DataTypes.TIMESTAMP()) .field("createTimeSeries", DataTypes.BIGINT()) .build(); //配置jdbc数据源选项 JDBCOptions jdbcOptions = JDBCOptions.builder() .setDriverName(MysqlConfig.DRIVER_CLASS) .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL) .setUsername(MysqlConfig.SOURCE_USER) .setPassword(MysqlConfig.SOURCE_PASSWORD) .setTableName("t_user") .build(); JDBCTableSource jdbcTableSource = JDBCTableSource.builder().setOptions(jdbcOptions).setSchema(tableSchema).build(); //将数据源注册到tableEnv视图student中 tEnv.registerTableSource("t_user", jdbcTableSource); Table table = tEnv.sqlQuery(sql); DataStreamsourceStream = tEnv.toAppendStream(table, TUser.class); sourceStream.map((t)->new Gson().toJson(t)).print(); env.execute("flink mysql source"); }}
数据流输出
DataStreamSink.java
package com.flink.examples.mysql;import com.flink.examples.TUser;import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.sinks.TableSink;import static org.apache.flink.table.api.Expressions.$;/** * @Description 将DataStream数据流插入到mysql表中 */public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(2000); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //查询sql String sql = "insert into t_user (id,name,age,sex,address,createTimeSeries) values (?,?,?,?,?,?)"; //封装数据 TUser user = new TUser(); user.setId(0); user.setName("zhao1"); user.setAge(22); user.setSex(1); user.setAddress("CN"); user.setCreateTimeSeries(System.currentTimeMillis()); DataStreamsourceStream = env.fromElements(user); //从DataStream获取数据// Expression id = ExpressionParser.parse_Expression("id");// Expression name = ExpressionParser.parse_Expression("name");// Expression age = ExpressionParser.parse_Expression("age");// Expression sex = ExpressionParser.parse_Expression("sex");// Expression address = ExpressionParser.parse_Expression("address");// Expression createTimeSeries = ExpressionParser.parse_Expression("createTimeSeries");// Table table = tEnv.fromDataStream(sourceStream, id, name, age, sex, address, createTimeSeries ); Table table = tEnv.fromDataStream(sourceStream,$("id"),$("name"),$("age"),$("sex"),$("address"),$("createTimeSeries")); //输出到mysql //设置表视图字段与类型 TableSchema tableSchema = TableSchema.builder() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) .field("age", DataTypes.INT()) .field("sex", DataTypes.INT()) .field("address", DataTypes.STRING()) //.field("createTime", DataTypes.TIMESTAMP()) .field("createTimeSeries", DataTypes.BIGINT()) .build(); //设置sink输出jdbc TableSink tableSink = JDBCAppendTableSink.builder() .setDrivername(MysqlConfig.DRIVER_CLASS) .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL) .setUsername(MysqlConfig.SOURCE_USER) .setPassword(MysqlConfig.SOURCE_PASSWORD) .setQuery(sql) .setParameterTypes(tableSchema.getFieldTypes()) .setBatchSize(100) .build(); //将数据源注册到tableEnv视图result中 tEnv.registerTableSink("result", tableSchema.getFieldNames(), tableSchema.getFieldTypes(), tableSink); //在指定的路径下注册,然后执行插入操作 table.executeInsert("result"); }}
数据源配置类
MysqlConfig.java
package com.flink.examples.mysql;/** * @Description Mysql数据库连接配置 */public class MysqlConfig { public final static String DRIVER_CLASS="com.mysql.jdbc.Driver"; public final static String SOURCE_DRIVER_URL="jdbc:mysql://127.0.0.1:3306/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false"; public final static String SOURCE_USER="root"; public final static String SOURCE_PASSWORD="root";}
数据展示
感谢各位的阅读,以上就是"Flink Connectors怎么连接MySql"的内容了,经过本文的学习后,相信大家对Flink Connectors怎么连接MySql这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
数据流
数据源
输出
视图
学习
查询
配置
内容
字段
官方
文档
类型
输入
就是
思路
情况
数据库
数据查询
文章
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库的磁盘阵列的架构
数据库技术三级考题
工控机数据库开发
柳州市风行网络技术有限公司
河南一站式软件开发预算
网络安全防火春节期间网信会议
验证码服务器
街道召开专题会研究网络安全工作
iphone13 连接服务器失败
钱车网络技术有限公司招聘
网络安全日志包括
网络安全行动宣言
内网dns服务器未启用
亚信安全vmi服务器地址
江苏省教育信息数据库
药物学数据库
软件开发企业发展方向
sip的服务器软件
新建学生数据库的学生档案
校园安全网络安全应急预案
都有哪些数据库mssql
软件开发培训学费大概多少
语音识别的语音数据库
甘肃电商软件开发
幻塔华为端和官网端服务器能转吗
拿亿互联网科技有限公司投资
魔域出现服务器中断
录像机服务器端口
移动网络安全控制技术
30tb服务器