怎么使用Flink TableAPI和SQL /Elasticsearch
发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,这篇文章主要讲解了"怎么使用Flink TableAPI和SQL /Elasticsearch",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么使用F
千家信息网最后更新 2025年02月01日怎么使用Flink TableAPI和SQL /Elasticsearch
这篇文章主要讲解了"怎么使用Flink TableAPI和SQL /Elasticsearch",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么使用Flink TableAPI和SQL /Elasticsearch"吧!
使用Tbale&SQL与Flink Elasticsearch Connector 连接器将数据写入Elasticsearch引擎的索引
示例环境
java.version: 1.8.xflink.version: 1.11.1elasticsearch:6.x
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
InsertToEs.java
package com.flink.examples.elasticsearch;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.StatementSet;import org.apache.flink.table.api.TableResult;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @Description 使用Tbale&SQL与Flink Elasticsearch连接器将数据写入Elasticsearch引擎的索引 */public class InsertToEs { /** * Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。 * 参考官方:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html */ //参见属性配置类:ElasticsearchValidator static String table_sql = "CREATE TABLE my_users (\n" + " user_id STRING,\n" + " user_name STRING,\n" + " uv BIGINT,\n" + " pv BIGINT,\n" + " PRIMARY KEY (user_id) NOT ENFORCED\n" + ") WITH (\n" + " 'connector.type' = 'elasticsearch',\n" + " 'connector.version' = '6',\n" + " 'connector.property-version' = '1', \n" + " 'connector.hosts' = 'http://192.168.110.35:9200',\n" + " 'connector.index' = 'users',\n" + " 'connector.document-type' = 'doc',\n" + " 'format.type' = 'json',\n" + " 'update-mode'='append' -- append|upsert\n" + ")"; public static void main(String[] args) { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //默认流时间方式 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册kafka数据维表 tEnv.executeSql(table_sql); //Elasticsearch connector 目前只支持了 sink,不支持 source 。不能SELECT elasticsearch table,因此只能通过insert的方式提交数据; String sql = "insert into my_users (user_id,user_name,uv,pv) values('10003','tom',31,10)";// TableResult tableResult = tEnv.executeSql(sql); //第二种方式:声明一个操作集合来执行sql StatementSet stmtSet = tEnv.createStatementSet(); stmtSet.addInsertSql(sql); TableResult tableResult = stmtSet.execute(); tableResult.print(); }}
打印结果
+-------------------------------------------+| default_catalog.default_database.my_users |+-------------------------------------------+| -1 |+-------------------------------------------+1 row in set
感谢各位的阅读,以上就是"怎么使用Flink TableAPI和SQL /Elasticsearch"的内容了,经过本文的学习后,相信大家对怎么使用Flink TableAPI和SQL /Elasticsearch这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
示例
方式
学习
内容
引擎
模块
环境
索引
连接器
支持
官方
就是
属性
思路
情况
数据源
文章
时间
更多
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
CAE软件开发公司
软件开发相关文档模板
招聘app软件开发工程师
c语言聊天软件开发
魅族下拉 数据库
软件开发行业成本率
sql数据库安装需要多久
暗黑2服务器制作
互联网科技文创公司行业周期
网络安全主题教学设计
贵公司南京软件开发公司
什么样软件开发最难
浙江个人云服务器虚拟主机
怎么判断开放网络安全吗
诺基亚云呼服务器
pg数据库服务是否正常
lexis数据库怎么注册
网络安全等级保护新政策
数据库 相册表
人为制造数据库故障
厦门市政软件开发
数据库查询范围的执行过程
服务器怎么导入数据库
网络技术考试题答案
邵武市诗雨网络技术服务中心
信息系统网络安全保护管理办法
山东服务器零售商云主机
北京前端软件开发需要多少钱
软通网络安全与隐私保护考试
属于护理学科的外文数据库