flinksql怎么将数据写入到文件中
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容主要讲解"flinksql怎么将数据写入到文件中",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"flinksql怎么将数据写入到文件中"吧!pack
千家信息网最后更新 2025年01月23日flinksql怎么将数据写入到文件中
本篇内容主要讲解"flinksql怎么将数据写入到文件中",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"flinksql怎么将数据写入到文件中"吧!
package com.jd.dataoutput;import com.jd.data.SensorReading;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.FileSystem;import org.apache.flink.table.descriptors.OldCsv;import org.apache.flink.table.descriptors.Schema;public class FlinkSqlOutputFile { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSourcestream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");// DataStreamSource stream = env.socketTextStream("localhost", 8888); SingleOutputStreamOperator map = stream.map(new MapFunction () { public SensorReading map(String s) throws Exception { String[] split = s.split(","); return new SensorReading(split[0], split[1], split[2]); } }); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 使用 table api Table table = tableEnv.fromDataStream(map);// table.printSchema(); Table select = table.select("a,b");// select.printSchema();// 使用 sql api// tableEnv.createTemporaryView("test", map);// Table select = tableEnv.sqlQuery(" select a, b from test");// select.printSchema();// DataStream sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class);// sensorReading2DataStream.map(new MapFunction () {// @Override// public Object map(SensorReading2 value) throws Exception {// System.out.println(value.a+" "+ value.b);// return null;// }// });// tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt"))// .withFormat(new Csv())// .withSchema(// new Schema()// .field("a", DataTypes.STRING())// .field("b", DataTypes.STRING()))// .inAppendMode()// .createTemporaryTable("outputTable");// select.insertInto("outputTable"); tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt")) .withFormat(new OldCsv()) .withSchema(new Schema() .field("a", DataTypes.STRING()) ).inAppendMode() .createTemporaryTable("outputTable"); select.insertInto("outputTable"); env.execute(); }}
到此,相信大家对"flinksql怎么将数据写入到文件中"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
数据
文件
内容
学习
实用
更深
兴趣
实用性
实际
操作简单
方法
更多
朋友
网站
频道
查询
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全黑
方舟服务器更新时间
qq邮件服务器有备份吗
站级服务器数据库创建数据源
怎样用数据库做学生信息表
超微服务器厂家
深圳服务器机柜厂家有哪些
GDC服务器阵列架怎么修复
畅行网络技术服务有限公司
网络安全研究生专业学校排名
杭州岚森网络技术有限公司
根域名服务器管理机构
应用访问数据库的实现过程
我的租房网数据库表数据
体积小的服务器
精诚互赢软件开发公司
创造中心服务器
违法网络安全法第十二条
河南铆钉枪软件开发
gp数据库 查询所有表
邯郸工业软件开发哪家可靠
网络安全教育手抄
用什么软件开发需求
矢量数据库收费依据
真正抗投诉外贸服务器
陈述什么是网络安全
广播台网络技术部的面试内容
中国电信网络安全题库
无锡软件开发活动
软件开发公司怎么接到包