千家信息网

Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取

发表于:2024-11-20 作者:千家信息网编辑
千家信息网最后更新 2024年11月20日,这篇文章将为大家详细讲解有关Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。使用
千家信息网最后更新 2024年11月20日Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取

这篇文章将为大家详细讲解有关Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据

示例环境

java.version: 1.8.xflink.version: 1.11.1kafka:2.11

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系例 之 TableAPI & SQL 与 示例模块

SelectToKafka.java

package com.flink.examples.kafka;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;/** * @Description 使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据 */public class SelectToKafka {    /**     官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html     开始偏移位置     config选项scan.startup.mode指定Kafka使用者的启动模式。有效的枚举是:         group-offsets:从特定消费者组的ZK / Kafka经纪人中的承诺抵消开始。         earliest-offset:从最早的偏移量开始。         latest-offset:从最新的偏移量开始。         timestamp:从每个分区的用户提供的时间戳开始。         specific-offsets:从每个分区的用户提供的特定偏移量开始。     默认选项值group-offsets表示从ZK / Kafka经纪人中最后提交的偏移量消费     一致性保证     sink.semantic选项来选择三种不同的操作模式:         NONE:Flink不能保证任何事情。产生的记录可能会丢失或可以重复。         AT_LEAST_ONCE (默认设置):这样可以确保不会丢失任何记录(尽管它们可以重复)。         EXACTLY_ONCE:Kafka事务将用于提供一次精确的语义。每当您使用事务写入Kafka时,请不要忘记为使用Kafka记录的任何应用程序设置所需的设置isolation.level(read_committed 或read_uncommitted-后者是默认值)。     */    static String table_sql = "CREATE TABLE KafkaTable (\n" +            "  `user_id` BIGINT,\n" +            "  `item_id` BIGINT,\n" +            "  `behavior` STRING,\n" +            "  `ts` TIMESTAMP(3)\n" +            ") WITH (\n" +            "  'connector' = 'kafka',\n" +            "  'topic' = 'user_behavior',\n" +            "  'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +            "  'properties.group.id' = 'testGroup',\n" +            "  'scan.startup.mode' = 'earliest-offset',\n" +            "  'format' = 'json'\n" +            ")";    public static void main(String[] args) throws Exception {        //构建StreamExecutionEnvironment        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //默认流时间方式        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);        //构建EnvironmentSettings 并指定Blink Planner        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();        //构建StreamTableEnvironment        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);        //注册kafka数据维表        tEnv.executeSql(table_sql);        String sql = "select user_id,item_id,behavior,ts from KafkaTable";        Table table = tEnv.sqlQuery(sql);        //打印字段结构        table.printSchema();        //table 转成 dataStream 流        DataStream behaviorStream = tEnv.toAppendStream(table, Row.class);        behaviorStream.print();        env.execute();    }}

打印结果

root |-- user_id: BIGINT |-- item_id: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3)3> 1,1,normal,2021-01-26T10:25:44

关于Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

数据 偏移 消息 示例 事务 内容 文章 时间 更多 模块 模式 环境 用户 知识 篇文章 经纪人 连接器 队列 保证 参考 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 服务器扩容会出现什么情况 海康威视备份管理服务器 供应链要学数据库吗 坚决维护网络安全发声亮剑 无法连接数据库怎么回事 网络安全市场发展前景 战地5如何寻找服务器 计算机网络安全绘画软件 前端设备如何跟服务器时间同步 江门免费的冷库软件开发 梁溪区海航软件开发预算 c 导入数据库 奉贤区正规软件开发项目信息 android图片数据库 云计算与网络安全运维哪个好 联通和移动共用一个数据库吗 网络安全股票属于哪个版块 如何获取数据库中最大数值 两台交换机一台双网卡服务器 宝山区正规软件开发售后保障 网络安全周报在线阅读 下村勉网络安全 互联网金融的新科技 江门免费的冷库软件开发 行业前10的app软件开发 网络安全专业薪资水平 手游的数据库 查违章交管服务器繁忙什么意思 可以用什么软件开发订单系统 text的内容怎么保存到数据库
0