千家信息网

Flink批处理之读写Mysql

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,1、添加Maven坐标 mysql mysql-connector-java 5.1.48 org.apache.flink flin
千家信息网最后更新 2025年01月31日Flink批处理之读写Mysql

1、添加Maven坐标

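<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.12</artifactId>
    <version>1.8.0</version>
</dependency>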

2、建表

-- Demo table: one row per CSV record, plus an auto-generated surrogate key.
CREATE TABLE `temp` (
  `id`   bigint(20)   NOT NULL AUTO_INCREMENT,  -- surrogate key, filled in by MySQL
  `name` varchar(255) DEFAULT NULL,
  `time` varchar(255) DEFAULT NULL,             -- timestamp kept as text
  `type` bigint(20)   DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

3、 Show Code

package com.fwmagic.flink.batch;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

import java.util.concurrent.TimeUnit;

/**
 * Flink batch (DataSet API) demo that loads a CSV file into the MySQL table
 * {@code test.temp} and then reads the rows back and prints them.
 */
public class BatchDemoOperatorMysql {

    /**
     * Runs two batch jobs in sequence: CSV to MySQL, then MySQL to stdout.
     *
     * @param args unused
     * @throws Exception if either Flink job fails
     */
    public static void main(String[] args) throws Exception {
        String driverClass = "com.mysql.jdbc.Driver";
        String dbUrl = "jdbc:mysql://localhost:3306/test";
        String userName = "root";
        String passWord = "123456";
        // `id` is AUTO_INCREMENT, so only the three payload columns are inserted.
        String sql = "insert into test.temp (name,time,type) values (?,?,?)";

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        /*
         * Expected file content (name,time,type):
         * 关羽,2019-10-14 00:00:01,1
         * 张飞,2019-10-14 00:00:02,2
         * 赵云,2019-10-14 00:00:03,3
         */
        String filePath = "/Users/temp/data.csv";

        // Read the CSV file and convert every record into a Row, the element
        // type that JDBCOutputFormat consumes.
        DataSet<Row> outputData = env.readCsvFile(filePath)
                .fieldDelimiter(",")
                .types(String.class, String.class, Long.class)
                .map(new MapFunction<Tuple3<String, String, Long>, Row>() {
                    @Override
                    public Row map(Tuple3<String, String, Long> t) throws Exception {
                        Row row = new Row(3);
                        // Text fields are handed over as UTF-8 bytes so that
                        // non-ASCII names are not garbled in the database.
                        row.setField(0, t.f0.getBytes("UTF-8"));
                        row.setField(1, t.f1.getBytes("UTF-8"));
                        row.setField(2, t.f2.longValue());
                        return row;
                    }
                });

        // Write the rows to MySQL.
        outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername(driverClass)
                .setDBUrl(dbUrl)
                .setUsername(userName)
                .setPassword(passWord)
                .setQuery(sql)
                .finish());

        // Sinks are lazy: execute() triggers the job. It returns only after
        // the job has finished, so no extra wait is needed before reading back.
        env.execute("insert data to mysql");
        System.out.println("mysql写入成功!");

        // Read from MySQL. The column list is explicit so that the result
        // columns line up with the three-field RowTypeInfo below; `select *`
        // would also return the bigint `id` as the first column.
        DataSource<Row> dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(driverClass)
                .setDBUrl(dbUrl)
                .setUsername(userName)
                .setPassword(passWord)
                .setQuery("select name, time, type from temp")
                .setRowTypeInfo(new RowTypeInfo(
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.LONG_TYPE_INFO))
                .finish());

        // Convert each Row to its string form and print it; print() triggers
        // the second job itself.
        dataSource.map(new MapFunction<Row, String>() {
            @Override
            public String map(Row value) throws Exception {
                return value.toString();
            }
        }).print();
    }
}

4、注意事项

  • 数据写入MySQL时,DataSet的泛型要求是Row,需要先把数据转换成Row;
  • 数据读取的结果也是Row类型,不能直接print,需要先转换(例如map成String);
  • 数据写入后一定要加上env.execute(),触发任务执行;
  • 涉及到中文的,需要转换成UTF-8,不然数据库中会出现乱码。
0