千家信息网

Spark Core读取ES的分区问题案例分析

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容介绍了"Spark Core读取ES的分区问题案例分析"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学
千家信息网最后更新 2025年01月23日Spark Core读取ES的分区问题案例分析

本篇内容介绍了"Spark Core读取ES的分区问题案例分析"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

1.Spark Core读取ES

ES官网直接提供的有elasticsearch-hadoop 插件,对于ES 7.x,hadoop和Spark版本支持如下:

hadoop2Version  = 2.7.1hadoop22Version = 2.2.0spark13Version = 1.6.2spark20Version = 2.3.0

浪尖这了采用的ES版本是7.1.1,测试用的Spark版本是2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:

a,导入整个elasticsearch-hadoop包

       org.elasticsearch      elasticsearch-hadoop      7.1.1    

b,只导入spark模块的包

      org.elasticsearch      elasticsearch-spark-20_2.11      7.1.1    

浪尖这里为了测试方便,只是在本机起了一个单节点的ES实例,简单的测试代码如下:



import org.apache.spark.{SparkConf, SparkContext}import org.elasticsearch.hadoop.cfg.ConfigurationOptions
object es2sparkrdd {
def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1") conf.set(ConfigurationOptions.ES_PORT, "9200") conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true") conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true") conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd) conf.set("es.write.rest.error.handlers", "ignoreConflict") conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")
val sc = new SparkContext(conf) import org.elasticsearch.spark._
sc.esRDD("posts").foreach(each=>{ each._2.keys.foreach(println) }) sc.esJsonRDD("posts").foreach(each=>{ println(each._2) })
sc.stop() }}

可以看到Spark Core读取RDD主要有两种形式的API:

a,esRDD。这种返回的是一个tuple2的类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。

RDD[(String, Map[String, AnyRef])]

b,esJsonRDD。这种返回的也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。

RDD[(String, String)]

虽然是两种类型的RDD,但是RDD都是ScalaEsRDD类型。

要分析Spark Core读取ES的并行度,只需要分析ScalaEsRDD的getPartitions函数即可。

2.源码分析

首先导入源码https://github.com/elastic/elasticsearch-hadoop这个是gradle工程,可以直接导入idea,然后切换到7.x版本即可。

废话少说直接找到ScalaEsRDD,发现gePartitions是在其父类实现的,方法内容如下:


override def getPartitions: Array[Partition] = { esPartitions.zipWithIndex.map { case(esPartition, idx) => new EsPartition(id, idx, esPartition) }.toArray }

esPartitions是一个lazy型的变量:

@transient private[spark] lazy val esPartitions = {    RestService.findPartitions(esCfg, logger)  }

这种声明原因是什么呢?

lazy+transient的原因大家可以考虑一下。

RestService.findPartitions方法也是仅是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法。

final List partitions;//            5.x及以后版本 同时没有配置es.input.max.docs.per.partitionif (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) {     partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);} else {     partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);}

a).findSlicePartitions

这个方法其实就是在5.x及以后的ES版本,同时配置了

es.input.max.docs.per.partition

以后,才会执行,实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:

long numDocs;if (readResource.isTyped()) {    numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query);} else {    numDocs = client.countIndexShard(index, Integer.toString(shardId), query);}int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);for (int i = 0; i < numPartitions; i++) {    PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);    partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));}

实际上分片就是用游标的方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据的读取,组装过程是SearchRequestBuilder.assemble方法来实现的。

这个其实个人觉得会浪费一定的性能,假如真的要ES结合Spark的话,建议合理设置分片数。

b).findShardPartitions方法

这个方法没啥疑问了就是一个RDD分区对应于ES index的一个分片。

PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,locationList.toArray(new String[0]));partitions.add(partition);

"Spark Core读取ES的分区问题案例分析"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

方法 版本 分析 就是 类型 案例 问题 元素 内容 实际 测试 案例分析 代码 信息 原因 同时 大小 实际上 情况 方式 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 中流房地一体数据库 大话西游拼图软件开发 游戏软件开发商地址 游戏软件开发公司哪些靠谱 尚道网络技术 数据库学的那一门语言 怎么选择云服务器 泰州网络营销软件开发价格咨询 山东pdu服务器电源公司 介绍美国互联网科技前沿的书 两个人同时操作一个数据库表 4G软件开发工作岗位 我的世界服务器固定管理员 向网页提交数据库 棋类软件开发游戏 浙江宁波hp服务器云服务器 阿里云服务器到期后发票 内蒙古电子软件开发计划 网络安全级别类别 戴尔服务器选择u盘启动按什么 网络安全考研那个学校好 建邺区第三方软件开发售后服务 网络安全手抄报图片字 电脑服务器磁盘跑满 介绍美国互联网科技前沿的书 饥荒可以把本地服务器给好友吗 粉红猫软件开发 中小学网络安全第一课作业答案 网络技术计算机试题类型 龙岩公安网络安全保卫支队
0