
1. How to build the pom file for Flink Table and SQL

Published on 2025-02-05 by the 千家信息网 editorial team

This article explains "1. How to build the pom file for Flink Table and SQL". The walkthrough is simple and clear and easy to follow; work through it step by step and study it along with us.

1. Build the pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flinksqldemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.8</scala.version>
        <kafka.version>0.10.2.1</kafka.version>
        <flink.version>1.12.0</flink.version>
        <hadoop.version>2.7.3</hadoop.version>
        <setting.scope>compile</setting.scope>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${setting.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>29.0-jre</version>
        </dependency>
    </dependencies>
</project>
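Note that flink-table-planner-blink is the blink planner, which is the default Table planner as of Flink 1.12; it is the dependency that provides the StreamTableEnvironment used in the code below. Running mvn clean package from the project root is a quick way to confirm whether the dependencies resolve before moving on.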

2. Write the code

package com.jd.data;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> stream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");
//        DataStreamSource<String> stream = env.socketTextStream("localhost", 8888);

        // parse each comma-separated line into a SensorReading POJO
        SingleOutputStreamOperator<SensorReading> map = stream.map(new MapFunction<String, SensorReading>() {
            public SensorReading map(String s) throws Exception {
                String[] split = s.split(",");
                return new SensorReading(split[0], split[1], split[2]);
            }
        });

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

//        using the Table API
//        Table table = tableEnv.fromDataStream(map);
//        table.printSchema();
//        Table select = table.select("a,b");

        // using the SQL API
        tableEnv.createTemporaryView("test", map);
        Table select = tableEnv.sqlQuery("select a, b from test");

        // convert the result table back to an append-only stream of POJOs
        DataStream<SensorReading2> sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class);
        sensorReading2DataStream.map(new MapFunction<SensorReading2, Object>() {
            @Override
            public Object map(SensorReading2 value) throws Exception {
                System.out.println(value.a + "   " + value.b);
                return null;
            }
        });

        env.execute();
    }
}
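A side note on the commented-out Table API path: string-based projections such as table.select("a,b") are deprecated in Flink 1.12 in favor of the expression DSL. A minimal sketch of the equivalent projection, assuming the same map stream and tableEnv as above:

import static org.apache.flink.table.api.Expressions.$;

// derive a Table from the POJO stream; columns are named after the fields a, b, c
Table table = tableEnv.fromDataStream(map);
table.printSchema();
// same projection as the SQL query above, written with the expression DSL
Table projected = table.select($("a"), $("b"));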
package com.jd.data;

public class SensorReading {
    public String a;
    public String b;
    public String c;

    public SensorReading() {
    }

    public SensorReading(String a, String b, String c) {
        this.a = a;
        this.b = b;
        this.c = c;
    }

    public String getA() { return a; }
    public void setA(String a) { this.a = a; }
    public String getB() { return b; }
    public void setB(String b) { this.b = b; }
    public String getC() { return c; }
    public void setC(String c) { this.c = c; }
}
package com.jd.data;

public class SensorReading2 {
    public String a;
    public String b;

    public SensorReading2() {
    }

    public SensorReading2(String a, String b) {
        this.a = a;
        this.b = b;
    }

    public String getA() { return a; }
    public void setA(String a) { this.a = a; }
    public String getB() { return b; }
    public void setB(String b) { this.b = b; }
}
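To try the job end to end, the input file only needs comma-separated lines with three fields. For a hypothetical aaa.txt line such as sensor1,12.3,45.6, the map step builds SensorReading("sensor1", "12.3", "45.6"), and the job prints sensor1   12.3.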

Note: the POJO's fields must be public, and the class must provide a no-argument constructor.
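A related caveat on the conversion: toAppendStream only works because the query here is insert-only. If the SQL contained an aggregation, Flink would reject the append-stream conversion; a minimal sketch of the retract-stream alternative, assuming the same tableEnv and test view (the aggregate query itself is hypothetical):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

// hypothetical aggregating query over the same "test" view
Table agg = tableEnv.sqlQuery("select a, count(1) as cnt from test group by a");
// each element carries a flag: true = add a result, false = retract an earlier one
DataStream<Tuple2<Boolean, Row>> retract = tableEnv.toRetractStream(agg, Row.class);
retract.print();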

Thanks for reading. That covers building the pom file and wiring up Flink Table and SQL; how it behaves in your own environment is something practice will have to verify.
