千家信息网

Zookeeper源码中session管理的示例分析

发表于:2025-02-11 作者:千家信息网编辑
千家信息网最后更新 2025年02月11日,Zookeeper源码中session管理的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。一、session的创建//Zooke
千家信息网最后更新 2025年02月11日Zookeeper源码中session管理的示例分析

Zookeeper源码中session管理的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

一、session的创建

//ZookeeperServer.java//第617行long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {    long sessionId = sessionTracker.createSession(timeout);        //省略部分代码}//SessionTrackerImpl.java//第236行synchronized public long createSession(int sessionTimeout) {    addSession(nextSessionId, sessionTimeout);    return nextSessionId++;}//SessionTrackerImpl.java//第241行synchronized public void addSession(long id, int sessionTimeout) {    //保存sessionId和过期时间的关系    sessionsWithTimeout.put(id, sessionTimeout);        //如果session不存在,就新建一个    if (sessionsById.get(id) == null) {        SessionImpl s = new SessionImpl(id, sessionTimeout, 0);        sessionsById.put(id, s);        //省略日志打印    } else {        //省略日志打印    }    //将session按照一定规则聚合    touchSession(id, sessionTimeout);}//SessionTrackerImpl.java//第166行synchronized public boolean touchSession(long sessionId, int timeout) {    if (LOG.isTraceEnabled()) {        //省略日志打印    }    SessionImpl s = sessionsById.get(sessionId);    // Return false, if the session doesn't exists or marked as closing    if (s == null || s.isClosing()) {        return false;    }    long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);    //如果当前session的过期时间大于这个值,不需要操作    if (s.tickTime >= expireTime) {        // Nothing needs to be done        return true;    }    //将session从旧的桶中移出,并放入(剩余超时时间更长的)新的桶    SessionSet set = sessionSets.get(s.tickTime);    if (set != null) {        set.sessions.remove(s);    }    s.tickTime = expireTime;    set = sessionSets.get(s.tickTime);    if (set == null) {        set = new SessionSet();        sessionSets.put(expireTime, set);    }    set.sessions.add(s);    return true;}//SessionTrackerImpl.java//第89行private long roundToInterval(long time) {    //expirationInterval就是zookeeper的心跳周期(tickTime),默认值是3000    //这段计算的意思是将过期时间每3000ms分一个段    //比如200ms、500ms、3000ms返回0,3001ms、5000ms返回3000    //由于这里的time是加了Time.currentElapsedTime()的,所以不会出现0的情况    return (time / expirationInterval + 1) * expirationInterval;}

二、session激活

//ZookeeperServer.java//第728行public void submitRequest(Request si) {    //省略部分代码        try {        touch(si.cnxn);                //省略部分代码    } catch (MissingSessionException e) {        if (LOG.isDebugEnabled()) {            LOG.debug("Dropping request: " + e.getMessage());        }    } catch (RequestProcessorException e) {        LOG.error("Unable to process request:" + e.getMessage(), e);    }}//ZookeeperServer.java//第368行void touch(ServerCnxn cnxn) throws MissingSessionException {    //省略部分代码    if (!sessionTracker.touchSession(id, to)) {        throw new MissingSessionException(            "No session with sessionid 0x" + Long.toHexString(id)            + " exists, probably expired and removed");    }}

zookeeper服务端响应客户端的请求时,都会调用submitRequest方法,最终会调用到touchSession方法,这里会将session移动到新的桶中

三、session的过期

//SessionTrackerImpl.java//第142行synchronized public void run() {    try {        while (running) {            currentTime = Time.currentElapsedTime();            //在SessionTrackerImpl初始化的时候,会给nextExpirationTime赋一个初值            //nextExpirationTime = roundToInterval(Time.currentElapsedTime());            if (nextExpirationTime > currentTime) {                this.wait(nextExpirationTime - currentTime);                continue;            }            SessionSet set;            //如果到达了过期时间,则移除对应桶中的所有session            set = sessionSets.remove(nextExpirationTime);            if (set != null) {                for (SessionImpl s : set.sessions) {                    setSessionClosing(s.sessionId);                    expirer.expire(s);                }            }            nextExpirationTime += expirationInterval;        }    } catch (InterruptedException e) {        handleException(this.getName(), e);    }    LOG.info("SessionTrackerImpl exited loop!");}//ZookeeperServer.java//第353行public void expire(Session session) {    long sessionId = session.getSessionId();    LOG.info("Expiring session 0x" + Long.toHexString(sessionId)             + ", timeout of " + session.getTimeout() + "ms exceeded");    close(sessionId);}//ZookeeperServer.java//第329行private void close(long sessionId) {    submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);}//PrepRequestProcessor.java//第294行protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)            throws KeeperException, IOException, RequestProcessorException {    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,                                Time.currentWallTime(), type);    switch (type) {        //省略代码                    case OpCode.closeSession:            // We don't want to do this check since the session expiration thread            // queues up this operation without being the session owner.            // this request is the last of the session so it should be ok            //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());            HashSet es = zks.getZKDatabase().getEphemerals(request.sessionId);            //删除session关联的所有临时节点            synchronized (zks.outstandingChanges) {                //zookeeper的大部分操作都会记录并放入列表                for (ChangeRecord c : zks.outstandingChanges) {                    //c.stat == null表示这是删除操作                    if (c.stat == null) {                        es.remove(c.path);                    } else if (c.stat.getEphemeralOwner() == request.sessionId) {                        es.add(c.path);                    }                }                for (String path3Delete : es) {                    addChangeRecord(new ChangeRecord(request.hdr.getZxid(),                                                     path3Delete, null, 0, null));                }                zks.sessionTracker.setSessionClosing(request.sessionId);            }            LOG.info("Processed session termination for sessionid: 0x"                     + Long.toHexString(request.sessionId));            break;                 //省略代码    }}

看完上述内容,你们掌握Zookeeper源码中session管理的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0