1、如何用flink的table和sql构建pom文件
发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,这篇文章主要讲解了"1、如何用flink的table和sql构建pom文件",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"1、如何用flink的tabl
千家信息网最后更新 2025年02月05日1、如何用flink的table和sql构建pom文件
这篇文章主要讲解了"1、如何用flink的table和sql构建pom文件",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"1、如何用flink的table和sql构建pom文件"吧!
构建pom文件
4.0.0 org.example flinksqldemo 1.0-SNAPSHOT UTF-8 UTF-8 2.11 2.11.8 0.10.2.1 1.12.0 2.7.3 compile org.apache.maven.plugins maven-compiler-plugin 8 org.apache.flink flink-table-planner-blink_2.11 1.12.0 org.apache.flink flink-java ${flink.version} ${setting.scope} org.apache.flink flink-streaming-java_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-clients_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-connector-kafka-0.10_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} ${setting.scope} org.apache.flink flink-connector-filesystem_${scala.binary.version} ${flink.version} org.apache.kafka kafka_${scala.binary.version} ${kafka.version} ${setting.scope} org.apache.hadoop hadoop-common ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-hdfs ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-client ${hadoop.version} ${setting.scope} org.slf4j slf4j-api 1.7.25 com.alibaba fastjson 1.2.72 redis.clients jedis 2.7.3 com.google.guava guava 29.0-jre
2、编写代码
package com.jd.data;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class test { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSourcestream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");// DataStreamSource stream = env.socketTextStream("localhost", 8888); SingleOutputStreamOperator map = stream.map(new MapFunction () { public SensorReading map(String s) throws Exception { String[] split = s.split(","); return new SensorReading(split[0], split[1], split[2]); } }); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 使用 table api// Table table = tableEnv.fromDataStream(map);// table.printSchema();// Table select = table.select("a,b");// 使用 sql api tableEnv.createTemporaryView("test", map); Table select = tableEnv.sqlQuery(" select a, b from test"); DataStream sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class); sensorReading2DataStream.map(new MapFunction () { @Override public Object map(SensorReading2 value) throws Exception { System.out.println(value.a+" "+ value.b); return null; } }); env.execute(); }}
package com.jd.data;public class SensorReading { public String a; public String b; public String c; public SensorReading(){ } public SensorReading(String a, String b, String c) { this.a = a; this.b = b; this.c = c; } public String getA() { return a; } public void setA(String a) { this.a = a; } public String getB() { return b; } public void setB(String b) { this.b = b; } public String getC() { return c; } public void setC(String c) { this.c = c; }}
package com.jd.data;public class SensorReading2 { public String a; public String b; public SensorReading2(){ } public SensorReading2(String a, String b) { this.a = a; this.b = b; } public String getA() { return a; } public void setA(String a) { this.a = a; } public String getB() { return b; } public void setB(String b) { this.b = b; }}
注意:pojo 中属性必须是public的, 包含无参构造器
感谢各位的阅读,以上就是"1、如何用flink的table和sql构建pom文件"的内容了,经过本文的学习后,相信大家对1、如何用flink的table和sql构建pom文件这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
文件
学习
内容
UTF-8
代码
就是
属性
思路
情况
文章
更多
知识
知识点
篇文章
跟着
问题
构造器
实践
推送
研究
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
秦皇岛软件开发应用范围
dns服务器被劫持
数据库名称是自己设置的吗
数据库 密码 保存
网络安全和网络测试
网络技术和手段滞后
psv无法连接到服务器
有关英文的网络安全
360集团网络安全协议书
贵州销售软件开发费用
网络安全三级认证注意事项
网络安全专业人员继续教育
抢票软件开发
33岁软件开发
中欧基金公司软件开发
python服务器运维
流萤服务器
游戏服务器中的角色
计算机网络技术的学习方向
桌面版数据库
网络安全等级保护测评工作内容
如何注册域名服务器
vb数据库连接用户登录
服务器核心跑不满
360集团网络安全协议书
网络安全的隐患
坚果2连接不到服务器
数据库查询名字带下划线
移动网络技术要求 验收标准
工业及网络技术的优点