怎样建立连接Zookeeper中的服务端
发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,这篇文章给大家介绍Zookeeper之怎样建立连接服务端,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。服务端处理请求的代码有两种NIOServerCnxnFactory和Nett
千家信息网最后更新 2024年12月13日怎样建立连接Zookeeper中的服务端
这篇文章给大家介绍Zookeeper之怎样建立连接服务端,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
服务端处理请求的代码有两种NIOServerCnxnFactory和NettyServerCnxnFactory,默认是NIOServerCnxnFactory,可以通过指定zookeeper.serverCnxnFactory参数来修改。
这两个类逻辑是一样的,只是一个用的java原生的NIO,一个用的netty,这里我们就分析下NIOServerCnxnFactory。
NIOServerCnxnFactory实现了Runnable接口,看下它的run方法,循环处理请求
//NIOServerCnxnFactory.java//第200行public void run() { while (!ss.socket().isClosed()) { try { selector.select(1000); Setselected; synchronized (this) { selected = selector.selectedKeys(); } ArrayList selectedList = new ArrayList ( selected); Collections.shuffle(selectedList); for (SelectionKey k : selectedList) { //如果是连接请求 if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { SocketChannel sc = ((ServerSocketChannel) k .channel()).accept(); InetAddress ia = sc.socket().getInetAddress(); //获取IP地址对应的客户端连接数 int cnxncount = getClientCnxnCount(ia); //如果超出则关闭 if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){ LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns ); sc.close(); } else { LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress()); sc.configureBlocking(false); SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); //每一个连接都是一个NIOServerCnxn NIOServerCnxn cnxn = createConnection(sc, sk); sk.attach(cnxn); addCnxn(cnxn); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { //在第二个循环的时候,会进入这里,处理真正的连接请求 NIOServerCnxn c = (NIOServerCnxn) k.attachment(); c.doIO(k); } else { if (LOG.isDebugEnabled()) { LOG.debug("Unexpected ops in select " + k.readyOps()); } } } selected.clear(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring exception", e); } } closeAll(); LOG.info("NIOServerCnxn factory exited run method");}//NIOServerCnxn.java//第237行void doIO(SelectionKey k) throws InterruptedException { try { if (isSocketOpen() == false) { LOG.warn("trying to do i/o on a null socket for session:0x" + Long.toHexString(sessionId)); return; } if (k.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } if (incomingBuffer.remaining() == 0) { boolean isPayload; if (incomingBuffer == lenBuffer) { // start of next request incomingBuffer.flip(); isPayload = readLength(k); incomingBuffer.clear(); } else { // continuation isPayload = true; } if (isPayload) { // not the case for 4letterword readPayload(); } else { // four letter words take care // need not do anything else return; } } } //省略部分代码 } catch (CancelledKeyException e) { } catch (CloseRequestException e) { } catch (EndOfStreamException e) { } catch (IOException e) { }}//NIOServerCnxn.java//第194行private void readPayload() throws IOException, InterruptedException { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } } if (incomingBuffer.remaining() == 0) { // have we read length bytes? packetReceived(); incomingBuffer.flip(); if (!initialized) { readConnectRequest(); } else { readRequest(); } lenBuffer.clear(); incomingBuffer = lenBuffer; }}//NIOServerCnxn.java//第434行private void readConnectRequest() throws IOException, InterruptedException { if (!isZKServerRunning()) { throw new IOException("ZooKeeperServer not running"); } zkServer.processConnectRequest(this, incomingBuffer); initialized = true;}//ZookeeperServer.java//第886行public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); ConnectRequest connReq = new ConnectRequest(); connReq.deserialize(bia, "connect"); if (LOG.isDebugEnabled()) { LOG.debug("Session establishment request from client " + cnxn.getRemoteSocketAddress() + " client's lastZxid is 0x" + Long.toHexString(connReq.getLastZxidSeen())); } boolean readOnly = false; try { readOnly = bia.readBool("readOnly"); cnxn.isOldClient = false; } catch (IOException e) { // this is ok -- just a packet from an old client which // doesn't contain readOnly field LOG.warn("Connection request from old client " + cnxn.getRemoteSocketAddress() + "; will be dropped if server is in r-o mode"); } //如果客户端没有设置readOnly,但是服务端是只读的,直接抛出异常关闭连接 if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) { String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); LOG.info(msg); throw new CloseRequestException(msg); } if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { String msg = "Refusing session request for client " + cnxn.getRemoteSocketAddress() + " as it has seen zxid 0x" + Long.toHexString(connReq.getLastZxidSeen()) + " our last zxid is 0x" + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) + " client must try another server"; LOG.info(msg); throw new CloseRequestException(msg); } //协商session超时时间 int sessionTimeout = connReq.getTimeOut(); byte passwd[] = connReq.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout); // We don't want to receive any packets until we are sure that the // session is setup cnxn.disableRecv(); long sessionId = connReq.getSessionId(); if (sessionId != 0) { //如果sessionId不是0,说明是之前已经连接过的客户端因为掉线等原因重新连接的情况 long clientSessionId = connReq.getSessionId(); LOG.info("Client attempting to renew session 0x" + Long.toHexString(clientSessionId) + " at " + cnxn.getRemoteSocketAddress()); serverCnxnFactory.closeSession(sessionId); cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); } else { LOG.info("Client attempting to establish new session at " + cnxn.getRemoteSocketAddress()); createSession(cnxn, passwd, sessionTimeout); }}//ZookeeperServer.java//第617行long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { //创建一个session,zookeeper的session管理比较复杂,具体情况下一章分析 long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); ByteBuffer to = ByteBuffer.allocate(4); to.putInt(timeout); cnxn.setSessionId(sessionId); //响应客户端 submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null); return sessionId;}//ZookeeperServer.java//第728行public void submitRequest(Request si) { //省略部分代码 try { //刷新session的超时时间 touch(si.cnxn); boolean validpacket = Request.isValid(si.type); if (validpacket) { //提交给PrepRequestProcessor进一步处理 firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type " + si.type); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); }}//PrepRequestProcessor.java//第294行protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type); switch (type) { //省略部分代码 case OpCode.createSession: request.request.rewind(); int to = request.request.getInt(); request.txn = new CreateSessionTxn(to); request.request.rewind(); //这里又调用了一次addSession,但是之前的代码其实已经新增过了,不太明白为什么 zks.sessionTracker.addSession(request.sessionId, to); zks.setOwner(request.sessionId, request.getOwner()); break; //省略部分代码 default: LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type); }}
关于Zookeeper之怎样建立连接服务端就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
代码
部分
处理
服务
客户
客户端
内容
情况
时间
更多
分析
帮助
循环
不错
复杂
两个
兴趣
原因
参数
只是
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
电脑网络安全设置pdf
怎么预防网络安全隐患
圀新网络技术须抓人
朋友专享网络安全吗
北邮网络安全是否是a类学科
模拟mc服务器
电脑显示服务器状态变更
绝地吃鸡服务器
六一儿童网络安全
普法网络安全法实施时间
衡阳回收戴尔服务器
和祐美和医院网络安全改造
高地分期网络安全
无线网络技术考核代码
郑州鼎和软件开发公司
浙江梵森互联网科技有限公司
天耀手游软件开发公司
一台服务器辐射距离
百信服务器单价
手机游戏软件开发岗位
上海交友软件开发哪里好
no数据库
网络安全的作用与特征
中国生物医学数据库注册
南京尚运网络技术有限公司
番禺软件开发哪里不错
剑网3怎么连不上服务器
河北服务器机柜哪种好
网络安全与信息化 评论
大唐荣耀服务器有哪些