千家信息网

Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的

发表于:2024-11-20 作者:千家信息网编辑
千家信息网最后更新 2024年11月20日,这期内容当中小编将会给大家带来有关Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。使用Tb
千家信息网最后更新 2024年11月20日Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的

这期内容当中小编将会给大家带来有关Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

使用Tbale&SQL与Flink Kafka连接器将数据写入kafka的消息队列

示例环境

java.version: 1.8.xflink.version: 1.11.1kafka:2.11

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系例 之 TableAPI & SQL 与 示例模块

InsertToKafka.java

package com.flink.examples.kafka;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.StatementSet;import org.apache.flink.table.api.TableResult;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @Description 使用Tbale&SQL与Flink Elasticsearch连接器将数据写入kafka的消息队列 */public class InsertToKafka {    /**        官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html        format:用于反序列化和序列化Kafka消息的格式。支持的格式包括'csv','json','avro','debezium-json'和'canal-json'。     */    static String table_sql = "CREATE TABLE KafkaTable (\n" +            "  `user_id` BIGINT,\n" +            "  `item_id` BIGINT,\n" +            "  `behavior` STRING,\n" +            "  `ts` TIMESTAMP(3)\n" +            ") WITH (\n" +            "  'connector' = 'kafka',\n" +            "  'topic' = 'user_behavior',\n" +            "  'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +            "  'properties.group.id' = 'testGroup',\n" +            "  'scan.startup.mode' = 'earliest-offset',\n" +            "  'format' = 'json'\n" +            ")";    public static void main(String[] args) throws Exception {        //构建StreamExecutionEnvironment        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //默认流时间方式        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);        //构建EnvironmentSettings 并指定Blink Planner        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();        //构建StreamTableEnvironment        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);        //注册kafka数据维表        tEnv.executeSql(table_sql);        //时间格式处理,参考阿里文档        //https://www.alibabacloud.com/help/zh/faq-detail/64813.htm?spm=a2c63.q38357.a3.3.697c13523NZiIN        String sql = "insert into KafkaTable (user_id,item_id,behavior,ts) values(1,1,'normal', TO_TIMESTAMP(FROM_UNIXTIME( " + System.currentTimeMillis() + " / 1000, 'yyyy-MM-dd HH:mm:ss')))";        // 第一种方式:直接执行sql//        TableResult tableResult = tEnv.executeSql(sql);        //第二种方式:声明一个操作集合来执行sql        StatementSet stmtSet = tEnv.createStatementSet();        stmtSet.addInsertSql(sql);        TableResult tableResult = stmtSet.execute();        tableResult.print();    }}

打印结果

+-------------------------------------------+| default_catalog.default_database.my_users |+-------------------------------------------+|                                        -1 |+-------------------------------------------+1 row in set

上述就是小编为大家分享的Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

消息 数据 示例 方式 格式 内容 序列 时间 模块 环境 连接器 队列 分析 参考 专业 中小 内容丰富 官方 就是 数据源 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 oracle数据库表优化碎片 怎么销售网络安全设备 华为服务器外包装不干胶模板 更新数据库表sql语句 江苏通用软件开发定做价格 网络安全检查资质 服务器机房为什么开空调 成都软件开发人月单价标准 魔兽世界最好的服务器 软件开发工程师多大年龄 浙江通讯软件开发服务有哪些 书店数据库管理系统毕业设计 地税系统网络安全知识 计算机网络安全的目的是什么 树莓派服务器管理员密码 数据库asp怎么运行环境 武隆区工商软件开发流程参考价 商城类软件开发好处 央视网络安全比赛叫什么名 把网络安全摆在网络建设的首位 维斯塔斯网络安全事件 迅游科技跟网络安全有关系吗 863软件开发公司 服务器自动化工作 成都企业软件开发价位 网络安全保障培训 提词器大师怎么总是服务器繁忙 网络安全技术学习薪水 数据库表权限有哪些 滨州市网络安全委员
0