Flink 1.11中流批一体Hive数仓的示例分析
这篇文章主要介绍Flink 1.11中流批一体Hive数仓的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
数仓架构
离线数仓
传统的离线数仓是由 Hive 加上 HDFS 的方案,Hive 数仓有着成熟和稳定的大数据分析能力,结合调度和上下游工具,构建一个完整的数据处理分析平台,流程如下:
Flume 把数据导入 Hive 数仓
调度工具,调度 ETL 作业进行数据处理
在 Hive 数仓的表上,可以进行灵活的 Ad-hoc 查询
调度工具,调度聚合作业输出到BI层的数据库中
这个流程下的问题是:
导入过程不够灵活,这应该是一个灵活 SQL 流计算的过程
基于调度作业的级联计算,实时性太差
ETL 不能有流式的增量计算
实时数仓
针对离线数仓的特点,随着实时计算的流行,越来越多的公司引入实时数仓,实时数仓基于 Kafka + Flink streaming,定义全流程的流计算作业,有着秒级甚至毫秒的实时性。
但是,实时数仓的一个问题是历史数据只有 3-15 天,无法在其上做 Ad-hoc 的查询。如果搭建 Lambda 的离线+实时的架构,维护成本、计算存储成本、一致性保证、重复的开发会带来很大的负担。
Hive 实时化
Hive streaming sink
带来 Flink streaming 的实时/准实时的能力 支持 Filesystem connector 的全部 formats(csv,json,avro,parquet,orc) 支持 Hive table 的所有 formats 继承 Datastream StreamingFileSink 的所有特性:Exactly-once、支持HDFS, S3。
Trigger:控制Partition提交的时机,可以根据Watermark加上从Partition中提取的时间来判断,也可以通过Processing time来判断。你可以控制:是想先尽快看到没写完的Partition;还是保证写完Partition之后,再让下游看到它。 Policy:提交策略,内置支持SUCCESS文件和Metastore的提交,你也可以扩展提交的实现,比如在提交阶段触发Hive的analysis来生成统计信息,或者进行小文件的合并等等。
-- 结合Hive dialect使用Hive DDL语法SET table.sql-dialect=hive;CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE) PARTITIONED BY ( dt STRING, hour STRING) STORED AS PARQUET TBLPROPERTIES ( -- 使用partition中抽取时间,加上watermark决定partiton commit的时机 'sink.partition-commit.trigger'='partition-time', -- 配置hour级别的partition时间抽取策略,这个例子中dt字段是yyyy-MM-dd格式的天,hour是0-23的小时,timestamp-pattern定义了如何从这两个partition字段推出完整的timestamp 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', -- 配置dalay为小时级,当 watermark > partition时间 + 1小时,会commit这个partition 'sink.partition-commit.delay'='1 h', -- partitiion commit的策略是:先更新metastore(addPartition),再写SUCCESS文件 'sink.partition-commit.policy.kind'='metastore,success-file') SET table.sql-dialect=default;CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND) -- 可以结合Table Hints动态指定table properties [3]INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
Hive streaming source
实时性不强,往往调度最小是小时级。 流程复杂,组件多,容易出现问题。
Partition 表,监控 Partition 的生成,增量读取新的 Partition。 非 Partition 表,监控文件夹内新文件的生成,增量读取新的文件。
SELECT * FROM hive_table/*+ OPTIONS('streaming-source.enable'='true','streaming-source.consume-start-offset'='2020-05-20') */;
实时数据关联 Hive 表
SELECT o.amout, o.currency, r.rate, o.amount * r.rateFROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency
Hive 增强
Hive Dialect 语法兼容
Flink 1.10 中进一步完善了 DDL,但由于 Flink 与 Hive 在元数据语义上的差异,通过 Flink DDL 来操作 Hive 元数据的可用性比较差,仅能覆盖很少的应用场景。 使用 Flink 对接 Hive 的用户经常需要切换到 Hive CLI 来执行 DDL。
Hive Dialect 只能用于操作 Hive 表,而不是 Flink 原生的表(如 Kafka、ES 的表),这也意味着 Hive Dialect 需要配合 HiveCatalog 使用。 使用 Hive Dialect 时,原有的 Flink 的一些语法可能会无法使用(例如 Flink 定义的类型别名),在需要使用 Flink 语法时可以动态切换到默认的 Dialect。 Hive Dialect 的 DDL 语法定义基于 Hive 的官方文档,而不同 Hive 版本之间语法可能会有轻微的差异,需要用户进行一定的调整。 Hive Dialect 的语法实现基于 Calcite,而 Calcite 与 Hive 有不同的保留关键字。因此,某些在 Hive 中可以直接作为标识符的关键字(如 "default" ),在Hive Dialect 中可能需要用"`"进行转义。
向量化读取
ORC for Hive 1.x [8] Parquet for Hive 1,2,3 [9]
简化 Hive 依赖
flink-sql-connector-hive-1.2.2_2.11-1.11.jar:Hive 1 的依赖版本。 flink-sql-connector-hive-2.2.0_2.11-1.11.jar:Hive 2.0 - 2.2 的依赖版本。 flink-sql-connector-hive-2.3.6_2.11-1.11.jar:Hive 2.3 的依赖版本。 flink-sql-connector-hive-3.1.2_2.11-1.11.jar:Hive 3 的依赖版本。
Flink 增强
Flink Filesystem connector
结合 Partition,现在,Filesystem connector 支持 SQL 中 Partition 的所有语义,支持 Partition 的 DDL,支持 Partition Pruning,支持静态/动态 Partition 的插入,支持 overwrite 的插入。 支持各种 Formats: CSV JSON Aparch AVRO Apache Parquet Apache ORC. 支持 Batch 的读写。 支持 Streaming sink,也支持上述 Hive 支持的 Partition commit,支持写Success 文件。
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
hour STRING
) PARTITIONED BY (dt, hour) WITH (
'connector'='filesystem',
'path'='...',
'format'='parquet',
'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='success-file')
)
-- stream environment or batch environment
INSERT INTO TABLE fs_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
-- 通过 Partition 查询
SELECT * FROM fs_table WHERE dt='2020-05-20' and hour='12';
引入 Max Slot
slotmanager.number-of-slots.max
以上是"Flink 1.11中流批一体Hive数仓的示例分析"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!