Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的
发表于:2024-11-20 作者:千家信息网编辑
千家信息网最后更新 2024年11月20日,这期内容当中小编将会给大家带来有关Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。使用Tb
千家信息网最后更新 2024年11月20日Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的
这期内容当中小编将会给大家带来有关Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
使用Tbale&SQL与Flink Kafka连接器将数据写入kafka的消息队列
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
InsertToKafka.java
package com.flink.examples.kafka;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.StatementSet;import org.apache.flink.table.api.TableResult;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @Description 使用Tbale&SQL与Flink Elasticsearch连接器将数据写入kafka的消息队列 */public class InsertToKafka { /** 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html format:用于反序列化和序列化Kafka消息的格式。支持的格式包括'csv','json','avro','debezium-json'和'canal-json'。 */ static String table_sql = "CREATE TABLE KafkaTable (\n" + " `user_id` BIGINT,\n" + " `item_id` BIGINT,\n" + " `behavior` STRING,\n" + " `ts` TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'user_behavior',\n" + " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'format' = 'json'\n" + ")"; public static void main(String[] args) throws Exception { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //默认流时间方式 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册kafka数据维表 tEnv.executeSql(table_sql); //时间格式处理,参考阿里文档 //https://www.alibabacloud.com/help/zh/faq-detail/64813.htm?spm=a2c63.q38357.a3.3.697c13523NZiIN String sql = "insert into KafkaTable (user_id,item_id,behavior,ts) values(1,1,'normal', TO_TIMESTAMP(FROM_UNIXTIME( " + System.currentTimeMillis() + " / 1000, 'yyyy-MM-dd HH:mm:ss')))"; // 第一种方式:直接执行sql// TableResult tableResult = tEnv.executeSql(sql); //第二种方式:声明一个操作集合来执行sql StatementSet stmtSet = tEnv.createStatementSet(); stmtSet.addInsertSql(sql); TableResult tableResult = stmtSet.execute(); tableResult.print(); }}
打印结果
+-------------------------------------------+| default_catalog.default_database.my_users |+-------------------------------------------+| -1 |+-------------------------------------------+1 row in set
上述就是小编为大家分享的Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
消息
数据
示例
方式
格式
内容
序列
时间
模块
环境
连接器
队列
分析
参考
专业
中小
内容丰富
官方
就是
数据源
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
oracle数据库表优化碎片
怎么销售网络安全设备
华为服务器外包装不干胶模板
更新数据库表sql语句
江苏通用软件开发定做价格
网络安全检查资质
服务器机房为什么开空调
成都软件开发人月单价标准
魔兽世界最好的服务器
软件开发工程师多大年龄
浙江通讯软件开发服务有哪些
书店数据库管理系统毕业设计
地税系统网络安全知识
计算机网络安全的目的是什么
树莓派服务器管理员密码
数据库asp怎么运行环境
武隆区工商软件开发流程参考价
商城类软件开发好处
央视网络安全比赛叫什么名
把网络安全摆在网络建设的首位
维斯塔斯网络安全事件
迅游科技跟网络安全有关系吗
863软件开发公司
服务器自动化工作
成都企业软件开发价位
网络安全保障培训
提词器大师怎么总是服务器繁忙
网络安全技术学习薪水
数据库表权限有哪些
滨州市网络安全委员