Spark向Elasticsearch批量导入数据,出现重复记录问题的定位
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,看了下es-hadoop插件的源码:发现ES导入数据重试情况的发生,除了在es.batch.write.retry.policy参数默认开启且es-hadoop插件向ES集群发送bulk写入请求接受到
千家信息网最后更新 2025年01月31日Spark向Elasticsearch批量导入数据,出现重复记录问题的定位
看了下es-hadoop插件的源码:
发现ES导入数据重试情况的发生,除了在es.batch.write.retry.policy参数默认开启且es-hadoop插件向ES集群发送bulk写入请求接受到503响应码会重试3次之外。
本身执行http请求时,也会存在重试(hadoop/rest/NetworkClient.java):
public Response execute(Request request) { Response response = null; boolean newNode; do { SimpleRequest routedRequest = new SimpleRequest(request.method(), null, request.path(), request.params(), request.body()); newNode = false; try { response = currentTransport.execute(routedRequest); ByteSequence body = routedRequest.body(); if (body != null) { stats.bytesSent += body.length(); } } catch (Exception ex) { // configuration error - including SSL/PKI - bail out if (ex instanceof EsHadoopIllegalStateException) { throw (EsHadoopException) ex; } // issues with the SSL handshake, bail out instead of retry, for security reasons if (ex instanceof javax.net.ssl.SSLException) { throw new EsHadoopTransportException(ex); } // check for fatal, non-recoverable network exceptions if (ex instanceof BindException) { throw new EsHadoopTransportException(ex); } if (log.isTraceEnabled()) { log.trace( String.format( "Caught exception while performing request [%s][%s] - falling back to the next node in line...", currentNode, request.path()), ex); } String failed = currentNode; failedNodes.put(failed, ex); newNode = selectNextNode(); log.error(String.format("Node [%s] failed (%s); " + (newNode ? "selected next node [" + currentNode + "]" : "no other nodes left - aborting..."), failed, ex.getMessage())); if (!newNode) { throw new EsHadoopNoNodesLeftException(failedNodes); } } } while (newNode); return response; }
当请求出现超时的情况时,es-hadoop插件会再请求一个ES节点发送写入请求。即导入插件认为当前插入节点超时了(默认是一分钟)就视为该节点不可用,就换下一个节点,其实是ES在一分钟内没有处理完插入任务。
将超时时间es.http.timeout参数调大之后,给ES留下充足的入库时间,就不会再发生这个问题了。
插件
节点
参数
情况
时间
数据
问题
充足
任务
源码
集群
处理
定位
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
禅道服务器怎么关闭
金蝶软件开发难吗
GET获取数据存入数据库
文件存放位置不是共享服务器
保护网络安全的主要因素
软件开发这工作好吗
无锡管理软件开发服务
通信网络技术内容
数据库与主页面的链接
网站服务器管理分哪几个部分
政府软件开发哪的服务好
数据库中如何修改视图中某一数据
酷米数据库
组合办公软件开发
空间数据库课程设计
软件开发公司劳动合同
聊天服务器C
网络安全 试卷
高级数据库技术自考
管道数据库
网络安全保护制度和责任制
dnf手游服务器怎么进不去
网络安全知识的手抄报 学生
士兵网络安全对照检查
服务器下载数据
外贸社交软件开发
河源软件开发公司
军训网络安全教育心得体会
张震岳吉他谱软件开发
桥西区硕科智云软件开发工作室