千家信息网

flinksql怎么将数据写入到文件中

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容主要讲解"flinksql怎么将数据写入到文件中",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"flinksql怎么将数据写入到文件中"吧!pack
千家信息网最后更新 2025年01月23日flinksql怎么将数据写入到文件中

本篇内容主要讲解"flinksql怎么将数据写入到文件中",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"flinksql怎么将数据写入到文件中"吧!

package com.jd.dataoutput;import com.jd.data.SensorReading;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.FileSystem;import org.apache.flink.table.descriptors.OldCsv;import org.apache.flink.table.descriptors.Schema;public class FlinkSqlOutputFile {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        DataStreamSource stream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");//        DataStreamSource stream = env.socketTextStream("localhost", 8888);        SingleOutputStreamOperator map = stream.map(new MapFunction() {            public SensorReading map(String s) throws Exception {                String[] split = s.split(",");                return new SensorReading(split[0], split[1], split[2]);            }        });        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//        使用 table api        Table table = tableEnv.fromDataStream(map);//        table.printSchema();        Table select = table.select("a,b");//        select.printSchema();//        使用 sql api//        tableEnv.createTemporaryView("test", map);//        Table select = tableEnv.sqlQuery(" select a, b from test");//        select.printSchema();//        DataStream sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class);//        sensorReading2DataStream.map(new MapFunction() {//            @Override//            public Object map(SensorReading2 value) throws Exception {//                System.out.println(value.a+"   "+ value.b);//                return null;//            }//        });//        tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt"))//                .withFormat(new Csv())//                .withSchema(//                        new Schema()//                                .field("a", DataTypes.STRING())//                                .field("b", DataTypes.STRING()))//                .inAppendMode()//                .createTemporaryTable("outputTable");//        select.insertInto("outputTable");        tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt"))                .withFormat(new OldCsv())                .withSchema(new Schema()                                .field("a", DataTypes.STRING())                ).inAppendMode()                .createTemporaryTable("outputTable");        select.insertInto("outputTable");        env.execute();    }}

到此,相信大家对"flinksql怎么将数据写入到文件中"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0