如何用Flink Connectors读写txt文件
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容主要讲解"如何用Flink Connectors读写txt文件",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何用Flink Connectors
千家信息网最后更新 2025年01月23日如何用Flink Connectors读写txt文件
本篇内容主要讲解"如何用Flink Connectors读写txt文件",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何用Flink Connectors读写txt文件"吧!
通过使用Flink DataSet Connectors 数据流连接器打开txt文件,并提供数据流输入与输出操作;
示例环境
java.version: 1.8.xflink.version: 1.11.1
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 DataStream Connectors 与 示例模块
数据流输入
TextSource.java
package com.flink.examples.file;import org.apache.commons.lang3.StringUtils;import org.apache.commons.lang3.time.DateUtils;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.util.Collector;import scala.Tuple7;import java.util.Date;/** * @Description 从txt文件中读取内容输出到DataSet中 */public class TextSource { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); String filePath = "D:\\Workspaces\\idea_2\\flink-examples\\connectors\\src\\main\\resources\\user.txt"; DataSet> dataSet = env.readTextFile(filePath) .flatMap(new FlatMapFunction >() { @Override public void flatMap(String value, Collector > out) throws Exception { if (StringUtils.isNotBlank(value)) { String[] values = value.split(","); out.collect(Tuple7.apply( Integer.parseInt(values[0]), values[1], Integer.parseInt(values[2]), Integer.parseInt(values[3]), values[4], DateUtils.parseDate(values[5], "yyyy-MM-dd HH:mm:ss"), Long.parseLong(values[6]))); } } }); dataSet.print(); }}
数据流输出
TextSink.java
package com.flink.examples.file;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.core.fs.FileSystem;import org.apache.flink.types.Row;/** * @Description 将DataSet数据写入到txt文件中 */public class TextSink { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //需先建立文件 String filePath = "D:\\Workspaces\\idea_2\\flink-examples\\connectors\\src\\main\\resources\\user.txt"; //将实体转换为Row对象,new Row(字段个数) Row row = Row.of(15, "chen1", 40, 1, "CN", "2020-09-08 00:00:00", 1599494400000L); //转换为dataSet DataSetdataSet = env.fromElements(row); //将内容写入到File中,如果文件已存在,将会被复盖 dataSet.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(1); env.execute("fline file sink"); }}
数据展示
到此,相信大家对"如何用Flink Connectors读写txt文件"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
文件
数据
内容
数据流
示例
输出
模块
环境
学习
输入
实用
更深
个数
兴趣
复盖
字段
实体
实用性
实际
对象
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
企业网络安全防御的意义
fm2021有没有老数据库
数据库00955
服务器安全设置+十六
网络安全常识答题
服务器区安全设备部署
搭建一个ftp服务器
网站后端服务器购买
网络安全工程师思路图
佛山智能软件开发报价
数据库 dblink
丽江网络安全综合服务平台
节假日网络安全防护方案
北京回收服务器硬盘虚拟主机
哪个软件开发工资高
网络安全技术与应用贾铁军
南京数据网络技术服务费
DG连接数据库地址
excel的数据库怎么做的
西藏自治区第五届网络安全大赛
加拿大uu服务器
具发展潜力的数据库培训
网络技术工程师是什么证书
网络安全的保密性方法是
qq命名的数据库
网络技术培训班广州
服务器电源插头品牌排行榜
移动互联网 科技技术
昌吉软件开发五星服务
中国网络安全办法