flink 多表join的例子
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,今天写了一个稍微复杂的例子, 实现了类似mysql group_concat 功能,记录一下MapToString 参考bug 那篇博客public static void main(String[]
千家信息网最后更新 2025年01月31日flink 多表join的例子
今天写了一个稍微复杂的例子, 实现了类似mysql group_concat 功能,记录一下
MapToString 参考bug 那篇博客
public static void main(String[] arg) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT()); tableEnv.registerFunction("mapToString", new MapToString()); getProjectInfo(env,tableEnv); getProject(env,tableEnv); joinTableProjectWithInfo(tableEnv); Table query = tableEnv.sqlQuery("select id, name, type from result_agg"); DataSet ds= tableEnv.toDataSet(query, Row.class); ds.print(); ds.writeAsText("/home/test", WriteMode.OVERWRITE); env.execute("multiple-table"); } public static void getProjectInfo(ExecutionEnvironment env,BatchTableEnvironment tableEnv) { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "id", "type" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/space?characterEncoding=utf8") .setUsername("user").setPassword("pwd") .setQuery("select project_fid, cast(project_info_type as CHAR) as type from project").setRowTypeInfo(rowTypeInfo).finish(); DataSource s = env.createInput(jdbcInputFormat); tableEnv.registerDataSet("project_info", s); aggProjectInfo(tableEnv,"project_info"); } public static void aggProjectInfo(BatchTableEnvironment tableEnv, String tableName) { Table tapiResult = tableEnv.scan(tableName); tapiResult.printSchema(); Table query = tableEnv.sqlQuery("select id, mapToString(collect(type)) as type from project_info group by id"); tableEnv.registerTable(tableName+"_agg", query); tapiResult = tableEnv.scan(tableName+"_agg"); tapiResult.printSchema(); } public static void getProject(ExecutionEnvironment env,BatchTableEnvironment tableEnv) { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "pid", "name" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/space?characterEncoding=utf8") .setUsername("user").setPassword("pwd") .setQuery("select fid, project_name from t_project").setRowTypeInfo(rowTypeInfo).finish(); DataSource s = env.createInput(jdbcInputFormat); tableEnv.registerDataSet("project", s); } public static void joinTableProjectWithInfo(BatchTableEnvironment tableEnv) { Table result =tableEnv.sqlQuery("select a.pid as id , a.name , b.type from project a inner join project_info_agg b on a.pid=b.id"); tableEnv.registerTable("result_agg", result); result.printSchema(); }
例子
复杂
功能
博客
参考
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
湖南郴州安卓软件开发招生
数据库领域核心技术
剑灵重逢无线服务器在哪里
小区团购软件开发
软件开发支援系统程序
手机方舟管理服务器
电脑登录服务器超时怎么办
h5贷款超市软件开发
事务处理技术数据库
即墨区电商软件开发哪家好
以下不是数据库攻击技术的是
网页添加导出表格数据库
服务器传送文件
华为手机怎样设置网络安全
安卓查看数据库app
广州工业软件开发报价
网络安全的四个强化
木瓜互联网科技布沙发品牌
局开展网络安全培训
什么是网络安全中的社会工程学
佳都科技互联网发展建议
关于网络技术手段培训的心得
美工软件开发
软件开发需要学什么app
hp服务器管理口默认密码
郴州学计算机软件开发学费
sql数据库名称修改工具
网络安全行业谈加薪
若本地域名服务器不缓存
股市服务器有问题了吗