千家信息网

How ZooKeeper (zk) implements fast leader election: FastLeaderElection

Published: 2024-11-14 | Author: 千家信息网 editor

This article explains how ZooKeeper (zk) implements fast leader election in the FastLeaderElection class.

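Concepts involved in the election

Server states

Votes

How is a vote chosen?

Protocol

Election

How is an election carried out?

epoch

Sender

Receiver

Send queue

Receive queue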

Server states

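public enum ServerState {
    LOOKING,   // looking for a leader: the ensemble has no leader, so a leader election starts
    FOLLOWING, // follower state: this server is a Follower
    LEADING,   // leader state: this server is the Leader
    OBSERVING  // observer state: this server is an Observer
}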
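A server only leaves LOOKING once an election settles. As a minimal sketch on simplified stand-in types (the `StateTransition` class and `stateAfterElection` method are hypothetical, not ZooKeeper API), the state it moves to can be modeled like this: the elected server becomes LEADING, and every other server becomes FOLLOWING or OBSERVING depending on whether it is a voting participant, mirroring the `learningState()` helper listed further down and roughly what the `setPeerState(...)` call in `lookForLeader()` decides.

```java
// Simplified stand-ins for ZooKeeper's types (hypothetical, for illustration only).
enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

enum LearnerType { PARTICIPANT, OBSERVER }

final class StateTransition {
    private StateTransition() {}

    // Like learningState(): a participant that did not win follows, an observer observes.
    static ServerState learningState(LearnerType type) {
        return type == LearnerType.PARTICIPANT ? ServerState.FOLLOWING : ServerState.OBSERVING;
    }

    // State a server moves to once the election has settled on proposedLeader.
    static ServerState stateAfterElection(long myId, long proposedLeader, LearnerType type) {
        return proposedLeader == myId ? ServerState.LEADING : learningState(type);
    }
}
```

For example, in a three-server ensemble that elects server 3, server 3 ends in LEADING and servers 1 and 2 end in FOLLOWING.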

Vote

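id

sid of the proposed leader

zxid

transaction id (zxid) of the proposed leader

electionEpoch

Used to decide whether several votes belong to the same election round. On each server it is an auto-incrementing counter that is increased by 1 whenever a new round of voting begins.

peerEpoch

epoch of the proposed leader

state

current state of the server that cast the vote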
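The five fields above can be pictured as a small immutable value class. This is a sketch assuming a plain Java class is enough; ZooKeeper's own `Vote` also carries a `version` field (used as `n.version` in the code below) and has its own equality rules, both left out here.

```java
// Simplified stand-in for ZooKeeper's Vote (hypothetical; omits `version`).
enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

final class Vote {
    final long id;            // sid of the proposed leader
    final long zxid;          // last logged zxid of the proposed leader
    final long electionEpoch; // election round this vote was cast in
    final long peerEpoch;     // epoch of the proposed leader
    final ServerState state;  // state of the server that cast the vote

    Vote(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) {
        this.id = id;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.peerEpoch = peerEpoch;
        this.state = state;
    }

    // Votes are only counted together when they come from the same round.
    boolean sameRound(Vote other) {
        return electionEpoch == other.electionEpoch;
    }

    @Override
    public String toString() {
        return String.format("Vote(id=%d, zxid=0x%x, electionEpoch=0x%x, peerEpoch=0x%x, state=%s)",
                id, zxid, electionEpoch, peerEpoch, state);
    }
}
```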

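Inner classes

FastLeaderElection has three inner classes: Messenger, ToSend, and Notification.

A Notification lets other nodes know that a given node has changed its vote, either because it joined the leader election or because it learned of a vote with a higher zxid, or with the same zxid and a higher server id.

ToSend wraps a message that is about to be sent.

Messenger consists of two workers:

WorkerReceiver and WorkerSender

Its main job is to set up these two objects.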

LinkedBlockingQueue<ToSend> sendqueue;

LinkedBlockingQueue<Notification> recvqueue;
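The two queues form a producer/consumer pair: the election loop puts `ToSend` messages on `sendqueue`, a sender worker drains it, and incoming messages end up as `Notification`s on `recvqueue`, which `lookForLeader()` polls with a timeout. The sketch below assumes a drastically simplified setup with two in-process servers and one hypothetical `MiniMessenger` class, where "the network" is just the peer's receive queue; in ZooKeeper the workers exchange serialized messages through `QuorumCnxManager`.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, heavily simplified model of the Messenger queues.
final class MiniMessenger {
    // Outgoing message: the target server and the leader we propose.
    static final class ToSend {
        final long sid;
        final long leader;

        ToSend(long sid, long leader) {
            this.sid = sid;
            this.leader = leader;
        }
    }

    // Incoming message as the election loop sees it.
    static final class Notification {
        final long leader;

        Notification(long leader) {
            this.leader = leader;
        }
    }

    final LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<>();

    // Sender worker: drains this server's sendqueue and "delivers" each message into
    // the peer's recvqueue (there is only one peer, so sid is not used for routing).
    static Thread startSender(MiniMessenger self, MiniMessenger peer) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    ToSend m = self.sendqueue.poll(100, TimeUnit.MILLISECONDS);
                    if (m != null) {
                        peer.recvqueue.put(new Notification(m.leader));
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutting down
            }
        }, "WorkerSender-sketch");
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Server A proposes server 3 to server B; B polls its recvqueue with a timeout,
    // as lookForLeader() does. Returns the leader B received, or -1 on timeout.
    static long demo() {
        MiniMessenger a = new MiniMessenger();
        MiniMessenger b = new MiniMessenger();
        Thread sender = startSender(a, b);
        try {
            a.sendqueue.put(new ToSend(2, 3));
            Notification n = b.recvqueue.poll(2, TimeUnit.SECONDS);
            return n == null ? -1 : n.leader;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            sender.interrupt();
        }
    }
}
```

Decoupling through blocking queues lets the election loop stay single-threaded: it never touches sockets, it only polls `recvqueue` and offers to `sendqueue`.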

public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    self.start_fle = Time.currentElapsedTime();
    try {
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = minNotificationInterval;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch)
                                + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid
                                  + ", proposed leader=" + n.leader
                                  + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                                  + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    if (voteSet.hasAllQuorums()) {
                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}

Vote-related functions

Update the proposal fields:

synchronized void updateProposal(long leader, long zxid, long epoch) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Updating proposal: " + leader
                  + " (newleader), 0x" + Long.toHexString(zxid)
                  + " (newzxid), " + proposedLeader
                  + " (oldleader), 0x" + Long.toHexString(proposedZxid)
                  + " (oldzxid)");
    }
    proposedLeader = leader;
    proposedZxid = zxid;
    proposedEpoch = epoch;
}

Build a vote from the current proposal:

public synchronized Vote getVote() {
    return new Vote(proposedLeader, proposedZxid, proposedEpoch);
}

Work out the non-leader state (follower or observer):

private ServerState learningState() {
    if (self.getLearnerType() == LearnerType.PARTICIPANT) {
        LOG.debug("I am a participant: {}", self.getId());
        return ServerState.FOLLOWING;
    } else {
        LOG.debug("I am an observer: {}", self.getId());
        return ServerState.OBSERVING;
    }
}

Get this server's id if it is a voting member (otherwise Long.MIN_VALUE):

private long getInitId() {
    if (self.getQuorumVerifier().getVotingMembers().containsKey(self.getId())) {
        return self.getId();
    } else {
        return Long.MIN_VALUE;
    }
}

Get the zxid of the last logged transaction (Long.MIN_VALUE for non-participants):

private long getInitLastLoggedZxid() {
    if (self.getLearnerType() == LearnerType.PARTICIPANT) {
        return self.getLastLoggedZxid();
    } else {
        return Long.MIN_VALUE;
    }
}

Get the current epoch stored in a file:

public long getCurrentEpoch() throws IOException {
    if (currentEpoch == -1) {
        currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
    }
    return currentEpoch;
}

Election-related functions

Decide whether a new vote (server id, zxid, epoch) beats the current one. In short, of the current vote and the new vote, the larger one wins, comparing epoch first, then zxid, then server id; a server with weight 0 never wins:

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("id: " + newId
                  + ", proposed id: " + curId
                  + ", zxid: 0x" + Long.toHexString(newZxid)
                  + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    }
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid)
                        && (newId > curId)))));
}

Check whether the proposed leader can be accepted; the function picks out the cases in which it cannot:

protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {
    boolean predicate = true;

    /*
     * If everyone else thinks I'm the leader, I must be the leader.
     * The other two checks are just for the case in which I'm not the
     * leader. If I'm not the leader and I haven't received a message
     * from leader stating that it is leading, then predicate is false.
     */
    if (leader != self.getId()) {
        if (votes.get(leader) == null) {
            predicate = false;
        } else if (votes.get(leader).getState() != ServerState.LEADING) {
            predicate = false;
        }
    } else if (logicalclock.get() != electionEpoch) {
        predicate = false;
    }

    return predicate;
}

Starting a new round of leader election

public Vote lookForLeader() throws InterruptedException
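An election ends when a quorum agrees on a proposal and, when joining an existing ensemble, when `checkLeader` accepts the leader. The sketch below assumes a plain majority quorum (ZooKeeper hides this behind `getVoteTracker(...)` and `hasAllQuorums()`, which can also work with other quorum verifiers) and re-expresses the `checkLeader` logic shown above on simplified, hypothetical types (`ElectionEnd` and its nested `Vote`). Agreement here compares all four proposal fields, a simplification of ZooKeeper's own `Vote` equality.

```java
import java.util.Map;

// Hypothetical, simplified types; a plain majority quorum is assumed.
final class ElectionEnd {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    static final class Vote {
        final long leader;
        final long zxid;
        final long electionEpoch;
        final long peerEpoch;
        final ServerState state; // state of the sender, used by checkLeader

        Vote(long leader, long zxid, long electionEpoch, long peerEpoch, ServerState state) {
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.peerEpoch = peerEpoch;
            this.state = state;
        }

        // Same proposal: all four proposal fields match (sender state ignored).
        boolean sameProposal(Vote o) {
            return leader == o.leader && zxid == o.zxid
                    && electionEpoch == o.electionEpoch && peerEpoch == o.peerEpoch;
        }
    }

    // Majority check: strictly more than half of the voting members back `proposal`.
    static boolean hasMajority(Map<Long, Vote> votes, Vote proposal, int votingMembers) {
        long agree = votes.values().stream().filter(proposal::sameProposal).count();
        return agree > votingMembers / 2;
    }

    // Same logic as checkLeader above, with myId and logicalclock passed in explicitly.
    static boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch,
                               long myId, long logicalclock) {
        if (leader != myId) {
            // Someone else is proposed: we must have heard from it, and it must report LEADING.
            Vote fromLeader = votes.get(leader);
            return fromLeader != null && fromLeader.state == ServerState.LEADING;
        }
        // We are the proposed leader: accept only if the vote is from our current round.
        return logicalclock == electionEpoch;
    }
}
```

In a five-server ensemble, three matching votes are enough (3 > 5 / 2 with integer division) and two are not, which is why any two quorums always share at least one server.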

Data structures used in the election

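Difference between electionEpoch and peerEpoch

electionEpoch is the election round. It is used to decide whether votes belong to the same election round, and it counts up from 0.

peerEpoch is the current epoch, i.e. the epoch of the proposed leader, not the election round.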
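How electionEpoch gates incoming LOOKING notifications, following the LOOKING case of `lookForLeader()`: a newer round makes the server move its logical clock forward, clear `recvset`, re-pick its proposal, and re-broadcast; an older round is ignored; the same round falls through to the vote comparison. A minimal sketch with a hypothetical `EpochGate` classifier:

```java
// Hypothetical classifier mirroring the three electionEpoch branches.
final class EpochGate {
    enum Action { ADOPT_NEW_ROUND, IGNORE_STALE, COMPARE_IN_ROUND }

    private EpochGate() {}

    static Action classify(long notificationElectionEpoch, long logicalclock) {
        if (notificationElectionEpoch > logicalclock) {
            // Newer round: set logicalclock to it, clear recvset, re-pick the proposal, re-broadcast.
            return Action.ADOPT_NEW_ROUND;
        }
        if (notificationElectionEpoch < logicalclock) {
            // Stale round: drop the notification.
            return Action.IGNORE_STALE;
        }
        // Same round: totalOrderPredicate decides whether to switch to the sender's proposal.
        return Action.COMPARE_IN_ROUND;
    }
}
```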

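Rules for comparing two votes

Compare peerEpoch, zxid, and sid, in that order:

peerEpoch is the epoch the vote belongs to; the larger it is, the newer the vote.

If peerEpoch is equal, zxid identifies the transaction within that epoch; the larger it is, the newer the vote.

If both peerEpoch and zxid are equal, the vote with the larger sid wins.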
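The three rules translate directly into a boolean predicate with the same shape as `totalOrderPredicate` above, minus its weight check (there, a server with weight 0 can never win). `VoteOrder` and its `best` helper are hypothetical names for this sketch, and each vote passed to `best` is an assumed `{sid, zxid, peerEpoch}` triple.

```java
// Hypothetical standalone version of the vote-ordering rule (no weight check).
final class VoteOrder {
    private VoteOrder() {}

    // True if the new vote beats the current one: peerEpoch first, then zxid, then sid.
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return newEpoch > curEpoch
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    // Picks the winning vote; each entry is {sid, zxid, peerEpoch}.
    static long[] best(long[][] votes) {
        long[] winner = votes[0];
        for (long[] v : votes) {
            if (totalOrderPredicate(v[0], v[1], v[2], winner[0], winner[1], winner[2])) {
                winner = v;
            }
        }
        return winner;
    }
}
```

For instance, among the votes {sid=1, zxid=5}, {sid=2, zxid=7}, and {sid=3, zxid=7}, all with peerEpoch 1, server 3 wins: servers 2 and 3 tie on zxid, and 3 has the larger sid.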
