Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取
发表于:2024-10-21 作者:千家信息网编辑
千家信息网最后更新 2024年10月21日,这篇文章将为大家详细讲解有关Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。使用
千家信息网最后更新 2024年10月21日Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取
这篇文章将为大家详细讲解有关Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
SelectToKafka.java
package com.flink.examples.kafka;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;/** * @Description 使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据 */public class SelectToKafka { /** 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html 开始偏移位置 config选项scan.startup.mode指定Kafka使用者的启动模式。有效的枚举是: group-offsets:从特定消费者组的ZK / Kafka经纪人中的承诺抵消开始。 earliest-offset:从最早的偏移量开始。 latest-offset:从最新的偏移量开始。 timestamp:从每个分区的用户提供的时间戳开始。 specific-offsets:从每个分区的用户提供的特定偏移量开始。 默认选项值group-offsets表示从ZK / Kafka经纪人中最后提交的偏移量消费 一致性保证 sink.semantic选项来选择三种不同的操作模式: NONE:Flink不能保证任何事情。产生的记录可能会丢失或可以重复。 AT_LEAST_ONCE (默认设置):这样可以确保不会丢失任何记录(尽管它们可以重复)。 EXACTLY_ONCE:Kafka事务将用于提供一次精确的语义。每当您使用事务写入Kafka时,请不要忘记为使用Kafka记录的任何应用程序设置所需的设置isolation.level(read_committed 或read_uncommitted-后者是默认值)。 */ static String table_sql = "CREATE TABLE KafkaTable (\n" + " `user_id` BIGINT,\n" + " `item_id` BIGINT,\n" + " `behavior` STRING,\n" + " `ts` TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'user_behavior',\n" + " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'format' = 'json'\n" + ")"; public static void main(String[] args) throws Exception { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //默认流时间方式 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册kafka数据维表 tEnv.executeSql(table_sql); String sql = "select user_id,item_id,behavior,ts from KafkaTable"; Table table = tEnv.sqlQuery(sql); //打印字段结构 table.printSchema(); //table 转成 dataStream 流 DataStreambehaviorStream = tEnv.toAppendStream(table, Row.class); behaviorStream.print(); env.execute(); }}
打印结果
root |-- user_id: BIGINT |-- item_id: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3)3> 1,1,normal,2021-01-26T10:25:44
关于Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
数据
偏移
消息
示例
事务
内容
文章
时间
更多
模块
模式
环境
用户
知识
篇文章
经纪人
连接器
队列
保证
参考
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
海陵区个性化网络技术哪家好
传记文学 数据库 个人版
网络安全漏洞的危害献文
关于rsasdns服务器配置
期货交易软件开发代码
福州雅好网络技术服务部
网络安全销售顾问赚钱
软件开发培训照片
镇江做开软件开发
新西兰网络安全专硕课程
图文数据库免费
游戏网站建设软件开发
网络安全问题的新闻
火妖服务器人口
数据库工程师知乎
河南大学网络安全比赛
网络安全挑战赛赛程
日本最大的网络安全公司
关于医院的数据库
2018中国数据库大会
网络安全考试大题
国家对网络安全法律法规
linux 视频 服务器
原图网络安全手抄报
湖北网络技术标准
数学课程思政数据库
琼馨网络技术服务部
计算机软件开发好找工作吗
查询数据库版本语句
数据库 问号