flink 多表join的例子
发表于:2024-12-02 作者:千家信息网编辑
千家信息网最后更新 2024年12月02日,今天写了一个稍微复杂的例子, 实现了类似mysql group_concat 功能,记录一下MapToString 参考bug 那篇博客public static void main(String[]
千家信息网最后更新 2024年12月02日flink 多表join的例子
今天写了一个稍微复杂的例子, 实现了类似mysql group_concat 功能,记录一下
MapToString 参考bug 那篇博客
public static void main(String[] arg) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT()); tableEnv.registerFunction("mapToString", new MapToString()); getProjectInfo(env,tableEnv); getProject(env,tableEnv); joinTableProjectWithInfo(tableEnv); Table query = tableEnv.sqlQuery("select id, name, type from result_agg"); DataSet ds= tableEnv.toDataSet(query, Row.class); ds.print(); ds.writeAsText("/home/test", WriteMode.OVERWRITE); env.execute("multiple-table"); } public static void getProjectInfo(ExecutionEnvironment env,BatchTableEnvironment tableEnv) { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "id", "type" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/space?characterEncoding=utf8") .setUsername("user").setPassword("pwd") .setQuery("select project_fid, cast(project_info_type as CHAR) as type from project").setRowTypeInfo(rowTypeInfo).finish(); DataSource s = env.createInput(jdbcInputFormat); tableEnv.registerDataSet("project_info", s); aggProjectInfo(tableEnv,"project_info"); } public static void aggProjectInfo(BatchTableEnvironment tableEnv, String tableName) { Table tapiResult = tableEnv.scan(tableName); tapiResult.printSchema(); Table query = tableEnv.sqlQuery("select id, mapToString(collect(type)) as type from project_info group by id"); tableEnv.registerTable(tableName+"_agg", query); tapiResult = tableEnv.scan(tableName+"_agg"); tapiResult.printSchema(); } public static void getProject(ExecutionEnvironment env,BatchTableEnvironment tableEnv) { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "pid", "name" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/space?characterEncoding=utf8") .setUsername("user").setPassword("pwd") .setQuery("select fid, project_name from t_project").setRowTypeInfo(rowTypeInfo).finish(); DataSource s = env.createInput(jdbcInputFormat); tableEnv.registerDataSet("project", s); } public static void joinTableProjectWithInfo(BatchTableEnvironment tableEnv) { Table result =tableEnv.sqlQuery("select a.pid as id , a.name , b.type from project a inner join project_info_agg b on a.pid=b.id"); tableEnv.registerTable("result_agg", result); result.printSchema(); }
例子
复杂
功能
博客
参考
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全知识宣传手册发布
安徽网络时间服务器批发
北大软微的金融大数据库
拜登网络技术
catia的软件开发商
云服务做中转服务器
历史上的今天数据库资料
软件开发学习电脑上如何操作
广州停车系统软件开发
服装设计绘图软件开发
淘悦app软件开发
b价出租16和32线程服务器
网联车网络安全
用命令连接sql数据库
计算机网络技术教授
电脑服务器的远程访问怎么开启
网络安全招人
无线网服务器故障
查看localhost服务器
无线网络技术体系
wow单机服务器搭建
dota2 服务器地址
数据库营销的战略意义
美团数据库中间件
网络安全设备采购 方案
网络安全公司销售工作内容
网络安全和人才培养
瑞丽高密度存储服务器多少钱
数据库设计建表的完整步骤
java数据库图片处理