千家信息网

Flink Connectors怎么连接MySql

发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,这篇文章主要讲解了"Flink Connectors怎么连接MySql",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Connectors怎么连接MySql"吧!
千家信息网最后更新 2024年09月22日Flink Connectors怎么连接MySql

这篇文章主要讲解了"Flink Connectors怎么连接MySql",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Connectors怎么连接MySql"吧!

通过使用Flink DataStream Connectors 数据流连接器连接到Mysql数据源,并基于JDBC提供数据流输入与输出操作

示例环境

java.version: 1.8.x
flink.version: 1.11.1
mysql: 5.7.x

数据流输入

DataStreamSource.java

package com.flink.examples.mysql;import com.flink.examples.TUser;import com.google.gson.Gson;import org.apache.flink.api.java.io.jdbc.JDBCOptions;import org.apache.flink.api.java.io.jdbc.JDBCTableSource;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @Description 将mysql表中数据查询输出到DataStream流中 */public class DataStreamSource {    /**     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html     */    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);        //查询sql        String sql = "SELECT id,name,age,sex,address,createTimeSeries FROM t_user";        //设置表视图字段与类型        TableSchema tableSchema = TableSchema.builder()                .field("id", DataTypes.INT())                .field("name", DataTypes.STRING())                .field("age", DataTypes.INT())                .field("sex", DataTypes.INT())                .field("address", DataTypes.STRING())                //.field("createTime", DataTypes.TIMESTAMP())                .field("createTimeSeries", DataTypes.BIGINT())                .build();        //配置jdbc数据源选项        JDBCOptions jdbcOptions = JDBCOptions.builder()                .setDriverName(MysqlConfig.DRIVER_CLASS)                .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL)                .setUsername(MysqlConfig.SOURCE_USER)                .setPassword(MysqlConfig.SOURCE_PASSWORD)                .setTableName("t_user")                .build();        JDBCTableSource jdbcTableSource = 
JDBCTableSource.builder().setOptions(jdbcOptions).setSchema(tableSchema).build();        //将数据源注册到tableEnv视图student中        tEnv.registerTableSource("t_user", jdbcTableSource);        Table table = tEnv.sqlQuery(sql);        DataStream sourceStream = tEnv.toAppendStream(table, TUser.class);        sourceStream.map((t)->new Gson().toJson(t)).print();        env.execute("flink mysql source");    }}

数据流输出

DataStreamSink.java

package com.flink.examples.mysql;import com.flink.examples.TUser;import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.sinks.TableSink;import static org.apache.flink.table.api.Expressions.$;/** * @Description 将DataStream数据流插入到mysql表中 */public class DataStreamSink {    /**     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/jdbc.html     */    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        env.enableCheckpointing(2000);        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);        //查询sql        String sql = "insert into t_user (id,name,age,sex,address,createTimeSeries) values (?,?,?,?,?,?)";        //封装数据        TUser user = new TUser();        user.setId(0);        user.setName("zhao1");        user.setAge(22);        user.setSex(1);        user.setAddress("CN");        user.setCreateTimeSeries(System.currentTimeMillis());        DataStream sourceStream = env.fromElements(user);        //从DataStream获取数据//        Expression id = ExpressionParser.parse_Expression("id");//        Expression name = ExpressionParser.parse_Expression("name");//        Expression age = ExpressionParser.parse_Expression("age");//        Expression sex = ExpressionParser.parse_Expression("sex");//        Expression address = ExpressionParser.parse_Expression("address");//        Expression createTimeSeries = ExpressionParser.parse_Expression("createTimeSeries");//        Table table = tEnv.fromDataStream(sourceStream, id, name, 
age, sex, address, createTimeSeries );        Table table = tEnv.fromDataStream(sourceStream,$("id"),$("name"),$("age"),$("sex"),$("address"),$("createTimeSeries"));        //输出到mysql        //设置表视图字段与类型        TableSchema tableSchema = TableSchema.builder()                .field("id", DataTypes.INT())                .field("name", DataTypes.STRING())                .field("age", DataTypes.INT())                .field("sex", DataTypes.INT())                .field("address", DataTypes.STRING())                //.field("createTime", DataTypes.TIMESTAMP())                .field("createTimeSeries", DataTypes.BIGINT())                .build();        //设置sink输出jdbc        TableSink tableSink = JDBCAppendTableSink.builder()                .setDrivername(MysqlConfig.DRIVER_CLASS)                .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL)                .setUsername(MysqlConfig.SOURCE_USER)                .setPassword(MysqlConfig.SOURCE_PASSWORD)                .setQuery(sql)                .setParameterTypes(tableSchema.getFieldTypes())                .setBatchSize(100)                .build();        //将数据源注册到tableEnv视图result中        tEnv.registerTableSink("result",                tableSchema.getFieldNames(),                tableSchema.getFieldTypes(),                tableSink);        //在指定的路径下注册,然后执行插入操作        table.executeInsert("result");    }}

数据源配置类

MysqlConfig.java

package com.flink.examples.mysql;

/**
 * MySQL connection settings shared by the example jobs.
 *
 * <p>Holds constants only, so the class is final and cannot be instantiated.
 */
public final class MysqlConfig {

    /** Fully-qualified class name of the JDBC driver. */
    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";

    /** JDBC URL of the database. */
    public static final String SOURCE_DRIVER_URL = "jdbc:mysql://127.0.0.1:3306/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false";

    /** Database user name. */
    public static final String SOURCE_USER = "root";

    /** Database password. */
    public static final String SOURCE_PASSWORD = "root";

    /** Not instantiable: this class only exposes constants. */
    private MysqlConfig() {
    }
}

数据展示

感谢各位的阅读,以上就是"Flink Connectors怎么连接MySql"的内容了,经过本文的学习后,相信大家对Flink Connectors怎么连接MySql这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是千家信息网,小编将为大家推送更多相关知识点的文章,欢迎关注!

0