Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析
发表于:2024-11-11 作者:千家信息网编辑
千家信息网最后更新 2024年11月11日,这篇文章将为大家详细讲解有关Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Flink 1
千家信息网最后更新 2024年11月11日Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析Flink 1.10 与 1.9 相比又是个创新版本,在我们感兴趣的很多方面都有改进,特别是 Flink SQL。本文用根据埋点日志计算 PV、UV 的简单示例来体验 Flink 1.10 的两个重要新特性:
这两点将会为我们构建实时数仓提供很大的便利。
示例采用 Hive 版本为 1.1.0,Kafka 版本为 0.11.0.2。
要使 Flink 与 Hive 集成以使用 HiveCatalog,需要先将以下 JAR 包放在 ${FLINK_HOME}/lib 目录下。
后三个 JAR 包都是 Hive 自带的,可以在 ${HIVE_HOME}/lib 目录下找到。前两个可以通过 阿里云 Maven 搜索 GAV 找到并手动下载(groupId 都是org.apache.flink)。
再在 pom.xml 内添加相关的 Maven 依赖。
最后,找到 Hive 的配置文件 hive-site.xml,准备工作就完成了。
不多废话了,直接上代码,简洁易懂。
我们的埋点日志存储在指定的 Kafka topic 里,为 JSON 格式,简化版 schema 大致如下。
其中 ts 字段就是埋点事件的时间戳(毫秒)。在 Flink 1.9 时代,用 CREATE TABLE 语句创建流表时是无法指定事件时间的,只能默认用处理时间。而在 Flink 1.10 下,可以这样写。
Flink SQL 引入了计算列(computed column)的概念,其语法为 column_name AS computed_column_expression,它的作用是在表中产生数据源 schema 不存在的列,并且可以利用原有的列、各种运算符及内置函数。比如在以上 SQL 语句中,就利用内置的 PROCTIME() 函数生成了处理时间列,并利用原有的 ts 字段与 FROM_UNIXTIME()、TO_TIMESTAMP() 两个时间转换函数生成了事件时间列。
为什么 ts 字段不能直接用作事件时间呢?因为 Flink SQL 规定时间特征必须是 TIMESTAMP(3) 类型,即形如"yyyy-MM-ddTHH:mm:ssZ"格式的字符串,Unix 时间戳自然是不行的,所以要先转换一波。
既然有了事件时间,那么自然要有水印。Flink SQL 引入了 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 的语法来产生水印,有以下两种通用的做法:
上文的 SQL 语句中就是设定了 10 秒的乱序区间。如果看官对水印、AscendingTimestampExtractor 和 BoundedOutOfOrdernessTimestampExtractor 不熟的话,可以参见之前的 这篇 ,就能理解为什么会是这样的语法了。
下面来正式建表。
执行完毕后,我们还可以去到 Hive 执行 DESCRIBE FORMATTED ods.streaming_user_active_log 语句,能够发现该表并没有事实上的列,而所有属性(包括 schema、connector、format 等等)都作为元数据记录在了 Hive Metastore 中。
Flink SQL 创建的表都会带有一个标记属性 is_generic=true,图中未示出。
用30秒的滚动窗口,按事件类型来分组,查询语句如下。
关于窗口在 SQL 里的表达方式请参见 官方文档 。1.10 版本 SQL 的官方文档写的还是比较可以的。 懒得再输出到一个结果表了,直接转换成流打到屏幕上。
这篇文章将为大家详细讲解有关Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
一是 SQL DDL 对事件时间的支持; 二是 Hive Metastore 作为 Flink 的元数据存储(即 HiveCatalog)。
添加依赖项
flink-connector-hive_2.11-1.10.0.jar flink-shaded-hadoop-2-uber-2.6.5-8.0.jar hive-metastore-1.1.0.jar hive-exec-1.1.0.jar libfb303-0.9.2.jar
Maven 下载:
https://maven.aliyun.com/mvn/search
2.11
1.10.0
1.1.0
org.apache.flink
flink-table-api-scala_${scala.bin.version}
${flink.version}
org.apache.flink
flink-table-api-scala-bridge_${scala.bin.version}
${flink.version}
org.apache.flink
flink-table-planner-blink_${scala.bin.version}
${flink.version}
org.apache.flink
flink-sql-connector-kafka-0.11_${scala.bin.version}
${flink.version}
org.apache.flink
flink-connector-hive_${scala.bin.version}
${flink.version}
org.apache.flink
flink-json
${flink.version}
org.apache.hive
hive-exec
${hive.version}
注册 HiveCatalog、创建数据库
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(5)
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
val catalog = new HiveCatalog(
"rtdw", // catalog name
"default", // default database
"/Users/lmagic/develop", // Hive config (hive-site.xml) directory
"1.1.0" // Hive version
)
tableEnv.registerCatalog("rtdw", catalog)
tableEnv.useCatalog("rtdw")
val createDbSql = "CREATE DATABASE IF NOT EXISTS rtdw.ods"
tableEnv.sqlUpdate(createDbSql)
创建 Kafka 流表并指定事件时间
"eventType": "clickBuyNow", "userId": "97470180", "shareUserId": "", "platform": "xyz", "columnType": "merchDetail", "merchandiseId": "12727495", "fromType": "wxapp", "siteId": "20392", "categoryId": "", "ts": 1585136092541
CREATE TABLE rtdw.ods.streaming_user_active_log ( eventType STRING COMMENT '...', userId STRING, shareUserId STRING, platform STRING, columnType STRING, merchandiseId STRING, fromType STRING, siteId STRING, categoryId STRING, ts BIGINT, procTime AS PROCTIME(), -- 处理时间 eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间 WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- 水印) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'ng_log_par_extracted', 'connector.startup-mode' = 'latest-offset', -- 指定起始offset位置 'connector.properties.zookeeper.connect' = 'zk109:2181,zk110:2181,zk111:2181', 'connector.properties.bootstrap.servers' = 'kafka112:9092,kafka113:9092,kafka114:9092', 'connector.properties.group.id' = 'rtdw_group_test_1', 'format.type' = 'json', 'format.derive-schema' = 'true', -- 由表schema自动推导解析JSON 'update-mode' = 'append')
单调不减水印(对应 DataStream API 的 AscendingTimestampExtractor)
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
有界乱序水印(对应 DataStream API 的 BoundedOutOfOrdernessTimestampExtractor)
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'n' TIME_UNIT
https://www.jianshu.com/p/c612e95a5028
val createTableSql = """ |上文的SQL语句 |...... """.stripMargin tableEnv.sqlUpdate(createTableSql)
开窗计算 PV、UV
SELECT eventType,TUMBLE_START(eventTime, INTERVAL '30' SECOND) AS windowStart,TUMBLE_END(eventTime, INTERVAL '30' SECOND) AS windowEnd,COUNT(userId) AS pv,COUNT(DISTINCT userId) AS uvFROM rtdw.ods.streaming_user_active_logWHERE platform = 'xyz'GROUP BY eventType, TUMBLE(eventTime, INTERVAL '30' SECOND)
SQL 文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows
val queryActiveSql =
"""
|......
|......
""".stripMargin
val result = tableEnv.sqlQuery(queryActiveSql)
result
.toAppendStream[Row]
.print()
.setParallelism(1)
关于"Flink 1.10中SQL、HiveCatalog与事件时间整合的示例分析"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
时间
事件
水印
语句
示例
数据
版本
两个
函数
字段
文档
篇文章
语法
处理
分析
整合
上文
官方
就是
属性
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
怎样将数据库导进pycharm
汽车应用市场显示无法连接服务器
东明租房软件开发
深圳市港湾网络技术有限公司
Hana内存数据库原理
电子商务网络安全隐患引用文献
闽盾网络安全有限公司待遇
蜜源植物数据库资料下载
数据库招商
从事网络安全的员工
自动连接频道服务器进不去
游戏软件开发是不是软件工程
万象数据库密码
著名校园网络安全事件
海南信息化软件开发批发价
普通内存和服务器内存
滨州市dna数据库
数据库连接工具开源
报送网络安全信息
软件开发用到的法律知识
h3c服务器管理网页不显示
三级迅雷数据库视频教程
微小数据库
一个单位的网络安全
网络安全在线检测装置
对日软件开发模板
全新一代网络技术
网络安全手抄报该怎么做
秀泰大数据库
挖矿中转服务器怎么样才不会发现