How to use upsert SQL in Flink 1.12
This article explains how to use upsert SQL in Flink 1.12. The example is simple and self-contained; follow the code below step by step to see how an upsert pipeline is wired together.
package com.konka.dsp;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.flink.table.api.Expressions.*;

public class SalesOrderStream {

    private static final Logger log = LoggerFactory.getLogger(SalesOrderStream.class.getName());

    // Table API version of the per-customer, per-day aggregation (kept from the original,
    // not invoked in main()).
    public static Table report(Table transactions) {
        return transactions
                .select($("customer_name"), $("created_date"), $("total_amount"))
                .groupBy($("customer_name"), $("created_date"))
                .select(
                        $("customer_name"),
                        $("total_amount").sum().as("total_amount"),
                        $("created_date"));
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source: canal-json change events for sales_order_header read from Kafka.
        // The PRIMARY KEY lets Flink interpret the changelog as an upsert stream.
        tEnv.executeSql("CREATE TABLE sales_order_header_stream (\n" +
                " `id` BIGINT,\n" +
                " `total_amount` DECIMAL(16,2),\n" +
                " `customer_name` STRING,\n" +
                " `order_no` STRING,\n" +
                " `doc_type` STRING,\n" +
                " `sales_org` STRING,\n" +
                " `distr_chan` STRING,\n" +
                " `division` STRING,\n" +
                " `sales_grp` STRING,\n" +
                " `sales_off` STRING,\n" +
                " `purch_no_c` STRING,\n" +
                " `purch_date` STRING,\n" +
                " `sold_to` STRING,\n" +
                " `ship_to` STRING,\n" +
                " `r3_sales_order` STRING,\n" +
                " `created_by_employee_name` STRING,\n" +
                " `created_by_dept_name` STRING,\n" +
                " `is_enable` BIGINT,\n" +
                " `is_delete` BIGINT,\n" +
                " `sale_order_status` STRING,\n" +
                " `created_by_parent_dept_name` STRING,\n" +
                " `total_discount` DECIMAL(16,2),\n" +
                " `customer_sapcode` STRING,\n" +
                " `sold_to_name` STRING,\n" +
                " `ship_to_name` STRING,\n" +
                " `total_discount_amount` DECIMAL(16,2),\n" +
                " `other_discount` DECIMAL(16,2),\n" +
                " `other_amount` DECIMAL(16,2),\n" +
                " `pay_amount` DECIMAL(16,2),\n" +
                " `dsp_org_name` STRING,\n" +
                " `delivery_address` STRING,\n" +
                " `delivery_person` STRING,\n" +
                " `delivery_phone` STRING,\n" +
                " `pay_type` STRING,\n" +
                " `over_sell` STRING,\n" +
                " `created_date` TIMESTAMP(3),\n" +
                " PRIMARY KEY (`id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'canal-data',\n" +
                " 'properties.bootstrap.servers' = '192.168.8.71:9092',\n" +
                " 'properties.group.id' = 'test',\n" +
                " 'format' = 'canal-json'\n" +
                ")");

        // Alternative source: read sales_order_header directly with the mysql-cdc connector.
        // tEnv.executeSql("CREATE TABLE sales_order_header_stream (\n" +
        //         " id BIGINT NOT NULL,\n" +
        //         " customer_name STRING,\n" +
        //         " total_amount DECIMAL(38,2),\n" +
        //         " created_date TIMESTAMP(3)\n" +
        //         ") WITH (\n" +
        //         " 'connector' = 'mysql-cdc',\n" +
        //         " 'hostname' = '192.168.8.73',\n" +
        //         " 'port' = '4000',\n" +
        //         " 'username' = 'flink',\n" +
        //         " 'password' = 'flink',\n" +
        //         " 'database-name' = 'dspdev',\n" +
        //         " 'table-name' = 'sales_order_header'\n" +
        //         ")");

        // Alternative sink: upsert-kafka, newly supported in Flink 1.12.
        // tEnv.executeSql("CREATE TABLE total_day_report (\n" +
        //         " customer_name STRING,\n" +
        //         " total_amount DECIMAL(16,2),\n" +
        //         " created_date STRING,\n" +
        //         " PRIMARY KEY (created_date) NOT ENFORCED\n" +
        //         ") WITH (\n" +
        //         " 'connector' = 'upsert-kafka',\n" +
        //         " 'topic' = 'customer_amount',\n" +
        //         " 'properties.bootstrap.servers' = '192.168.8.71:9092',\n" +
        //         " 'key.format' = 'json',\n" +
        //         " 'value.format' = 'json',\n" +
        //         " 'value.fields-include' = 'ALL'\n" +
        //         ")");

        // Sink: TiDB table spend_report. The composite PRIMARY KEY makes every write an upsert.
        // (A programmatic JdbcUpsertTableSink built from TableSchema/JdbcOptions was also tried
        // in the original version; this DDL replaces it.)
        tEnv.executeSql("CREATE TABLE upsertSink (\n" +
                " customer_name STRING,\n" +
                " total_amount DECIMAL(16,2),\n" +
                " created_date STRING,\n" +
                " PRIMARY KEY (customer_name, created_date) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'tidb',\n" +
                " 'tidb.database.url' = 'jdbc:mysql://192.168.8.73:4000/dspdev',\n" +
                " 'tidb.username' = 'flink',\n" +
                " 'tidb.password' = 'flink',\n" +
                " 'tidb.database.name' = 'dspdev',\n" +
                " 'tidb.table.name' = 'spend_report'\n" +
                ")");

        // Grouped INSERT: daily pay_amount total per organisation. Because the sink declares a
        // primary key, the changelog produced by the aggregation is written as upserts.
        tEnv.executeSql("INSERT INTO upsertSink " +
                "SELECT dsp_org_name AS customer_name, " +
                "CAST(SUM(t.pay_amount) AS DECIMAL(16,2)) AS total_amount, " +
                "DATE_FORMAT(t.created_date, 'yyyy-MM-dd') AS created_date " +
                "FROM sales_order_header_stream t " +
                "GROUP BY DATE_FORMAT(t.created_date, 'yyyy-MM-dd'), dsp_org_name").print();

        // Table API alternative, equivalent to the SQL INSERT above:
        // Table transactions = tEnv.from("sales_order_header_stream");
        // report(transactions).executeInsert("upsertSink");

        // Note: executeSql() already submits the job, so the deprecated tEnv.execute(...) call
        // from the original version is not needed and has been removed.
    }
}
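The 'tidb' connector used above comes from the third-party TiBigData project. If only the connectors bundled with Flink 1.12 are available, the built-in JDBC connector gives the same upsert behaviour, because it switches to upsert writes (INSERT ... ON DUPLICATE KEY UPDATE on the MySQL dialect, which TiDB speaks) whenever the sink table declares a primary key. A minimal sketch, assuming flink-connector-jdbc and a MySQL driver are on the classpath; the URL, credentials and target table are taken from the article, while the table name upsertSinkJdbc is made up for illustration:

        // Hypothetical alternative sink using the built-in JDBC connector (Flink 1.12).
        // With a PRIMARY KEY declared, writes become INSERT ... ON DUPLICATE KEY UPDATE.
        tEnv.executeSql("CREATE TABLE upsertSinkJdbc (\n" +
                " customer_name STRING,\n" +
                " total_amount DECIMAL(16,2),\n" +
                " created_date STRING,\n" +
                " PRIMARY KEY (customer_name, created_date) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://192.168.8.73:4000/dspdev',\n" +
                " 'table-name' = 'spend_report',\n" +
                " 'username' = 'flink',\n" +
                " 'password' = 'flink'\n" +
                ")");

For the duplicate-key path to trigger, the physical spend_report table in TiDB needs a matching primary or unique key on (customer_name, created_date).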
The final result in the database is as follows:

Every write updates and replaces the existing row, which saves a lot of trouble: there is no need to convert the table to a DataStream for custom processing. In addition, Flink 1.12 supports upsert-kafka. The accumulated data ends up like this:

The upsert-kafka variant is already shown, commented out, in the code above; a standalone sketch follows below.
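For reference, here is a minimal, self-contained sketch of routing the same daily aggregate into an upsert-kafka table instead of the database. The broker address and topic are reused from the commented-out DDL above; unlike that DDL, the primary key here is the composite (customer_name, created_date) so that it matches the GROUP BY columns. The Kafka record key is built from the primary-key columns, so a later aggregate for the same customer and day supersedes the earlier one:

        // Sketch: upsert-kafka sink (new in Flink 1.12).
        tEnv.executeSql("CREATE TABLE total_day_report (\n" +
                " customer_name STRING,\n" +
                " total_amount DECIMAL(16,2),\n" +
                " created_date STRING,\n" +
                " PRIMARY KEY (customer_name, created_date) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'upsert-kafka',\n" +
                " 'topic' = 'customer_amount',\n" +
                " 'properties.bootstrap.servers' = '192.168.8.71:9092',\n" +
                " 'key.format' = 'json',\n" +
                " 'value.format' = 'json'\n" +
                ")");

        // Same aggregation as before, now written as an upsert changelog into Kafka.
        tEnv.executeSql("INSERT INTO total_day_report " +
                "SELECT dsp_org_name, CAST(SUM(pay_amount) AS DECIMAL(16,2)), " +
                "DATE_FORMAT(created_date, 'yyyy-MM-dd') " +
                "FROM sales_order_header_stream " +
                "GROUP BY DATE_FORMAT(created_date, 'yyyy-MM-dd'), dsp_org_name");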
Thank you for reading. That is all for how to use upsert SQL in Flink 1.12. Hopefully this walkthrough gives you a clearer picture of the feature; the details are best confirmed by trying it out in your own environment.