Flink批处理之读写Mysql
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,1、添加Maven坐标 mysql mysql-connector-java 5.1.48 org.apache.flink flin
千家信息网最后更新 2025年01月31日Flink批处理之读写Mysql
1、添加Maven坐标
mysql mysql-connector-java 5.1.48 org.apache.flink flink-jdbc_2.12 1.8.0
2、建表
CREATE TABLE `temp` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `time` varchar(255) DEFAULT NULL, `type` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8
3、 Show Code
package com.fwmagic.flink.batch;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;import org.apache.flink.api.java.operators.DataSource;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.types.Row;import java.util.concurrent.TimeUnit;public class BatchDemoOperatorMysql { public static void main(String[] args) throws Exception { String driverClass = "com.mysql.jdbc.Driver"; String dbUrl = "jdbc:mysql://localhost:3306/test"; String userNmae = "root"; String passWord = "123456"; String sql = "insert into test.temp (name,time,type) values (?,?,?)"; ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); /** * 文件内容: * 关羽,2019-10-14 00:00:01,1 * 张飞,2019-10-14 00:00:02,2 * 赵云,2019-10-14 00:00:03,3 */ String filePath = "/Users/temp/data.csv"; //读csv文件内容,转成Row对象 DataSet outputData = env.readCsvFile(filePath).fieldDelimiter(",").types(String.class, String.class, Long.class).map(new MapFunction, Row>() { @Override public Row map(Tuple3 t) throws Exception { Row row = new Row(3); row.setField(0, t.f0.getBytes("UTF-8")); row.setField(1, t.f1.getBytes("UTF-8")); row.setField(2, t.f2.longValue()); return row; } }); //将Row对象写到mysql outputData.output(JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername(driverClass) .setDBUrl(dbUrl) .setUsername(userNmae) .setPassword(passWord) .setQuery(sql) .finish()); //触发执行 env.execute("insert data to mysql"); System.out.println("mysql写入成功!"); TimeUnit.SECONDS.sleep(6); //读mysql DataSource dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat() .setDrivername(driverClass) .setDBUrl(dbUrl) .setUsername(userNmae) .setPassword(passWord) .setQuery("select * from temp") .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)) .finish()); //获取数据并打印 dataSource.map(new MapFunction() { @Override public String map(Row value) throws Exception { System.out.println(value); return value.toString(); } }).print(); }}
4、注意事项
- 数据写入mysql的DataSet泛型要求是row,需要转换;
- 数据读取的结果也是row类型,不能直接print,需要转换;
- 数据写入后一定要加上env.execute(),触发任务执行;
- 涉及到中文的,需要转换成UTF-8,不然数据库中会出现乱码。
数据
UTF-8
内容
对象
文件
成功
乱码
事项
任务
坐标
数据库
注意事项
类型
结果
关羽
赵云
中文
求是
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
社保管理客户端显示服务器异常
备选dns服务器地址河南联通
南昌大鹏软件开发有限公司
历城租房软件开发
酵妈妈软件开发
网络网络安全教育手抄报
谷歌投资的网络安全
艾薇云互联 - 高运维云服务器
浙江web前端软件开发价位
R740服务器
山东正规软件开发品质保障
网络安全法的构成
通讯网络技术的前景
数据库有哪些应用技术
网络技术科普知识竞赛
塔城互联网科技专业怎么样
广州智能消防软件开发机构
企业账套数据库叫什么
数据库中asci值
青岛服务器绝缘片
关于网络安全的画儿童
哈密网络技术费用
服务器被踹
计算机和服务器内存条区分
sol数据库备份目录修改
本科毕业论文进入数据库吗
互娱网络技术服务有限公司
东西湖网络安全基地什么时候开学
静安区标准网络技术协议
天翼网关无线网络安全设置