千家信息网

Spark向Elasticsearch批量导入数据,出现重复记录问题的定位

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,看了下es-hadoop插件的源码:发现ES导入数据重试情况的发生,除了在es.batch.write.retry.policy参数默认开启且es-hadoop插件向ES集群发送bulk写入请求接受到
千家信息网最后更新 2025年01月31日Spark向Elasticsearch批量导入数据,出现重复记录问题的定位

看了下es-hadoop插件的源码:

发现ES导入数据重试情况的发生,除了在es.batch.write.retry.policy参数默认开启且es-hadoop插件向ES集群发送bulk写入请求接受到503响应码会重试3次之外。

本身执行http请求时,也会存在重试(hadoop/rest/NetworkClient.java):

   public Response execute(Request request) {        Response response = null;        boolean newNode;        do {            SimpleRequest routedRequest = new SimpleRequest(request.method(), null, request.path(), request.params(), request.body());            newNode = false;            try {                response = currentTransport.execute(routedRequest);                ByteSequence body = routedRequest.body();                if (body != null) {                    stats.bytesSent += body.length();                }            } catch (Exception ex) {                // configuration error - including SSL/PKI - bail out                if (ex instanceof EsHadoopIllegalStateException) {                    throw (EsHadoopException) ex;                }                // issues with the SSL handshake, bail out instead of retry, for security reasons                if (ex instanceof javax.net.ssl.SSLException) {                    throw new EsHadoopTransportException(ex);                }                // check for fatal, non-recoverable network exceptions                if (ex instanceof BindException) {                    throw new EsHadoopTransportException(ex);                }                if (log.isTraceEnabled()) {                    log.trace(                            String.format(                                    "Caught exception while performing request [%s][%s] - falling back to the next node in line...",                                    currentNode, request.path()), ex);                }                String failed = currentNode;                failedNodes.put(failed, ex);                newNode = selectNextNode();                log.error(String.format("Node [%s] failed (%s); "                        + (newNode ? "selected next node [" + currentNode + "]" : "no other nodes left - aborting..."),                        failed, ex.getMessage()));                if (!newNode) {                    throw new EsHadoopNoNodesLeftException(failedNodes);                }            }        } while (newNode);        return response;    }

当请求出现超时的情况时,es-hadoop插件会再请求一个ES节点发送写入请求。即导入插件认为当前插入节点超时了(默认是一分钟)就视为该节点不可用,就换下一个节点,其实是ES在一分钟内没有处理完插入任务。

将超时时间es.http.timeout参数调大之后,给ES留下充足的入库时间,就不会再发生这个问题了。

0