千家信息网

如何理解zk-client通信层ClientCnxnSocket

发表于:2025-01-29 作者:千家信息网编辑
千家信息网最后更新 2025年01月29日,这篇文章将为大家详细讲解有关如何理解zk-client通信层ClientCnxnSocket,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。Client
千家信息网最后更新 2025年01月29日如何理解zk-client通信层ClientCnxnSocket

这篇文章将为大家详细讲解有关如何理解zk-client通信层ClientCnxnSocket,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

ClientCnxnSocket抽象类结构 定义了底层Socket通信接口,默认实现是ClientCnxnSocketNIO

readConnectResult 读取server对connect请求的response

readLength 方法 读取buffer长度并给incomingBuffer分配对应大小空间

ClientCnxnSocketNIO 实现

findSendablePacket函数 从outgoingQueue中取出可发送的packet

doIO函数 处理读写

doTransport函数

如果连接就绪,调用sendThread连接操作

若读写就绪,调用doIO函数

ClientCnxnSocket

属性

ClientCnxnSocketNIO子类

属性

//nio中的selectorprivate final Selector selector = Selector.open();/** * nio中的selectionKey */private SelectionKey sockKey;private SocketAddress localSocketAddress;private SocketAddress remoteSocketAddress;方法 @Overridevoid connect(InetSocketAddress addr) throws IOException {    SocketChannel sock = createSock();    try {        registerAndConnect(sock, addr);    } catch (IOException e) {        LOG.error("Unable to open socket to " + addr);        sock.close();        throw e;    }//已经连接,但是没有收到resposne    initialized = false;    /*     * Reset incomingBuffer     */    lenBuffer.clear();    incomingBuffer = lenBuffer;}client和server主要交互函数@Overridevoid doTransport(    int waitTimeOut,    Queue pendingQueue,    ClientCnxn cnxn) throws IOException, InterruptedException {    selector.select(waitTimeOut);    Set selected;    synchronized (this) {        selected = selector.selectedKeys();    }    // Everything below and until we get back to the select is    // non blocking, so time is effectively a constant. That is    // Why we just have to do this once, here    updateNow();    for (SelectionKey k : selected) {        SocketChannel sc = ((SocketChannel) k.channel());        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {            if (sc.finishConnect()) {                updateLastSendAndHeard();                updateSocketAddresses();                sendThread.primeConnection();            }        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {            doIO(pendingQueue, cnxn);        }    }    if (sendThread.getZkState().isConnected()) {        if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {            enableWrite();        }    }    selected.clear();}
主要分为读写两种读:没有初始化完成初始化读取len再改incomingbuffer分配对应空间读取对应response写: 找到可以发送的packet如果packet的bytebuffer没有创建,那就进行属性添加bytebuffer写入socketChannel把Packet从outgingQueue中取出,放入pendingQueue中/** * @throws InterruptedException * @throws IOException */void doIO(Queue pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {    SocketChannel sock = (SocketChannel) sockKey.channel();    if (sock == null) {        throw new IOException("Socket is null!");    }    if (sockKey.isReadable()) {        int rc = sock.read(incomingBuffer);        if (rc < 0) {            throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"                                           + Long.toHexString(sessionId)                                           + ", likely server has closed socket");        }        if (!incomingBuffer.hasRemaining()) {            incomingBuffer.flip();            if (incomingBuffer == lenBuffer) {                recvCount.getAndIncrement();                readLength();            } else if (!initialized) {                readConnectResult();                enableRead();                if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {                    // Since SASL authentication has completed (if client is configured to do so),                    // outgoing packets waiting in the outgoingQueue can now be sent.                    
enableWrite();                }                lenBuffer.clear();                incomingBuffer = lenBuffer;                updateLastHeard();                initialized = true;            } else {                sendThread.readResponse(incomingBuffer);                lenBuffer.clear();                incomingBuffer = lenBuffer;                updateLastHeard();            }        }    }    if (sockKey.isWritable()) {        Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());        if (p != null) {            updateLastSend();            // If we already started writing p, p.bb will already exist            if (p.bb == null) {                if ((p.requestHeader != null)                    && (p.requestHeader.getType() != OpCode.ping)                    && (p.requestHeader.getType() != OpCode.auth)) {                    p.requestHeader.setXid(cnxn.getXid());                }                p.createBB();            }            sock.write(p.bb);            if (!p.bb.hasRemaining()) {                sentCount.getAndIncrement();                outgoingQueue.removeFirstOccurrence(p);                if (p.requestHeader != null                    && p.requestHeader.getType() != OpCode.ping                    && p.requestHeader.getType() != OpCode.auth) {                    synchronized (pendingQueue) {                        pendingQueue.add(p);                    }                }            }        }        if (outgoingQueue.isEmpty()) {            // No more packets to send: turn off write interest flag.            // Will be turned on later by a later call to enableWrite(),            // from within ZooKeeperSaslClient (if client is configured            // to attempt SASL authentication), or in either doIO() or            // in doTransport() if not.            
disableWrite();        } else if (!initialized && p != null && !p.bb.hasRemaining()) {            // On initial connection, write the complete connect request            // packet, but then disable further writes until after            // receiving a successful connection response.  If the            // session is expired, then the server sends the expiration            // response and immediately closes its end of the socket.  If            // the client is simultaneously writing on its end, then the            // TCP stack may choose to abort with RST, in which case the            // client would never receive the session expired event.  See            // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html            disableWrite();        } else {            // Just in case            enableWrite();        }    }}查询待发送队列中可以发送的packetprivate Packet findSendablePacket(LinkedBlockingDeque outgoingQueue, boolean tunneledAuthInProgres) {   //没有要发送的,返回null     if (outgoingQueue.isEmpty()) {        return null;    }    // If we've already starting sending the first packet, we better finish    if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) {        //取队列第一个进行发送        return outgoingQueue.getFirst();    }    // Since client's authentication with server is in progress,    // send only the null-header packet queued by primeConnection().    // This packet must be sent so that the SASL authentication process    // can proceed, but all other packets should wait until    // SASL authentication completes.    //有正在认证处理,发送空请求头包给服务端    Iterator iter = outgoingQueue.iterator();    while (iter.hasNext()) {        Packet p = iter.next();        if (p.requestHeader == null) {            // We've found the priming-packet. Move it to the beginning of the queue.            
iter.remove();            outgoingQueue.addFirst(p);            return p;        } else {            // Non-priming packet: defer it until later, leaving it in the queue            // until authentication completes.            LOG.debug("deferring non-priming packet {} until SASL authentation completes.", p);        }    }    return null;}

发送接收packet图

关于如何理解zk-client通信层ClientCnxnSocket就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0