千家信息网

怎样建立连接Zookeeper中的服务端

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章给大家介绍Zookeeper之怎样建立连接服务端,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。服务端处理请求的代码有两种NIOServerCnxnFactory和NettyServerCnxnFactory……
千家信息网最后更新 2025年01月23日 | 怎样建立连接Zookeeper中的服务端

这篇文章给大家介绍Zookeeper之怎样建立连接服务端,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

服务端处理请求的实现有两种:NIOServerCnxnFactory和NettyServerCnxnFactory。默认使用NIOServerCnxnFactory,可以通过指定zookeeper.serverCnxnFactory参数来修改。

这两个类的逻辑是一样的,只是一个使用Java原生的NIO,另一个使用Netty,这里我们只分析NIOServerCnxnFactory。

NIOServerCnxnFactory实现了Runnable接口,先看下它的run方法,该方法在循环中处理请求:

//NIOServerCnxnFactory.java//第200行public void run() {    while (!ss.socket().isClosed()) {        try {            selector.select(1000);            Set selected;            synchronized (this) {                selected = selector.selectedKeys();            }            ArrayList selectedList = new ArrayList(                selected);            Collections.shuffle(selectedList);            for (SelectionKey k : selectedList) {                //如果是连接请求                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {                    SocketChannel sc = ((ServerSocketChannel) k                                        .channel()).accept();                    InetAddress ia = sc.socket().getInetAddress();                    //获取IP地址对应的客户端连接数                    int cnxncount = getClientCnxnCount(ia);                    //如果超出则关闭                    if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){                        LOG.warn("Too many connections from " + ia                                 + " - max is " + maxClientCnxns );                        sc.close();                    } else {                        LOG.info("Accepted socket connection from "                                 + sc.socket().getRemoteSocketAddress());                        sc.configureBlocking(false);                        SelectionKey sk = sc.register(selector,                                                      SelectionKey.OP_READ);                        //每一个连接都是一个NIOServerCnxn                        NIOServerCnxn cnxn = createConnection(sc, sk);                        sk.attach(cnxn);                        addCnxn(cnxn);                    }                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {                    //在第二个循环的时候,会进入这里,处理真正的连接请求                    NIOServerCnxn c = (NIOServerCnxn) k.attachment();                    c.doIO(k);                } else {                    if (LOG.isDebugEnabled()) {                        
LOG.debug("Unexpected ops in select "                                  + k.readyOps());                    }                }            }            selected.clear();        } catch (RuntimeException e) {            LOG.warn("Ignoring unexpected runtime exception", e);        } catch (Exception e) {            LOG.warn("Ignoring exception", e);        }    }    closeAll();    LOG.info("NIOServerCnxn factory exited run method");}//NIOServerCnxn.java//第237行void doIO(SelectionKey k) throws InterruptedException {    try {        if (isSocketOpen() == false) {            LOG.warn("trying to do i/o on a null socket for session:0x"                     + Long.toHexString(sessionId));            return;        }        if (k.isReadable()) {            int rc = sock.read(incomingBuffer);            if (rc < 0) {                throw new EndOfStreamException(                    "Unable to read additional data from client sessionid 0x"                    + Long.toHexString(sessionId)                    + ", likely client has closed socket");            }            if (incomingBuffer.remaining() == 0) {                boolean isPayload;                if (incomingBuffer == lenBuffer) { // start of next request                    incomingBuffer.flip();                    isPayload = readLength(k);                    incomingBuffer.clear();                } else {                    // continuation                    isPayload = true;                }                if (isPayload) { // not the case for 4letterword                    readPayload();                }                else {                    // four letter words take care                    // need not do anything else                    return;                }            }        }        //省略部分代码    } catch (CancelledKeyException e) {            } catch (CloseRequestException e) {            } catch (EndOfStreamException e) {            } catch (IOException e) {            }}//NIOServerCnxn.java//第194行private 
void readPayload() throws IOException, InterruptedException {    if (incomingBuffer.remaining() != 0) { // have we read length bytes?        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok        if (rc < 0) {            throw new EndOfStreamException(                "Unable to read additional data from client sessionid 0x"                + Long.toHexString(sessionId)                + ", likely client has closed socket");        }    }    if (incomingBuffer.remaining() == 0) { // have we read length bytes?        packetReceived();        incomingBuffer.flip();        if (!initialized) {            readConnectRequest();        } else {            readRequest();        }        lenBuffer.clear();        incomingBuffer = lenBuffer;    }}//NIOServerCnxn.java//第434行private void readConnectRequest() throws IOException, InterruptedException {    if (!isZKServerRunning()) {        throw new IOException("ZooKeeperServer not running");    }    zkServer.processConnectRequest(this, incomingBuffer);    initialized = true;}//ZookeeperServer.java//第886行public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));    ConnectRequest connReq = new ConnectRequest();    connReq.deserialize(bia, "connect");    if (LOG.isDebugEnabled()) {        LOG.debug("Session establishment request from client "                  + cnxn.getRemoteSocketAddress()                  + " client's lastZxid is 0x"                  + Long.toHexString(connReq.getLastZxidSeen()));    }    boolean readOnly = false;    try {        readOnly = bia.readBool("readOnly");        cnxn.isOldClient = false;    } catch (IOException e) {        // this is ok -- just a packet from an old client which        // doesn't contain readOnly field        LOG.warn("Connection request from old client "                 + cnxn.getRemoteSocketAddress()                 + "; will 
be dropped if server is in r-o mode");    }    //如果客户端没有设置readOnly,但是服务端是只读的,直接抛出异常关闭连接    if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {        String msg = "Refusing session request for not-read-only client "            + cnxn.getRemoteSocketAddress();        LOG.info(msg);        throw new CloseRequestException(msg);    }    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {        String msg = "Refusing session request for client "            + cnxn.getRemoteSocketAddress()            + " as it has seen zxid 0x"            + Long.toHexString(connReq.getLastZxidSeen())            + " our last zxid is 0x"            + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())            + " client must try another server";        LOG.info(msg);        throw new CloseRequestException(msg);    }    //协商session超时时间    int sessionTimeout = connReq.getTimeOut();    byte passwd[] = connReq.getPasswd();    int minSessionTimeout = getMinSessionTimeout();    if (sessionTimeout < minSessionTimeout) {        sessionTimeout = minSessionTimeout;    }    int maxSessionTimeout = getMaxSessionTimeout();    if (sessionTimeout > maxSessionTimeout) {        sessionTimeout = maxSessionTimeout;    }    cnxn.setSessionTimeout(sessionTimeout);    // We don't want to receive any packets until we are sure that the    // session is setup    cnxn.disableRecv();    long sessionId = connReq.getSessionId();    if (sessionId != 0) {        //如果sessionId不是0,说明是之前已经连接过的客户端因为掉线等原因重新连接的情况        long clientSessionId = connReq.getSessionId();        LOG.info("Client attempting to renew session 0x"                 + Long.toHexString(clientSessionId)                 + " at " + cnxn.getRemoteSocketAddress());        serverCnxnFactory.closeSession(sessionId);        cnxn.setSessionId(sessionId);        reopenSession(cnxn, sessionId, passwd, sessionTimeout);    } else {        LOG.info("Client attempting to establish new session at "                 + 
cnxn.getRemoteSocketAddress());        createSession(cnxn, passwd, sessionTimeout);    }}//ZookeeperServer.java//第617行long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {    //创建一个session,zookeeper的session管理比较复杂,具体情况下一章分析    long sessionId = sessionTracker.createSession(timeout);    Random r = new Random(sessionId ^ superSecret);    r.nextBytes(passwd);    ByteBuffer to = ByteBuffer.allocate(4);    to.putInt(timeout);    cnxn.setSessionId(sessionId);    //响应客户端    submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);    return sessionId;}//ZookeeperServer.java//第728行public void submitRequest(Request si) {    //省略部分代码        try {        //刷新session的超时时间        touch(si.cnxn);        boolean validpacket = Request.isValid(si.type);        if (validpacket) {            //提交给PrepRequestProcessor进一步处理            firstProcessor.processRequest(si);            if (si.cnxn != null) {                incInProcess();            }        } else {            LOG.warn("Received packet at server of unknown type " + si.type);            new UnimplementedRequestProcessor().processRequest(si);        }    } catch (MissingSessionException e) {        if (LOG.isDebugEnabled()) {            LOG.debug("Dropping request: " + e.getMessage());        }    } catch (RequestProcessorException e) {        LOG.error("Unable to process request:" + e.getMessage(), e);    }}//PrepRequestProcessor.java//第294行protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)            throws KeeperException, IOException, RequestProcessorException {    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,                                Time.currentWallTime(), type);    switch (type) {        //省略部分代码                case OpCode.createSession:            request.request.rewind();            int to = request.request.getInt();            request.txn = new CreateSessionTxn(to);            request.request.rewind();            
//这里又调用了一次addSession,但是之前的代码其实已经新增过了,不太明白为什么            zks.sessionTracker.addSession(request.sessionId, to);            zks.setOwner(request.sessionId, request.getOwner());            break;                    //省略部分代码                    default:            LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type);    }}

关于Zookeeper之怎样建立连接服务端就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0