千家信息网

zk工厂方法如何实现NIOServerCnxnFactory

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,这篇文章主要介绍了zk工厂方法如何实现NIOServerCnxnFactory,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。NIOSe
千家信息网最后更新 2025年01月31日zk工厂方法如何实现NIOServerCnxnFactory

这篇文章主要介绍了zk工厂方法如何实现NIOServerCnxnFactory,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

NIOServerCnxnFactory类

内部类

AbstractSelectThread

AcceptThread

SelectorThread

属性

ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT

10s session过期时间

ZOOKEEPER_NIO_NUM_SELECTOR_THREADS

selector 线程数

ZOOKEEPER_NIO_NUM_WORKER_THREADS

worker 线程数

directBuffer

buffer用来线程间数据交互

ipMap

限制ip上连接数

cnxnExpiryQueue

连接失效时间分桶队列

workerPool

WorkerService worker执行服务

acceptThread

接收新连接,simple round-robin 分配到选择线程

selectorThreads

方法

停止接收

private void pauseAccept(long millisecs) {    acceptKey.interestOps(0);    try {        selector.select(millisecs);    } catch (IOException e) {        // ignore    } finally {        acceptKey.interestOps(SelectionKey.OP_ACCEPT);    }}private boolean doAccept() {    boolean accepted = false;    SocketChannel sc = null;    try {        sc = acceptSocket.accept();        accepted = true;        InetAddress ia = sc.socket().getInetAddress();        int cnxncount = getClientCnxnCount(ia);        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {            throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);        }        LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());        sc.configureBlocking(false);        // Round-robin assign this connection to a selector thread        if (!selectorIterator.hasNext()) {            selectorIterator = selectorThreads.iterator();        }        SelectorThread selectorThread = selectorIterator.next();        if (!selectorThread.addAcceptedConnection(sc)) {            throw new IOException("Unable to add connection to selector queue"                                  + (stopped ? " (shutdown in progress)" : ""));        }        acceptErrorLogger.flush();    } catch (IOException e) {        // accept, maxClientCnxns, configureBlocking        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);        acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());        fastCloseSock(sc);    }    return accepted;}private void processAcceptedConnections() {    SocketChannel accepted;    while (!stopped && (accepted = acceptedQueue.poll()) != null) {        SelectionKey key = null;        try {            key = accepted.register(selector, SelectionKey.OP_READ);            NIOServerCnxn cnxn = createConnection(accepted, key, this);            key.attach(cnxn);            addCnxn(cnxn);        } catch (IOException e) {            // register, createConnection            cleanupSelectionKey(key);            fastCloseSock(accepted);        }    }}configure获取客户端连接数private int getClientCnxnCount(InetAddress cl) {    Set s = ipMap.get(cl);    if (s == null) {        return 0;    }    return s.size();}创建连接protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) throws IOException {    return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);}创建连接private void addCnxn(NIOServerCnxn cnxn) throws IOException {    InetAddress addr = cnxn.getSocketAddress();    if (addr == null) {        throw new IOException("Socket of " + cnxn + " has been closed");    }    Set set = ipMap.get(addr);    if (set == null) {        // in general we will see 1 connection from each        // host, setting the initial cap to 2 allows us        // to minimize mem usage in the common case        // of 1 entry --  we need to set the initial cap        // to 2 to avoid rehash when the first entry is added        // Construct a ConcurrentHashSet using a ConcurrentHashMap        set = Collections.newSetFromMap(new ConcurrentHashMap(2));        // Put the new set in the map, but only if another thread        // hasn't beaten us to it        Set existingSet = ipMap.putIfAbsent(addr, set);        if (existingSet != null) {            set = existingSet;        }    }    set.add(cnxn);    cnxns.add(cnxn);    touchCnxn(cnxn);}思考:为什么单机和集群模式启动不一样单机可以直接从日志,快照恢复数据集群根据角色划分,涉及到数据同步

感谢你能够认真阅读完这篇文章,希望小编分享的"zk工厂方法如何实现NIOServerCnxnFactory"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!

0