千家信息网

elasticsearch节点间通信的transport启动过程是什么

发表于:2025-01-18 作者:千家信息网编辑
千家信息网最后更新 2025年01月18日,这篇文章主要介绍"elasticsearch节点间通信的transport启动过程是什么"的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇"elasticsearc
千家信息网最后更新 2025年01月18日elasticsearch节点间通信的transport启动过程是什么

这篇文章主要介绍"elasticsearch节点间通信的transport启动过程是什么"的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇"elasticsearch节点间通信的transport启动过程是什么"文章能帮助大家解决问题。

transport

transport顾名思义是集群通信的基本通道,无论是集群状态信息,还是搜索索引请求信息,都是通过transport传送。elasticsearch定义了tansport,tansportmessage,tansportchannel,tansportrequest,tansportresponse等所需的所有的基础接口。这里将以transport为主,分析过程中会附带介绍其它接口。首先看一下transport节点的定义,如下图所示:

NettyTransport实现了该接口。分析NettyTransport前简单说一下Netty的用法,Netty的使用需要三个模块ServerBootStrap,ClientBootStrap(v3.x)及MessageHandler。ServerBootStrap启动服务器,ClientBootStrap启动客户端并连接服务器,MessageHandler是message处理逻辑所在,也就是业务逻辑。其它详细使用请参考Netty官方文档。

启动serverBootStrap

NettyTransport每个在doStart()方法中启动serverBootStrap,和ClientBootStrap,并绑定ip,代码如下所示:

protected void doStart() throws ElasticsearchException {       clientBootstrap = createClientBootstrap();//根据配置启动客户端       ……//省略了无关分代码    createServerBootstrap(name, mergedSettings);//启动server端       bindServerBootstrap(name, mergedSettings);//绑定ip        }

每一个节点都需要发送和接收,因此两者都需要启动,client和server的启动分别在相应的方法中,启动过程就是netty的启动过程,有兴趣可以去看相应方法。bindServerBootstrap(name, mergedSettings)将本地ip和断开绑定到netty同时设定好export host(export host的具体作业我也看明白也没有看到相关的绑定,需要进一步研究)。

启动client及server的过程中将messagehandler注入到channelpipeline中。至此启动过程完成,但是client并未连接任何server,连接过程是在节点启动后,才连接到其它节点的。

如何连接到node

方法代码如下所示:

public void connectToNode(DiscoveryNode node, boolean light) {     //transport的模块必须要启动        if (!lifecycle.started()) {            throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");        }     //获取读锁,每个节点可以和多个节点建立连接,因此这里用读锁        globalLock.readLock().lock();        try {        //以node.id为基础获取一个锁,这保证对于每个node只能建立一次连接            connectionLock.acquire(node.id());            try {                if (!lifecycle.started()) {                    throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");                }                NodeChannels nodeChannels = connectedNodes.get(node);                if (nodeChannels != null) {                    return;                }                try {                    if (light) {//这里的light,就是对该节点只获取一个channel,所有类型(5种连接类型下面会说到)都使用者一个channel                        nodeChannels = connectToChannelsLight(node);                    } else {                        nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);                        try {                            connectToChannels(nodeChannels, node);                        } catch (Throwable e) {                            logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);                            nodeChannels.close();                            throw e;                        }                    }                    // we acquire a connection lock, so no way there is an existing connection                    connectedNodes.put(node, nodeChannels);                    if (logger.isDebugEnabled()) {                        logger.debug("connected to node [{}]", node);                    }                    transportServiceAdapter.raiseNodeConnected(node);                } catch (ConnectTransportException e) {                    throw e;                } catch (Exception e) {                    throw new ConnectTransportException(node, "general node connection failure", e);                }            } finally {                connectionLock.release(node.id());            }        } finally {            globalLock.readLock().unlock();        }    }

如果不是轻连接,每个server和clien之间都有5中连接,着5中连接承担着不同的任务

连接方法的代码

protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {    //五种连接方式,不同的连接方式对应不同的集群操作        ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];        ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];        ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];        ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];        ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];        InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();    //尝试建立连接        for (int i = 0; i < connectRecovery.length; i++) {            connectRecovery[i] = clientBootstrap.connect(address);        }        for (int i = 0; i < connectBulk.length; i++) {            connectBulk[i] = clientBootstrap.connect(address);        }        for (int i = 0; i < connectReg.length; i++) {            connectReg[i] = clientBootstrap.connect(address);        }        for (int i = 0; i < connectState.length; i++) {            connectState[i] = clientBootstrap.connect(address);        }        for (int i = 0; i < connectPing.length; i++) {            connectPing[i] = clientBootstrap.connect(address);        }    //获取每个连接的channel存入到相应的channels中便于后面使用。        try {            for (int i = 0; i < connectRecovery.length; i++) {                connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));                if (!connectRecovery[i].isSuccess()) {                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());                }                nodeChannels.recovery[i] = connectRecovery[i].getChannel();                nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));            }            for (int i = 0; i < connectBulk.length; i++) {                connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));                if (!connectBulk[i].isSuccess()) {                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());                }                nodeChannels.bulk[i] = connectBulk[i].getChannel();                nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));            }            for (int i = 0; i < connectReg.length; i++) {                connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));                if (!connectReg[i].isSuccess()) {                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());                }                nodeChannels.reg[i] = connectReg[i].getChannel();                nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));            }            for (int i = 0; i < connectState.length; i++) {                connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));                if (!connectState[i].isSuccess()) {                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());                }                nodeChannels.state[i] = connectState[i].getChannel();                nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));            }            for (int i = 0; i < connectPing.length; i++) {                connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));                if (!connectPing[i].isSuccess()) {                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());                }                nodeChannels.ping[i] = connectPing[i].getChannel();                nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));            }            if (nodeChannels.recovery.length == 0) {                if (nodeChannels.bulk.length > 0) {                    nodeChannels.recovery = nodeChannels.bulk;                } else {                    nodeChannels.recovery = nodeChannels.reg;                }            }            if (nodeChannels.bulk.length == 0) {                nodeChannels.bulk = nodeChannels.reg;            }        } catch (RuntimeException e) {            // clean the futures            for (ChannelFuture future : ImmutableList.builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {                future.cancel();                if (future.getChannel() != null && future.getChannel().isOpen()) {                    try {                        future.getChannel().close();                    } catch (Exception e1) {                        // ignore                    }                }            }            throw e;        }    }

关于"elasticsearch节点间通信的transport启动过程是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注行业资讯频道,小编每天都会为大家更新不同的知识点。

0