如何理解zk-client通信层ClientCnxnSocket
发表于:2025-01-29 作者:千家信息网编辑
千家信息网最后更新 2025年01月29日,这篇文章将为大家详细讲解有关如何理解zk-client通信层ClientCnxnSocket,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。Client
千家信息网最后更新 2025年01月29日如何理解zk-client通信层ClientCnxnSocket
这篇文章将为大家详细讲解有关如何理解zk-client通信层ClientCnxnSocket,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
ClientCnxnSocket抽象类结构 定义了底层Socket通信接口,默认实现是ClientCnxnSocketNIO
readConnectResult 读取server的connnect的response
readLength 方法 读取buffer长度并给incomingBuffer分配对应大小空间
ClientCnxnSocketNIO 实现
findSendablePacket函数 从outgoingQueue中读取发送的packet
doIO函数 处理读写
doTransport函数
如果连接就绪,调用sendThread连接操作
若读写就绪,调用doIO函数
ClientCnxnSocket
属性
ClientCnxnSocketNIO子类
属性
//nio中的selectorprivate final Selector selector = Selector.open();/** * nio中的selectionKey */private SelectionKey sockKey;private SocketAddress localSocketAddress;private SocketAddress remoteSocketAddress;方法 @Overridevoid connect(InetSocketAddress addr) throws IOException { SocketChannel sock = createSock(); try { registerAndConnect(sock, addr); } catch (IOException e) { LOG.error("Unable to open socket to " + addr); sock.close(); throw e; }//已经连接,但是没有收到resposne initialized = false; /* * Reset incomingBuffer */ lenBuffer.clear(); incomingBuffer = lenBuffer;}client和server主要交互函数@Overridevoid doTransport( int waitTimeOut, QueuependingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { selector.select(waitTimeOut); Set selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); updateSocketAddresses(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, cnxn); } } if (sendThread.getZkState().isConnected()) { if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { enableWrite(); } } selected.clear();}
主要分为读写两种读:没有初始化完成初始化读取len再改incomingbuffer分配对应空间读取对应response写: 找到可以发送的packet如果packet的bytebuffer没有创建,那就进行属性添加bytebuffer写入socketChannel把Packet从outgingQueue中取出,放入pendingQueue中/** * @throws InterruptedException * @throws IOException */void doIO(QueuependingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount.getAndIncrement(); readLength(); } else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } if (sockKey.isWritable()) { Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount.getAndIncrement(); outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { // No more packets to send: turn off write interest flag. // Will be turned on later by a later call to enableWrite(), // from within ZooKeeperSaslClient (if client is configured // to attempt SASL authentication), or in either doIO() or // in doTransport() if not. disableWrite(); } else if (!initialized && p != null && !p.bb.hasRemaining()) { // On initial connection, write the complete connect request // packet, but then disable further writes until after // receiving a successful connection response. If the // session is expired, then the server sends the expiration // response and immediately closes its end of the socket. If // the client is simultaneously writing on its end, then the // TCP stack may choose to abort with RST, in which case the // client would never receive the session expired event. See // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html disableWrite(); } else { // Just in case enableWrite(); } }}查询待发送队列中可以发送的packetprivate Packet findSendablePacket(LinkedBlockingDeque outgoingQueue, boolean tunneledAuthInProgres) { //没有要发送的,返回null if (outgoingQueue.isEmpty()) { return null; } // If we've already starting sending the first packet, we better finish if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) { //取队列第一个进行发送 return outgoingQueue.getFirst(); } // Since client's authentication with server is in progress, // send only the null-header packet queued by primeConnection(). // This packet must be sent so that the SASL authentication process // can proceed, but all other packets should wait until // SASL authentication completes. //有正在认证处理,发送空请求头包给服务端 Iterator iter = outgoingQueue.iterator(); while (iter.hasNext()) { Packet p = iter.next(); if (p.requestHeader == null) { // We've found the priming-packet. Move it to the beginning of the queue. iter.remove(); outgoingQueue.addFirst(p); return p; } else { // Non-priming packet: defer it until later, leaving it in the queue // until authentication completes. LOG.debug("deferring non-priming packet {} until SASL authentation completes.", p); } } return null;}
发送接收packet图
关于如何理解zk-client通信层ClientCnxnSocket就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
函数
通信
属性
内容
文章
方法
更多
知识
空间
篇文章
队列
分配
处理
不错
大小
子类
底层
接口
正在
结构
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全数据链路层
怎么搭建app服务器
xsx设置代理服务器
电力网络安全培训内容
安全数据库题库
两会临近 网络安全
网络安全维修价目表
面试题软件开发周期
网络公司软件开发多还是测试多
大学考计算机信息网络安全题
数据库原理查询平均成绩
搭建数据库要学什么
沐潮直播网络技术有限公司
spyder引入数据库
易语言管理服务器
腾讯云有没有轻量应用服务器
服务器是机柜吗
12u服务器
潮流软件开发过程创意
oracle数据库生僻字
农村网络安全普及教育报告
南充网络技术销售价格
有了域名和服务器
catia服务器启动中断
科技节关于互联网的发言稿
软件开发需要做哪些工作
原神现在有哪些服务器
网络安全形势分析部队
sql2010建立数据库
逻辑数据库和物理数据库