Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的
发表于:2024-11-20 作者:千家信息网编辑
千家信息网最后更新 2024年11月20日,这期内容当中小编将会给大家带来有关Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。使用Tb
千家信息网最后更新 2024年11月20日Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的
这期内容当中小编将会给大家带来有关Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
使用Tbale&SQL与Flink Kafka连接器将数据写入kafka的消息队列
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
InsertToKafka.java
package com.flink.examples.kafka;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.StatementSet;import org.apache.flink.table.api.TableResult;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @Description 使用Tbale&SQL与Flink Elasticsearch连接器将数据写入kafka的消息队列 */public class InsertToKafka { /** 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html format:用于反序列化和序列化Kafka消息的格式。支持的格式包括'csv','json','avro','debezium-json'和'canal-json'。 */ static String table_sql = "CREATE TABLE KafkaTable (\n" + " `user_id` BIGINT,\n" + " `item_id` BIGINT,\n" + " `behavior` STRING,\n" + " `ts` TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'user_behavior',\n" + " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'format' = 'json'\n" + ")"; public static void main(String[] args) throws Exception { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //默认流时间方式 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册kafka数据维表 tEnv.executeSql(table_sql); //时间格式处理,参考阿里文档 //https://www.alibabacloud.com/help/zh/faq-detail/64813.htm?spm=a2c63.q38357.a3.3.697c13523NZiIN String sql = "insert into KafkaTable (user_id,item_id,behavior,ts) values(1,1,'normal', TO_TIMESTAMP(FROM_UNIXTIME( " + System.currentTimeMillis() + " / 1000, 'yyyy-MM-dd HH:mm:ss')))"; // 第一种方式:直接执行sql// TableResult tableResult = tEnv.executeSql(sql); //第二种方式:声明一个操作集合来执行sql StatementSet stmtSet = tEnv.createStatementSet(); stmtSet.addInsertSql(sql); TableResult tableResult = stmtSet.execute(); tableResult.print(); }}
打印结果
+-------------------------------------------+| default_catalog.default_database.my_users |+-------------------------------------------+| -1 |+-------------------------------------------+1 row in set
上述就是小编为大家分享的Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
消息
数据
示例
方式
格式
内容
序列
时间
模块
环境
连接器
队列
分析
参考
专业
中小
内容丰富
官方
就是
数据源
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
深圳科技园有哪些互联网公司
电子白板软件开发技术
互联网人工智能医疗科技
数据库满了
jsp数据库查询获取数据
湘乡市人口数据库
岭钰网络技术洛阳
数据库窗口怎么添加
甲骨文数据库异地双活
有关网络安全的事故警示视频
上海网络安全招聘
手机app软件开发教材
万稷网络技术有限公司
软件开发项目进展汇报范文
数据库技术是现代信息科学
软件开发公司erp
我的世界 服务器711
成都芯吴芯网络技术
数据库安装完成但安装失败
联想 ts530 服务器
数据库设计总体规范
网络安全进机关活动
网络安全线上老师
和龙市天气预报软件开发
plc读取excel数据库记录
资深软件开发工程师工资一般多少
手机网络安全防范技术
金蝶服务器能看管理员密码吗
可以查找外文资料的数据库
网络技术专业是编程软件吗