千家信息网

nacos中RaftPeerSet的原理和作用是什么

发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,这篇文章主要讲解了"nacos中RaftPeerSet的原理和作用是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"nacos中RaftPeerSet的原理和作用是什么"吧!
千家信息网最后更新 2024年12月13日nacos中RaftPeerSet的原理和作用是什么

这篇文章主要讲解了"nacos中RaftPeerSet的原理和作用是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"nacos中RaftPeerSet的原理和作用是什么"吧!

本文主要研究一下nacos的RaftPeerSet

RaftPeerSet

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java

@Component@DependsOn("serverListManager")public class RaftPeerSet implements ServerChangeListener, ApplicationContextAware {    @Autowired    private ServerListManager serverListManager;    private ApplicationContext applicationContext;    private AtomicLong localTerm = new AtomicLong(0L);    private RaftPeer leader = null;    private Map peers = new HashMap<>();    private Set sites = new HashSet<>();    private boolean ready = false;    public RaftPeerSet() {    }    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        this.applicationContext = applicationContext;    }    @PostConstruct    public void init() {        serverListManager.listen(this);    }    public RaftPeer getLeader() {        if (STANDALONE_MODE) {            return local();        }        return leader;    }    public Set allSites() {        return sites;    }    public boolean isReady() {        return ready;    }    public void remove(List servers) {        for (String server : servers) {            peers.remove(server);        }    }    public RaftPeer update(RaftPeer peer) {        peers.put(peer.ip, peer);        return peer;    }    public boolean isLeader(String ip) {        if (STANDALONE_MODE) {            return true;        }        if (leader == null) {            Loggers.RAFT.warn("[IS LEADER] no leader is available now!");            return false;        }        return StringUtils.equals(leader.ip, ip);    }    public Set allServersIncludeMyself() {        return peers.keySet();    }    public Set allServersWithoutMySelf() {        Set servers = new HashSet(peers.keySet());        // exclude myself        servers.remove(local().ip);        return servers;    }    public Collection allPeers() {        return peers.values();    }    public int size() {        return peers.size();    }    public RaftPeer decideLeader(RaftPeer candidate) {        peers.put(candidate.ip, candidate);        SortedBag ips = new TreeBag();   
     int maxApproveCount = 0;        String maxApprovePeer = null;        for (RaftPeer peer : peers.values()) {            if (StringUtils.isEmpty(peer.voteFor)) {                continue;            }            ips.add(peer.voteFor);            if (ips.getCount(peer.voteFor) > maxApproveCount) {                maxApproveCount = ips.getCount(peer.voteFor);                maxApprovePeer = peer.voteFor;            }        }        if (maxApproveCount >= majorityCount()) {            RaftPeer peer = peers.get(maxApprovePeer);            peer.state = RaftPeer.State.LEADER;            if (!Objects.equals(leader, peer)) {                leader = peer;                applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));                Loggers.RAFT.info("{} has become the LEADER", leader.ip);            }        }        return leader;    }    public RaftPeer makeLeader(RaftPeer candidate) {        if (!Objects.equals(leader, candidate)) {            leader = candidate;            applicationContext.publishEvent(new MakeLeaderEvent(this, leader));            Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}",                leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader));        }        for (final RaftPeer peer : peers.values()) {            Map params = new HashMap<>(1);            if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {                try {                    String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER);                    HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler() {                        @Override                        public Integer onCompleted(Response response) throws Exception {                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {                                Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}",                                    
response.getResponseBody(), peer.ip);                                peer.state = RaftPeer.State.FOLLOWER;                                return 1;                            }                            update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));                            return 0;                        }                    });                } catch (Exception e) {                    peer.state = RaftPeer.State.FOLLOWER;                    Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);                }            }        }        return update(candidate);    }    public RaftPeer local() {        RaftPeer peer = peers.get(NetUtils.localServer());        if (peer == null && SystemUtils.STANDALONE_MODE) {            RaftPeer localPeer = new RaftPeer();            localPeer.ip = NetUtils.localServer();            localPeer.term.set(localTerm.get());            peers.put(localPeer.ip, localPeer);            return localPeer;        }        if (peer == null) {            throw new IllegalStateException("unable to find local peer: " + NetUtils.localServer() + ", all peers: "                + Arrays.toString(peers.keySet().toArray()));        }        return peer;    }    public RaftPeer get(String server) {        return peers.get(server);    }    public int majorityCount() {        return peers.size() / 2 + 1;    }    public void reset() {        leader = null;        for (RaftPeer peer : peers.values()) {            peer.voteFor = null;        }    }    public void setTerm(long term) {        localTerm.set(term);    }    public long getTerm() {        return localTerm.get();    }    public boolean contains(RaftPeer remote) {        return peers.containsKey(remote.ip);    }    //......}
  • RaftPeerSet提供了remove、update、isLeader、allServersIncludeMyself、allServersWithoutMySelf、allPeers、decideLeader、makeLeader、majorityCount、reset等方法

  • decideLeader方法会遍历peers,然后使用TreeBag来统计peer.voteFor,当maxApproveCount大于等于majorityCount(),则将对应的peer的state标记为RaftPeer.State.LEADER,然后判断leader是否变更,变更则发布LeaderElectFinishedEvent事件

  • makeLeader方法判断candidate与当前leader是否一致,不一致则更新leader为candidate,发布MakeLeaderEvent事件,然后遍历peers给非candidate且state是LEADER状态的节点发送API_GET_PEER请求,然后更新该peer在本地的信息,如果请求失败则更新其state为RaftPeer.State.FOLLOWER

