Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析
发表于:2025-01-27 作者:千家信息网编辑
千家信息网最后更新 2025年01月27日,这篇文章将为大家详细讲解有关Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Flink 1
千家信息网最后更新 2025年01月27日Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析Flink 1.10 与 1.9 相比又是个创新版本,在我们感兴趣的很多方面都有改进,特别是 Flink SQL。本文用根据埋点日志计算 PV、UV 的简单示例来体验 Flink 1.10 的两个重要新特性:
这两点将会为我们构建实时数仓提供很大的便利。
示例采用 Hive 版本为 1.1.0,Kafka 版本为 0.11.0.2。
要使 Flink 与 Hive 集成以使用 HiveCatalog,需要先将以下 JAR 包放在 ${FLINK_HOME}/lib 目录下。
后三个 JAR 包都是 Hive 自带的,可以在 ${HIVE_HOME}/lib 目录下找到。前两个可以通过 阿里云 Maven 搜索 GAV 找到并手动下载(groupId 都是org.apache.flink)。
再在 pom.xml 内添加相关的 Maven 依赖。
最后,找到 Hive 的配置文件 hive-site.xml,准备工作就完成了。
不多废话了,直接上代码,简洁易懂。
我们的埋点日志存储在指定的 Kafka topic 里,为 JSON 格式,简化版 schema 大致如下。
其中 ts 字段就是埋点事件的时间戳(毫秒)。在 Flink 1.9 时代,用 CREATE TABLE 语句创建流表时是无法指定事件时间的,只能默认用处理时间。而在 Flink 1.10 下,可以这样写。
Flink SQL 引入了计算列(computed column)的概念,其语法为 column_name AS computed_column_expression,它的作用是在表中产生数据源 schema 不存在的列,并且可以利用原有的列、各种运算符及内置函数。比如在以上 SQL 语句中,就利用内置的 PROCTIME() 函数生成了处理时间列,并利用原有的 ts 字段与 FROM_UNIXTIME()、TO_TIMESTAMP() 两个时间转换函数生成了事件时间列。
为什么 ts 字段不能直接用作事件时间呢?因为 Flink SQL 规定时间特征必须是 TIMESTAMP(3) 类型,即形如"yyyy-MM-ddTHH:mm:ssZ"格式的字符串,Unix 时间戳自然是不行的,所以要先转换一波。
既然有了事件时间,那么自然要有水印。Flink SQL 引入了 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 的语法来产生水印,有以下两种通用的做法:
上文的 SQL 语句中就是设定了 10 秒的乱序区间。如果看官对水印、AscendingTimestampExtractor 和 BoundedOutOfOrdernessTimestampExtractor 不熟的话,可以参见之前的 这篇 ,就能理解为什么会是这样的语法了。
下面来正式建表。
执行完毕后,我们还可以去到 Hive 执行 DESCRIBE FORMATTED ods.streaming_user_active_log 语句,能够发现该表并没有事实上的列,而所有属性(包括 schema、connector、format 等等)都作为元数据记录在了 Hive Metastore 中。
Flink SQL 创建的表都会带有一个标记属性 is_generic=true,图中未示出。
用30秒的滚动窗口,按事件类型来分组,查询语句如下。
关于窗口在 SQL 里的表达方式请参见 官方文档 。1.10 版本 SQL 的官方文档写的还是比较可以的。 懒得再输出到一个结果表了,直接转换成流打到屏幕上。
这篇文章将为大家详细讲解有关Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
一是 SQL DDL 对事件时间的支持; 二是 Hive Metastore 作为 Flink 的元数据存储(即 HiveCatalog)。
添加依赖项
flink-connector-hive_2.11-1.10.0.jar flink-shaded-hadoop-2-uber-2.6.5-8.0.jar hive-metastore-1.1.0.jar hive-exec-1.1.0.jar libfb303-0.9.2.jar
Maven 下载:
https://maven.aliyun.com/mvn/search
2.11
1.10.0
1.1.0
org.apache.flink
flink-table-api-scala_${scala.bin.version}
${flink.version}
org.apache.flink
flink-table-api-scala-bridge_${scala.bin.version}
${flink.version}
org.apache.flink
flink-table-planner-blink_${scala.bin.version}
${flink.version}
org.apache.flink
flink-sql-connector-kafka-0.11_${scala.bin.version}
${flink.version}
org.apache.flink
flink-connector-hive_${scala.bin.version}
${flink.version}
org.apache.flink
flink-json
${flink.version}
org.apache.hive
hive-exec
${hive.version}
注册 HiveCatalog、创建数据库
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(5)
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
val catalog = new HiveCatalog(
"rtdw", // catalog name
"default", // default database
"/Users/lmagic/develop", // Hive config (hive-site.xml) directory
"1.1.0" // Hive version
)
tableEnv.registerCatalog("rtdw", catalog)
tableEnv.useCatalog("rtdw")
val createDbSql = "CREATE DATABASE IF NOT EXISTS rtdw.ods"
tableEnv.sqlUpdate(createDbSql)
创建 Kafka 流表并指定事件时间
"eventType": "clickBuyNow", "userId": "97470180", "shareUserId": "", "platform": "xyz", "columnType": "merchDetail", "merchandiseId": "12727495", "fromType": "wxapp", "siteId": "20392", "categoryId": "", "ts": 1585136092541
CREATE TABLE rtdw.ods.streaming_user_active_log ( eventType STRING COMMENT '...', userId STRING, shareUserId STRING, platform STRING, columnType STRING, merchandiseId STRING, fromType STRING, siteId STRING, categoryId STRING, ts BIGINT, procTime AS PROCTIME(), -- 处理时间 eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间 WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- 水印) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'ng_log_par_extracted', 'connector.startup-mode' = 'latest-offset', -- 指定起始offset位置 'connector.properties.zookeeper.connect' = 'zk109:2181,zk110:2181,zk111:2181', 'connector.properties.bootstrap.servers' = 'kafka112:9092,kafka113:9092,kafka114:9092', 'connector.properties.group.id' = 'rtdw_group_test_1', 'format.type' = 'json', 'format.derive-schema' = 'true', -- 由表schema自动推导解析JSON 'update-mode' = 'append')
单调不减水印(对应 DataStream API 的 AscendingTimestampExtractor)
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
有界乱序水印(对应 DataStream API 的 BoundedOutOfOrdernessTimestampExtractor)
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'n' TIME_UNIT
https://www.jianshu.com/p/c612e95a5028
val createTableSql = """ |上文的SQL语句 |...... """.stripMargin tableEnv.sqlUpdate(createTableSql)
开窗计算 PV、UV
SELECT eventType,TUMBLE_START(eventTime, INTERVAL '30' SECOND) AS windowStart,TUMBLE_END(eventTime, INTERVAL '30' SECOND) AS windowEnd,COUNT(userId) AS pv,COUNT(DISTINCT userId) AS uvFROM rtdw.ods.streaming_user_active_logWHERE platform = 'xyz'GROUP BY eventType, TUMBLE(eventTime, INTERVAL '30' SECOND)
SQL 文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows
val queryActiveSql =
"""
|......
|......
""".stripMargin
val result = tableEnv.sqlQuery(queryActiveSql)
result
.toAppendStream[Row]
.print()
.setParallelism(1)
关于"Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
时间
事件
水印
语句
示例
数据
版本
两个
函数
字段
文档
篇文章
语法
处理
分析
整合
上文
官方
就是
属性
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
会计人员网络安全
上海云鑫网络技术有限公司
网宿科技 互联网百强
数据库错误18204
数据库表之间一定要有外键吗
武汉达梦数据库怎么样
软件开发市场的危险
分布式数据库系统于戈版二手
东莞网络安全培训机构
即时通讯软件开发定制
ecnu公共数据库登录
了解软件开发模型
职高高考网络技术
郑州 手机软件开发
传奇db数据库导出
丰台区进口软件开发推荐
朱巍家庭网络安全教育美篇
拒绝攻击服务器是如何实现的
gp数据库truncate
电脑用什么快捷键查服务器
免费邮件服务器下载
mysql中数据库表设计
江苏数据库空投箱销售价格
数据库 as in
数据库名称 数据库用户
腾讯轻型服务器部署vue
sql数据库安装要多久
数据库汇总啥意思
陪护床共享软件开发
汉川网络安全宣传周