elasticsearch节点间通信的transport启动过程是什么
这篇文章主要介绍"elasticsearch节点间通信的transport启动过程是什么"的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇"elasticsearch节点间通信的transport启动过程是什么"文章能帮助大家解决问题。
transport
transport顾名思义是集群通信的基本通道,无论是集群状态信息,还是搜索索引请求信息,都是通过transport传送。elasticsearch定义了tansport,tansportmessage,tansportchannel,tansportrequest,tansportresponse等所需的所有的基础接口。这里将以transport为主,分析过程中会附带介绍其它接口。首先看一下transport节点的定义,如下图所示:
NettyTransport实现了该接口。分析NettyTransport前简单说一下Netty的用法,Netty的使用需要三个模块ServerBootStrap,ClientBootStrap(v3.x)及MessageHandler。ServerBootStrap启动服务器,ClientBootStrap启动客户端并连接服务器,MessageHandler是message处理逻辑所在,也就是业务逻辑。其它详细使用请参考Netty官方文档。
启动serverBootStrap
NettyTransport每个在doStart()方法中启动serverBootStrap,和ClientBootStrap,并绑定ip,代码如下所示:
protected void doStart() throws ElasticsearchException { clientBootstrap = createClientBootstrap();//根据配置启动客户端 ……//省略了无关分代码 createServerBootstrap(name, mergedSettings);//启动server端 bindServerBootstrap(name, mergedSettings);//绑定ip }
每一个节点都需要发送和接收,因此两者都需要启动,client和server的启动分别在相应的方法中,启动过程就是netty的启动过程,有兴趣可以去看相应方法。bindServerBootstrap(name, mergedSettings)将本地ip和断开绑定到netty同时设定好export host(export host的具体作业我也看明白也没有看到相关的绑定,需要进一步研究)。
启动client及server的过程中将messagehandler注入到channelpipeline中。至此启动过程完成,但是client并未连接任何server,连接过程是在节点启动后,才连接到其它节点的。
如何连接到node
方法代码如下所示:
public void connectToNode(DiscoveryNode node, boolean light) { //transport的模块必须要启动 if (!lifecycle.started()) { throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport"); } //获取读锁,每个节点可以和多个节点建立连接,因此这里用读锁 globalLock.readLock().lock(); try { //以node.id为基础获取一个锁,这保证对于每个node只能建立一次连接 connectionLock.acquire(node.id()); try { if (!lifecycle.started()) { throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport"); } NodeChannels nodeChannels = connectedNodes.get(node); if (nodeChannels != null) { return; } try { if (light) {//这里的light,就是对该节点只获取一个channel,所有类型(5种连接类型下面会说到)都使用者一个channel nodeChannels = connectToChannelsLight(node); } else { nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]); try { connectToChannels(nodeChannels, node); } catch (Throwable e) { logger.trace("failed to connect to [{}], cleaning dangling connections", e, node); nodeChannels.close(); throw e; } } // we acquire a connection lock, so no way there is an existing connection connectedNodes.put(node, nodeChannels); if (logger.isDebugEnabled()) { logger.debug("connected to node [{}]", node); } transportServiceAdapter.raiseNodeConnected(node); } catch (ConnectTransportException e) { throw e; } catch (Exception e) { throw new ConnectTransportException(node, "general node connection failure", e); } } finally { connectionLock.release(node.id()); } } finally { globalLock.readLock().unlock(); } }
如果不是轻连接,每个server和clien之间都有5中连接,着5中连接承担着不同的任务
连接方法的代码
protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) { //五种连接方式,不同的连接方式对应不同的集群操作 ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length]; ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length]; ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length]; ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length]; ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length]; InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address(); //尝试建立连接 for (int i = 0; i < connectRecovery.length; i++) { connectRecovery[i] = clientBootstrap.connect(address); } for (int i = 0; i < connectBulk.length; i++) { connectBulk[i] = clientBootstrap.connect(address); } for (int i = 0; i < connectReg.length; i++) { connectReg[i] = clientBootstrap.connect(address); } for (int i = 0; i < connectState.length; i++) { connectState[i] = clientBootstrap.connect(address); } for (int i = 0; i < connectPing.length; i++) { connectPing[i] = clientBootstrap.connect(address); } //获取每个连接的channel存入到相应的channels中便于后面使用。 try { for (int i = 0; i < connectRecovery.length; i++) { connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); if (!connectRecovery[i].isSuccess()) { throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause()); } nodeChannels.recovery[i] = connectRecovery[i].getChannel(); nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } for (int i = 0; i < connectBulk.length; i++) { connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); if (!connectBulk[i].isSuccess()) { throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause()); } nodeChannels.bulk[i] = connectBulk[i].getChannel(); nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } for (int i = 0; i < connectReg.length; i++) { connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); if (!connectReg[i].isSuccess()) { throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause()); } nodeChannels.reg[i] = connectReg[i].getChannel(); nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } for (int i = 0; i < connectState.length; i++) { connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); if (!connectState[i].isSuccess()) { throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause()); } nodeChannels.state[i] = connectState[i].getChannel(); nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } for (int i = 0; i < connectPing.length; i++) { connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); if (!connectPing[i].isSuccess()) { throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause()); } nodeChannels.ping[i] = connectPing[i].getChannel(); nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } if (nodeChannels.recovery.length == 0) { if (nodeChannels.bulk.length > 0) { nodeChannels.recovery = nodeChannels.bulk; } else { nodeChannels.recovery = nodeChannels.reg; } } if (nodeChannels.bulk.length == 0) { nodeChannels.bulk = nodeChannels.reg; } } catch (RuntimeException e) { // clean the futures for (ChannelFuture future : ImmutableList.builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) { future.cancel(); if (future.getChannel() != null && future.getChannel().isOpen()) { try { future.getChannel().close(); } catch (Exception e1) { // ignore } } } throw e; } }
关于"elasticsearch节点间通信的transport启动过程是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注行业资讯频道,小编每天都会为大家更新不同的知识点。