RaftCore.MasterElection

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java

    public class MasterElection implements Runnable {        @Override        public void run() {            try {                if (!peers.isReady()) {                    return;                }                RaftPeer local = peers.local();                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;                if (local.leaderDueMs > 0) {                    return;                }                // reset timeout                local.resetLeaderDue();                local.resetHeartbeatDue();                sendVote();            } catch (Exception e) {                Loggers.RAFT.warn("[RAFT] error while master election {}", e);            }        }        public void sendVote() {            RaftPeer local = peers.get(NetUtils.localServer());            Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",                JSON.toJSONString(getLeader()), local.term);            peers.reset();            local.term.incrementAndGet();            local.voteFor = local.ip;            local.state = RaftPeer.State.CANDIDATE;            Map params = new HashMap<>(1);            params.put("vote", JSON.toJSONString(local));            for (final String server : peers.allServersWithoutMySelf()) {                final String url = buildURL(server, API_VOTE);                try {                    HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler() {                        @Override                        public Integer onCompleted(Response response) throws Exception {                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {                                Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);                                return 1;                            }                            RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);                            Loggers.RAFT.info("received approve from peer: 
{}", JSON.toJSONString(peer));                            peers.decideLeader(peer);                            return 0;                        }                    });                } catch (Exception e) {                    Loggers.RAFT.warn("error while sending vote to server: {}", server);                }            }        }    }
  • RaftCore.MasterElection的sendVote方法在请求成功时会执行peers.decideLeader(peer)方法来选举leader

RaftCore.receivedBeat

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java

@Componentpublic class RaftCore {        //......    public RaftPeer receivedBeat(JSONObject beat) throws Exception {        final RaftPeer local = peers.local();        final RaftPeer remote = new RaftPeer();        remote.ip = beat.getJSONObject("peer").getString("ip");        remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));        remote.term.set(beat.getJSONObject("peer").getLongValue("term"));        remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");        remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");        remote.voteFor = beat.getJSONObject("peer").getString("voteFor");        if (remote.state != RaftPeer.State.LEADER) {            Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",                remote.state, JSON.toJSONString(remote));            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);        }        if (local.term.get() > remote.term.get()) {            Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"                , remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()                + ", beat-to-term: " + local.term.get());        }        if (local.state != RaftPeer.State.FOLLOWER) {            Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));            // mk follower            local.state = RaftPeer.State.FOLLOWER;            local.voteFor = remote.ip;        }        final JSONArray beatDatums = beat.getJSONArray("datums");        local.resetLeaderDue();        local.resetHeartbeatDue();        peers.makeLeader(remote);        Map receivedKeysMap = new HashMap<>(datums.size());        for (Map.Entry entry : 
datums.entrySet()) {            receivedKeysMap.put(entry.getKey(), 0);        }        // now check datums        List batch = new ArrayList<>();        if (!switchDomain.isSendBeatOnly()) {            int processedCount = 0;            if (Loggers.RAFT.isDebugEnabled()) {                Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",                    beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);            }            for (Object object : beatDatums) {                processedCount = processedCount + 1;                JSONObject entry = (JSONObject) object;                String key = entry.getString("key");                final String datumKey;                if (KeyBuilder.matchServiceMetaKey(key)) {                    datumKey = KeyBuilder.detailServiceMetaKey(key);                } else if (KeyBuilder.matchInstanceListKey(key)) {                    datumKey = KeyBuilder.detailInstanceListkey(key);                } else {                    // ignore corrupted key:                    continue;                }                long timestamp = entry.getLong("timestamp");                receivedKeysMap.put(datumKey, 1);                try {                    if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {                        continue;                    }                    if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {                        batch.add(datumKey);                    }                    if (batch.size() < 50 && processedCount < beatDatums.size()) {                        continue;                    }                    String keys = StringUtils.join(batch, ",");                    if (batch.size() <= 0) {                        continue;                    }                    Loggers.RAFT.info("get datums from 
leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"                        , getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());                    // update datum entry                    String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");                    HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler() {                        @Override                        public Integer onCompleted(Response response) throws Exception {                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {                                return 1;                            }                            List datumList = JSON.parseObject(response.getResponseBody(), new TypeReference>() {                            });                            for (JSONObject datumJson : datumList) {                                OPERATE_LOCK.lock();                                Datum newDatum = null;                                try {                                    Datum oldDatum = getDatum(datumJson.getString("key"));                                    if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) {                                        Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",                                            datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp);                                        continue;                                    }                                    if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) {                                        Datum serviceDatum = new Datum<>();                                        serviceDatum.key = datumJson.getString("key");                                        
serviceDatum.timestamp.set(datumJson.getLongValue("timestamp"));                                        serviceDatum.value =                                            JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class);                                        newDatum = serviceDatum;                                    }                                    if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) {                                        Datum instancesDatum = new Datum<>();                                        instancesDatum.key = datumJson.getString("key");                                        instancesDatum.timestamp.set(datumJson.getLongValue("timestamp"));                                        instancesDatum.value =                                            JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class);                                        newDatum = instancesDatum;                                    }                                    if (newDatum == null || newDatum.value == null) {                                        Loggers.RAFT.error("receive null datum: {}", datumJson);                                        continue;                                    }                                    raftStore.write(newDatum);                                    datums.put(newDatum.key, newDatum);                                    notifier.addTask(newDatum.key, ApplyAction.CHANGE);                                    local.resetLeaderDue();                                    if (local.term.get() + 100 > remote.term.get()) {                                        getLeader().term.set(remote.term.get());                                        local.term.set(getLeader().term.get());                                    } else {                                        local.term.addAndGet(100);                                    }                                    
raftStore.updateTerm(local.term.get());                                    Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",                                        newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term);                                } catch (Throwable e) {                                    Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);                                } finally {                                    OPERATE_LOCK.unlock();                                }                            }                            TimeUnit.MILLISECONDS.sleep(200);                            return 0;                        }                    });                    batch.clear();                } catch (Exception e) {                    Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);                }            }            List deadKeys = new ArrayList<>();            for (Map.Entry entry : receivedKeysMap.entrySet()) {                if (entry.getValue() == 0) {                    deadKeys.add(entry.getKey());                }            }            for (String deadKey : deadKeys) {                try {                    deleteDatum(deadKey);                } catch (Exception e) {                    Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);                }            }        }        return local;    }    //......}
  • receivedBeat方法会调用peers.makeLeader(remote)来更新leader

小结

  • RaftPeerSet提供了remove、update、isLeader、allServersIncludeMyself、allServersWithoutMySelf、allPeers、decideLeader、makeLeader、majorityCount、reset等方法

  • decideLeader方法会遍历peers,然后使用TreeBag来统计peer.voteFor,当maxApproveCount大于等于majorityCount(),则将对应的peer的state标记为RaftPeer.State.LEADER,然后判断leader是否变更,变更则发布LeaderElectFinishedEvent事件

  • makeLeader方法判断candidate与当前leader是否一致,不一致则更新leader为candidate,发布MakeLeaderEvent事件,然后遍历peers给非candidate且state是LEADER状态的节点发送API_GET_PEER请求,然后更新该peer在本地的信息,如果请求失败则更新其state为RaftPeer.State.FOLLOWER

感谢各位的阅读,以上就是"nacos中RaftPeerSet的原理和作用是什么"的内容了,经过本文的学习后,相信大家对nacos中RaftPeerSet的原理和作用是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是千家信息网,小编将为大家推送更多相关知识点的文章,欢迎关注!

0