flink 多表join的例子
发表于:2024-11-14 作者:千家信息网编辑
千家信息网最后更新 2024年11月14日,今天写了一个稍微复杂的例子, 实现了类似mysql group_concat 功能,记录一下MapToString 参考bug 那篇博客public static void main(String[]
千家信息网最后更新 2024年11月14日flink 多表join的例子
今天写了一个稍微复杂的例子, 实现了类似mysql group_concat 功能,记录一下
MapToString 参考bug 那篇博客
public static void main(String[] arg) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT()); tableEnv.registerFunction("mapToString", new MapToString()); getProjectInfo(env,tableEnv); getProject(env,tableEnv); joinTableProjectWithInfo(tableEnv); Table query = tableEnv.sqlQuery("select id, name, type from result_agg"); DataSet ds= tableEnv.toDataSet(query, Row.class); ds.print(); ds.writeAsText("/home/test", WriteMode.OVERWRITE); env.execute("multiple-table"); } public static void getProjectInfo(ExecutionEnvironment env,BatchTableEnvironment tableEnv) { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "id", "type" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/space?characterEncoding=utf8") .setUsername("user").setPassword("pwd") .setQuery("select project_fid, cast(project_info_type as CHAR) as type from project").setRowTypeInfo(rowTypeInfo).finish(); DataSource s = env.createInput(jdbcInputFormat); tableEnv.registerDataSet("project_info", s); aggProjectInfo(tableEnv,"project_info"); } public static void aggProjectInfo(BatchTableEnvironment tableEnv, String tableName) { Table tapiResult = tableEnv.scan(tableName); tapiResult.printSchema(); Table query = tableEnv.sqlQuery("select id, mapToString(collect(type)) as type from project_info group by id"); tableEnv.registerTable(tableName+"_agg", query); tapiResult = tableEnv.scan(tableName+"_agg"); tapiResult.printSchema(); } public static void getProject(ExecutionEnvironment env,BatchTableEnvironment tableEnv) { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "pid", "name" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/space?characterEncoding=utf8") .setUsername("user").setPassword("pwd") .setQuery("select fid, project_name from t_project").setRowTypeInfo(rowTypeInfo).finish(); DataSource s = env.createInput(jdbcInputFormat); tableEnv.registerDataSet("project", s); } public static void joinTableProjectWithInfo(BatchTableEnvironment tableEnv) { Table result =tableEnv.sqlQuery("select a.pid as id , a.name , b.type from project a inner join project_info_agg b on a.pid=b.id"); tableEnv.registerTable("result_agg", result); result.printSchema(); }
例子
复杂
功能
博客
参考
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
新华三软件开发怎么样
工业控制网络技术应用
软件开发是否需要每天开晨会
服务器安全是指什么
管家婆生产管理软件开发商
乡镇网络安全知识讲座报道
维护铁路网络安全
数据库的内部存储权限
网络安全行业面试题
威联通有服务器吗
电脑如何架设为服务器
网络安全法安全扩展
网络安全自查工作总结报告范文
服务器禁用rd授权管理器
中药专利数据库
验证码识别数据库
软件开发工时安排
三级数据库技术 未来
腾讯文档暂无法连接服务器
网络安全手抄报五年级视频
物理设备保障企业网络安全
网上自己查重会进入数据库吗
九级的服务器
我的世界服务器初始内存越来越大
潮州通信软件开发报价行情
简单网站服务器多少钱
河南高云网络技术公司
网络安全领域面对的挑战
中小学生网络安全问卷
公务员网络技术