Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,这篇文章将为大家详细讲解有关Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。使用
千家信息网最后更新 2025年01月24日Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取
这篇文章将为大家详细讲解有关Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
SelectToKafka.java
package com.flink.examples.kafka;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;/** * @Description 使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据 */public class SelectToKafka { /** 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html 开始偏移位置 config选项scan.startup.mode指定Kafka使用者的启动模式。有效的枚举是: group-offsets:从特定消费者组的ZK / Kafka经纪人中的承诺抵消开始。 earliest-offset:从最早的偏移量开始。 latest-offset:从最新的偏移量开始。 timestamp:从每个分区的用户提供的时间戳开始。 specific-offsets:从每个分区的用户提供的特定偏移量开始。 默认选项值group-offsets表示从ZK / Kafka经纪人中最后提交的偏移量消费 一致性保证 sink.semantic选项来选择三种不同的操作模式: NONE:Flink不能保证任何事情。产生的记录可能会丢失或可以重复。 AT_LEAST_ONCE (默认设置):这样可以确保不会丢失任何记录(尽管它们可以重复)。 EXACTLY_ONCE:Kafka事务将用于提供一次精确的语义。每当您使用事务写入Kafka时,请不要忘记为使用Kafka记录的任何应用程序设置所需的设置isolation.level(read_committed 或read_uncommitted-后者是默认值)。 */ static String table_sql = "CREATE TABLE KafkaTable (\n" + " `user_id` BIGINT,\n" + " `item_id` BIGINT,\n" + " `behavior` STRING,\n" + " `ts` TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'user_behavior',\n" + " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'format' = 'json'\n" + ")"; public static void main(String[] args) throws Exception { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //默认流时间方式 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册kafka数据维表 tEnv.executeSql(table_sql); String sql = "select user_id,item_id,behavior,ts from KafkaTable"; Table table = tEnv.sqlQuery(sql); //打印字段结构 table.printSchema(); //table 转成 dataStream 流 DataStreambehaviorStream = tEnv.toAppendStream(table, Row.class); behaviorStream.print(); env.execute(); }}
打印结果
root |-- user_id: BIGINT |-- item_id: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3)3> 1,1,normal,2021-01-26T10:25:44
关于Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
数据
偏移
消息
示例
事务
内容
文章
时间
更多
模块
模式
环境
用户
知识
篇文章
经纪人
连接器
队列
保证
参考
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
loki 日志 服务器性能
服务器怎么改成显示器
表格如何复制筛选过的数据库
深圳运维服务软件开发价位
获取canvas数据库
制作数据库分页
库存订单数据库设计
数据库两个表查找
下花园区网络安全宣传
夜饮网络技术学院论坛照片
软件开发女孩适合哪个
服务器空间划分管理方法
围棋复盘软件开发
招聘 数据库工程师
网络安全设计要考虑的需求
电脑网络安全检查方案
安全服务器网络
关于网络安全的毕业ppt
四川泸州软件开发学院
网络安全线上知识问答活动
支付宝网络技术有限公司海南
mcu独立的综合管理服务器
数据库中的页是啥
开封网络安全系统厂家
戴尔550瓦服务器电源
摄像机ntp管理服务器
jdbc是数据库驱动
校园网络安全大讲堂观后感
神州通用数据库使用手册
数据库营业收入