Flink SQL如何连接Hive并写入/读取数据
这篇文章主要介绍Flink SQL如何连接Hive并写入/读取数据,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
1. 添加依赖
1.11.2 2.11 org.apache.flink flink-streaming-scala_${scala.version} ${flink.version} org.apache.flink flink-connector-kafka-0.11_${scala.version} ${flink.version} org.apache.flink flink-clients_${scala.version} ${flink.version} org.apache.flink flink-table-api-java-bridge_${scala.version} ${flink.version} org.apache.flink flink-table-planner-blink_${scala.version} ${flink.version} org.apache.flink flink-connector-hive_${scala.version} ${flink.version} org.apache.hive hive-exec 2.1.1 org.apache.flink flink-shaded-hadoop-2-uber 2.6.5-7.0 org.apache.flink flink-json ${flink.version} org.apache.flink flink-connector-elasticsearch7_${scala.version} ${flink.version} org.apache.flink flink-csv ${flink.version} com.fasterxml.jackson.core jackson-databind 2.4.0 com.fasterxml.jackson.core jackson-annotations 2.4.0 com.fasterxml.jackson.core jackson-core 2.4.0
2. 创建blink版本的批处理Table执行环境
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build();TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
经过实际测试,目前HiveTableSink 不支持流式写入(未实现 AppendStreamTableSink),必须是批处理环境才可以往hive里面写入数据,而不能将流式数据写入hive。例如将kafka创建一张临时表,然后将表中的数据流持续插入hive,这是不可以的,官网上1.11版本通过flink sql-client可以实现hive的流式写入,还有待验证。
3. 连接文件系统,创建hive catalog,对表进行操作,类似于Spark on Hive,flink可以直接获取Hive的元数据,并使用flink进行计算。
// 连接外部文件 bbTableEnv.connect(new FileSystem().path("file:///E:/d.txt")) .withFormat(new Csv().fieldDelimiter(',')) .withSchema(new Schema().field("id", DataTypes.STRING())) .createTemporaryTable("output"); // 设置 hive 方言 bbTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); // 获取hive-site.xml目录 String hiveConfDir = Thread.currentThread().getContextClassLoader().getResource("").getPath().substring(1); HiveCatalog hive = new HiveCatalog("hive", "warningplatform", hiveConfDir); bbTableEnv.registerCatalog("hive", hive); bbTableEnv.useCatalog("hive"); bbTableEnv.useDatabase("warningplatform"); bbTableEnv.executeSql("insert into test select id from default_catalog.default_database.output");
通过bbTableEnv.connect()去创建临时表的方式已经过时了,建议使用bbTableEnv.executeSql()的方式,通过DDL去创建临时表,临时表到底是属于哪一个catalog目前还不太确定,到底是什么规则目前还不清楚。 查资料得知,临时表与单个Flink会话的生命周期相关,临时表始终存储在内存中。 永久表需要一个catalog来管理表对应的元数据,比如hive metastore,该表将一直存在,直到明确删除该表为止。 因此猜测:default_catalog是存储在内存中,如果在切换成hive catalog之前创建临时表,那我们就可以使用default_catalog.default_database.tableName来获取这个临时表。 如果切换了catalog再去创建临时表,那我们就无法获取到临时表了,因为它不在default_catalog中,而且保存在内存里面,直接查询临时表会去当前的catalog里面去查找临时表,因此一定要在default_catalog 里面创建临时表。 而临时视图好像是存储在当前的catalog里面
通过bbTableEnv.createTemporaryView()创建的视图则是属于当前的database的
bbTableEnv.createTemporaryView("output",bbTableEnv.sqlQuery("select * from default_catalog.default_database.output"));
注意1.11版本的执行sql的方法发生了改变,通过执行环境的executeSql(),executeInsert()等来进行插入或者执行sql语句
以上是"Flink SQL如何连接Hive并写入/读取数据"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!