千家信息网

flink中如何使用sql将流式数据写入hive

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,这篇文章将为大家详细讲解有关flink中如何使用sql将流式数据写入hive,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。修改hive配置上一篇介绍了
千家信息网最后更新 2025年01月31日flink中如何使用sql将流式数据写入hive

这篇文章将为大家详细讲解有关flink中如何使用sql将流式数据写入hive,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

修改hive配置

上一篇介绍了使用sql将流式数据写入文件系统,这次我们来介绍下使用sql将文件写入hive,对于如果想写入已经存在的hive表,则至少需要添加以下两个属性. 写入hive底层还是和写入文件系统一样的,所以对于其他具体的配置参考上一篇.

alter table table_name set TBLPROPERTIES ('is_generic'='false'); 

alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore');


//如果想使用eventtime分区
alter table table_name set TBLPROPERTIES ('sink.partition-commit.trigger'='partition-time');

案例讲解

下面我们讲解一下,如何使用java程序来构建一个flink程序来写入hive。

引入相关的pom

      
org.apache.flink
flink-connector-hive_${scala.binary.version}
${flink.version}


org.apache.hive
hive-exec
3.1.2

构造hive catalog

  //构造hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
String version = "3.1.2";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("db1");

创建hive表

如果目前系统中没有存在相应的hive表,可以通过在程序中执行相应的DDL建表语句来建表,如果已经存在了,就把这段代码省略,使用上面的hive命令修改现有表,添加相应的属性。

CREATE EXTERNAL TABLE `fs_table`(
`user_id` string,
`order_amount` double)
PARTITIONED BY (
`dt` string,
`h` string,
`m` string)
stored as ORC
TBLPROPERTIES (
'sink.partition-commit.policy.kind'='metastore',
'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)

将流数据插入hive,

 String insertSql = "insert into  fs_table SELECT userId, amount, " +
" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
tEnv.executeSql(insertSql);

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/sql/StreamingWriteHive.java

遇到的坑

问题详解

对于如上的程序和sql,如果配置了是使用eventtime,在此程序中配置了'sink.partition-commit.trigger'='partition-time',最后发现程序没法提交分区。

分析了一下源码,问题是出在了这个方法,org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions。先贴上代码:


@Override
public List committablePartitions(long checkpointId) {
if (!watermarks.containsKey(checkpointId)) {
throw new IllegalArgumentException(String.format(
"Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
checkpointId, watermarks));
}

long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();

List needCommit = new ArrayList<>();
Iterator iter = pendingPartitions.iterator();
while (iter.hasNext()) {
String partition = iter.next();
//通过分区的值抽取分区的时间.
LocalDateTime partTime = extractor.extract(
partitionKeys, extractPartitionValues(new Path(partition)));
//判断水印是否大于分区创建时间+延迟时间
if (watermark > toMills(partTime) + commitDelay) {
needCommit.add(partition);
iter.remove();
}
}
return needCommit;
}

系统通过分区值来抽取相应的分区创建时间,然后进行比对,比如我们设置的pattern是 h:$m:00 , 某一时刻我们正在往 /2020-07-06/18/20/ 这个分区下写数据,那么程序根据分区值,得到的pattern将会是2020-07-06 18:20:00,这个值在sql中是根据DATA_FORMAT函数获取的。

这个值是带有时区的, 也是我想要的, 比如我们的时区设置为东八区,2020-07-06 18:20:00这个时间是东八区的时间,换成标准UTC时间是减去八个小时,也就是2020-07-06 10:20:00,而源码中的toMills函数在处理这个东八区的时间时,并没有任何加入任何时区的处理,把这个其实应该是东八区的时间当做了UTC时间来处理,这样计算出来的值就比实际值大8小时,导致一直没有触发分区的提交。

如果我们在数据源构造的分区是UTC时间,也就是不带分区的时间,那么这个逻辑就是没有问题的,但是这样又不符合我们的实际情况,比如对于分区2020-07-06 18:20:00,我希望我的分区肯定是东八区的时间,而不是比东八区小8个小时的UTC时间2020-07-06 10:20:00。

所以针对上述情况,有两种解决方案,一种是自定义一个分区抽取类,第二,就是修改源码,改一下现在的缺省的时间分区抽取类。我个人认为修改一下缺省类更好理解,因为目前写入文件和hive这块配置和概念有点多,我不想太增加过多的配置来增加用户的难度,应该尽可能的用缺省值就能使程序很好的运行。

我们看下flink中的StreamingFileSink类,构造分区桶的时候默认是使用的DateTimeBucketAssigner,其构造分区路径就是带有时区概念的,默认就用的是本地时区。

public DateTimeBucketAssigner(String formatString) {
this(formatString, ZoneId.systemDefault());
}

修改方案

这个问题,也不知道算不算一个bug,我给官方提交了一个ISSUE,但是官方没有采纳,不过我觉得不符合我的习惯,所以我对这个功能进行了修改,让partition.time-extractor.timestamp-pattern提取的partiiton是带有时区的,默认情况下是本地时区。如果是非本地时区,可以指定时区,通过参数partition.time-extractor.time-zone来指定,我们可以通下面的代码获取有效的时区。

 Set zoneIds = ZoneId.getAvailableZoneIds();
zoneIds.stream().forEach(System.out::println);

比如我们东八区默认使用 Asia/Shanghai。

我基于社区的flink的tag release-1.11.0-rc4,我改了一下代码 将代码放到了github上。

关于flink中如何使用sql将流式数据写入hive就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

时间 时区 程序 八区 数据 配置 代码 文件 系统 问题 抽取 小时 就是 情况 源码 参考 处理 也就是 内容 函数 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全主题班会内容策划 许昌关山月网络技术有限公司 网络安全的类型 汕头微信软件开发订制 网游服务器管理员英文 广州楚越互联网络科技有限公司 mysql 数据库初始化 东营游戏软件开发公司有哪些 哪些东西不使用数据库 数据库应用系统的设计代码 宜章软件开发工程师学费 饥荒联机的服务器也是保存在本地 苏州迪信通网络技术有限公司 g3管理服务器不能启动 上海工业软件开发大概要多少钱 数据库连接怎么用java连起来 中国农行软件开发公司招聘 水务行业网络安全解决方案价格 北京同心互联网科技有限公司 清楚sql表中所有数据库 数据库原理下载 网络安全课工作总结 江西三套网络安全知识回放 怎么搭建一个收费系统服务器 甘肃科研网络技术有限公司 东营游戏软件开发公司有哪些 linux操作系统操作数据库 海州区聚思迅网络技术 软件开发的周期控制 阿里巴巴的网络技术标准
0