What are the principles and role of RaftPeerSet in Nacos?
Preface
This article examines the RaftPeerSet class in Nacos.
RaftPeerSet
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java
@Component
@DependsOn("serverListManager")
public class RaftPeerSet implements ServerChangeListener, ApplicationContextAware {

    @Autowired
    private ServerListManager serverListManager;

    private ApplicationContext applicationContext;

    private AtomicLong localTerm = new AtomicLong(0L);

    private RaftPeer leader = null;

    private Map<String, RaftPeer> peers = new HashMap<>();

    private Set<String> sites = new HashSet<>();

    private boolean ready = false;

    public RaftPeerSet() {
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @PostConstruct
    public void init() {
        serverListManager.listen(this);
    }

    public RaftPeer getLeader() {
        if (STANDALONE_MODE) {
            return local();
        }
        return leader;
    }

    public Set<String> allSites() {
        return sites;
    }

    public boolean isReady() {
        return ready;
    }

    public void remove(List<String> servers) {
        for (String server : servers) {
            peers.remove(server);
        }
    }

    public RaftPeer update(RaftPeer peer) {
        peers.put(peer.ip, peer);
        return peer;
    }

    public boolean isLeader(String ip) {
        if (STANDALONE_MODE) {
            return true;
        }

        if (leader == null) {
            Loggers.RAFT.warn("[IS LEADER] no leader is available now!");
            return false;
        }

        return StringUtils.equals(leader.ip, ip);
    }

    public Set<String> allServersIncludeMyself() {
        return peers.keySet();
    }

    public Set<String> allServersWithoutMySelf() {
        Set<String> servers = new HashSet<String>(peers.keySet());

        // exclude myself
        servers.remove(local().ip);

        return servers;
    }

    public Collection<RaftPeer> allPeers() {
        return peers.values();
    }

    public int size() {
        return peers.size();
    }

    public RaftPeer decideLeader(RaftPeer candidate) {
        peers.put(candidate.ip, candidate);

        SortedBag ips = new TreeBag();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (RaftPeer peer : peers.values()) {
            if (StringUtils.isEmpty(peer.voteFor)) {
                continue;
            }

            ips.add(peer.voteFor);
            if (ips.getCount(peer.voteFor) > maxApproveCount) {
                maxApproveCount = ips.getCount(peer.voteFor);
                maxApprovePeer = peer.voteFor;
            }
        }

        if (maxApproveCount >= majorityCount()) {
            RaftPeer peer = peers.get(maxApprovePeer);
            peer.state = RaftPeer.State.LEADER;

            if (!Objects.equals(leader, peer)) {
                leader = peer;
                applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));
                Loggers.RAFT.info("{} has become the LEADER", leader.ip);
            }
        }

        return leader;
    }

    public RaftPeer makeLeader(RaftPeer candidate) {
        if (!Objects.equals(leader, candidate)) {
            leader = candidate;
            applicationContext.publishEvent(new MakeLeaderEvent(this, leader));
            Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}",
                leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader));
        }

        for (final RaftPeer peer : peers.values()) {
            Map<String, String> params = new HashMap<>(1);
            if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
                try {
                    String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER);
                    HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}",
                                    response.getResponseBody(), peer.ip);
                                peer.state = RaftPeer.State.FOLLOWER;
                                return 1;
                            }

                            update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));

                            return 0;
                        }
                    });
                } catch (Exception e) {
                    peer.state = RaftPeer.State.FOLLOWER;
                    Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
                }
            }
        }

        return update(candidate);
    }

    public RaftPeer local() {
        RaftPeer peer = peers.get(NetUtils.localServer());
        if (peer == null && SystemUtils.STANDALONE_MODE) {
            RaftPeer localPeer = new RaftPeer();
            localPeer.ip = NetUtils.localServer();
            localPeer.term.set(localTerm.get());
            peers.put(localPeer.ip, localPeer);
            return localPeer;
        }
        if (peer == null) {
            throw new IllegalStateException("unable to find local peer: " + NetUtils.localServer() + ", all peers: "
                + Arrays.toString(peers.keySet().toArray()));
        }

        return peer;
    }

    public RaftPeer get(String server) {
        return peers.get(server);
    }

    public int majorityCount() {
        return peers.size() / 2 + 1;
    }

    public void reset() {
        leader = null;

        for (RaftPeer peer : peers.values()) {
            peer.voteFor = null;
        }
    }

    public void setTerm(long term) {
        localTerm.set(term);
    }

    public long getTerm() {
        return localTerm.get();
    }

    public boolean contains(RaftPeer remote) {
        return peers.containsKey(remote.ip);
    }

    //......
}
RaftPeerSet provides methods including remove, update, isLeader, allServersIncludeMyself, allServersWithoutMySelf, allPeers, decideLeader, makeLeader, majorityCount, and reset.
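The decideLeader method first stores the candidate in peers, then iterates over peers and uses a TreeBag to count the peer.voteFor values. When maxApproveCount is greater than or equal to majorityCount(), it sets the state of the corresponding peer to RaftPeer.State.LEADER, then checks whether the leader has changed; if it has, it publishes a LeaderElectFinishedEvent.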
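The vote count in decideLeader can be illustrated with a minimal sketch. It is not Nacos code: the class VoteTallySketch, the plain Map of votes, and the sample addresses are assumptions made for illustration, and a HashMap counter stands in for TreeBag. Unlike the original, which returns the current leader field, this sketch returns null when no address has reached a majority.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the vote tally in decideLeader; not Nacos code. */
final class VoteTallySketch {

    /** Same formula as RaftPeerSet.majorityCount(): more than half of the peers. */
    static int majorityCount(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /**
     * @param votes maps each peer address to the address it voted for
     *              (null or empty means the peer has not voted yet)
     * @return the address that reached a majority, or null if none has
     */
    static String decideLeader(Map<String, String> votes) {
        Map<String, Integer> counts = new HashMap<>();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (String voteFor : votes.values()) {
            if (voteFor == null || voteFor.isEmpty()) {
                continue; // peers without a vote are skipped, as in the original loop
            }
            int count = counts.merge(voteFor, 1, Integer::sum);
            if (count > maxApproveCount) {
                maxApproveCount = count;
                maxApprovePeer = voteFor;
            }
        }
        // The size of the whole peer set, not the number of votes cast, sets the threshold.
        return maxApproveCount >= majorityCount(votes.size()) ? maxApprovePeer : null;
    }

    public static void main(String[] args) {
        Map<String, String> votes = new LinkedHashMap<>();
        votes.put("10.0.0.1:8848", "10.0.0.1:8848");
        votes.put("10.0.0.2:8848", "10.0.0.1:8848");
        votes.put("10.0.0.3:8848", null);
        // Two of three peers voted for 10.0.0.1:8848, which meets majorityCount(3) == 2.
        System.out.println(decideLeader(votes));
    }
}
```

Because the threshold is computed from the full peer set, a peer that has not answered yet still counts toward the cluster size, so in a three-node cluster two matching votes are needed.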
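The makeLeader method checks whether candidate is the same as the current leader; if not, it sets leader to candidate and publishes a MakeLeaderEvent. It then iterates over peers and, for every peer other than candidate whose state is still LEADER, sends an API_GET_PEER request and updates the local record of that peer from the response. If the request fails, it sets that peer's state to RaftPeer.State.FOLLOWER. Finally, it stores candidate through update(candidate).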
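The demotion of stale leaders in makeLeader can be sketched without any HTTP. Everything here is hypothetical: the class MakeLeaderSketch, its nested Peer type, and the fetchPeer function that stands in for the asynchronous API_GET_PEER call and returns null on failure. The sketch also assumes peers are compared by address and runs synchronously, whereas the original updates peers inside an asynchronous callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, synchronous model of makeLeader; not Nacos code. */
final class MakeLeaderSketch {

    enum State { LEADER, FOLLOWER, CANDIDATE }

    static final class Peer {
        final String ip;
        State state;

        Peer(String ip, State state) {
            this.ip = ip;
            this.state = state;
        }
    }

    final Map<String, Peer> peers = new HashMap<>();
    Peer leader;

    /**
     * @param fetchPeer assumed replacement for the API_GET_PEER request: returns the
     *                  remote peer's own view of itself, or null if the request fails
     */
    Peer makeLeader(Peer candidate, Function<String, Peer> fetchPeer) {
        if (leader == null || !leader.ip.equals(candidate.ip)) {
            leader = candidate; // the original also publishes a MakeLeaderEvent here
        }
        // Iterate over a copy so that refreshing entries cannot disturb the iteration.
        for (Peer peer : new ArrayList<>(peers.values())) {
            if (peer.ip.equals(candidate.ip) || peer.state != State.LEADER) {
                continue;
            }
            Peer refreshed = fetchPeer.apply(peer.ip);
            if (refreshed == null) {
                peer.state = State.FOLLOWER; // unreachable "leader" is demoted locally
            } else {
                peers.put(refreshed.ip, refreshed); // same effect as update(...)
            }
        }
        peers.put(candidate.ip, candidate);
        return candidate;
    }

    public static void main(String[] args) {
        MakeLeaderSketch set = new MakeLeaderSketch();
        set.peers.put("10.0.0.1:8848", new Peer("10.0.0.1:8848", State.LEADER));
        Peer newLeader = new Peer("10.0.0.2:8848", State.LEADER);
        set.makeLeader(newLeader, ip -> null); // simulate a failed lookup
        System.out.println(set.peers.get("10.0.0.1:8848").state);
    }
}
```

The point of the loop is that at most one peer should remain marked LEADER locally: any other peer still carrying that state is either refreshed from its own report or demoted.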
RaftCore.MasterElection
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java
public class MasterElection implements Runnable {
    @Override
    public void run() {
        try {

            if (!peers.isReady()) {
                return;
            }

            RaftPeer local = peers.local();
            local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

            if (local.leaderDueMs > 0) {
                return;
            }

            // reset timeout
            local.resetLeaderDue();
            local.resetHeartbeatDue();

            sendVote();
        } catch (Exception e) {
            Loggers.RAFT.warn("[RAFT] error while master election {}", e);
        }

    }

    public void sendVote() {

        RaftPeer local = peers.get(NetUtils.localServer());
        Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
            JSON.toJSONString(getLeader()), local.term);

        peers.reset();

        local.term.incrementAndGet();
        local.voteFor = local.ip;
        local.state = RaftPeer.State.CANDIDATE;

        Map<String, String> params = new HashMap<>(1);
        params.put("vote", JSON.toJSONString(local));
        for (final String server : peers.allServersWithoutMySelf()) {
            final String url = buildURL(server, API_VOTE);
            try {
                HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
                            return 1;
                        }

                        RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);

                        Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));

                        peers.decideLeader(peer);

                        return 0;
                    }
                });
            } catch (Exception e) {
                Loggers.RAFT.warn("error while sending vote to server: {}", server);
            }
        }
    }
}
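The sendVote method of RaftCore.MasterElection resets the peer set, increments the local term, votes for itself, and posts the vote to every other server. For each successful response it calls peers.decideLeader(peer) to elect the leader.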
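The countdown that leads up to sendVote can be sketched as a small state machine. The class ElectionTimerSketch and the two constants are assumptions chosen for illustration and are not presented as the values Nacos uses; any randomization that resetLeaderDue may apply is left out.

```java
/** Hypothetical model of the MasterElection tick countdown; not Nacos code. */
final class ElectionTimerSketch {

    // Illustrative values only, not taken from the source shown in this article.
    static final long TICK_PERIOD_MS = 500L;
    static final long LEADER_TIMEOUT_MS = 15_000L;

    long leaderDueMs = LEADER_TIMEOUT_MS;
    long term = 0L;

    /**
     * One scheduler tick, mirroring MasterElection.run().
     *
     * @return true when the leader timeout expired and a vote round was started
     */
    boolean tick() {
        leaderDueMs -= TICK_PERIOD_MS;
        if (leaderDueMs > 0) {
            return false; // the leader is still considered alive
        }
        leaderDueMs = LEADER_TIMEOUT_MS; // reset timeout before voting
        term++;                          // sendVote increments the local term
        return true;
    }

    /** A valid heartbeat from the leader restarts the countdown. */
    void onHeartbeat() {
        leaderDueMs = LEADER_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        ElectionTimerSketch timer = new ElectionTimerSketch();
        int ticks = 0;
        while (!timer.tick()) {
            ticks++;
        }
        System.out.println("vote started after " + (ticks + 1) + " ticks, term " + timer.term);
    }
}
```

As long as heartbeats keep arriving more often than the timeout, tick() never returns true and no election starts.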
RaftCore.receivedBeat
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java
@Component
public class RaftCore {

    //......

    public RaftPeer receivedBeat(JSONObject beat) throws Exception {
        final RaftPeer local = peers.local();
        final RaftPeer remote = new RaftPeer();
        remote.ip = beat.getJSONObject("peer").getString("ip");
        remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
        remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
        remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
        remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
        remote.voteFor = beat.getJSONObject("peer").getString("voteFor");

        if (remote.state != RaftPeer.State.LEADER) {
            Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
                remote.state, JSON.toJSONString(remote));
            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
        }

        if (local.term.get() > remote.term.get()) {
            Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
                , remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
                + ", beat-to-term: " + local.term.get());
        }

        if (local.state != RaftPeer.State.FOLLOWER) {

            Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
            // mk follower
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
        }

        final JSONArray beatDatums = beat.getJSONArray("datums");
        local.resetLeaderDue();
        local.resetHeartbeatDue();

        peers.makeLeader(remote);

        Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());

        for (Map.Entry<String, Datum> entry : datums.entrySet()) {
            receivedKeysMap.put(entry.getKey(), 0);
        }

        // now check datums
        List<String> batch = new ArrayList<>();
        if (!switchDomain.isSendBeatOnly()) {
            int processedCount = 0;
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                    beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
            }
            for (Object object : beatDatums) {
                processedCount = processedCount + 1;

                JSONObject entry = (JSONObject) object;
                String key = entry.getString("key");
                final String datumKey;

                if (KeyBuilder.matchServiceMetaKey(key)) {
                    datumKey = KeyBuilder.detailServiceMetaKey(key);
                } else if (KeyBuilder.matchInstanceListKey(key)) {
                    datumKey = KeyBuilder.detailInstanceListkey(key);
                } else {
                    // ignore corrupted key:
                    continue;
                }

                long timestamp = entry.getLong("timestamp");

                receivedKeysMap.put(datumKey, 1);

                try {
                    if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
                        && processedCount < beatDatums.size()) {
                        continue;
                    }

                    if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                        batch.add(datumKey);
                    }

                    if (batch.size() < 50 && processedCount < beatDatums.size()) {
                        continue;
                    }

                    String keys = StringUtils.join(batch, ",");

                    if (batch.size() <= 0) {
                        continue;
                    }

                    Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"
                        , getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());

                    // update datum entry
                    String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
                    HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                return 1;
                            }

                            List<JSONObject> datumList = JSON.parseObject(response.getResponseBody(),
                                new TypeReference<List<JSONObject>>() {
                                });

                            for (JSONObject datumJson : datumList) {
                                OPERATE_LOCK.lock();
                                Datum newDatum = null;
                                try {

                                    Datum oldDatum = getDatum(datumJson.getString("key"));

                                    if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) {
                                        Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                            datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp);
                                        continue;
                                    }

                                    if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) {
                                        Datum<Service> serviceDatum = new Datum<>();
                                        serviceDatum.key = datumJson.getString("key");
                                        serviceDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                        serviceDatum.value =
                                            JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class);
                                        newDatum = serviceDatum;
                                    }

                                    if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) {
                                        Datum<Instances> instancesDatum = new Datum<>();
                                        instancesDatum.key = datumJson.getString("key");
                                        instancesDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                        instancesDatum.value =
                                            JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class);
                                        newDatum = instancesDatum;
                                    }

                                    if (newDatum == null || newDatum.value == null) {
                                        Loggers.RAFT.error("receive null datum: {}", datumJson);
                                        continue;
                                    }

                                    raftStore.write(newDatum);

                                    datums.put(newDatum.key, newDatum);
                                    notifier.addTask(newDatum.key, ApplyAction.CHANGE);

                                    local.resetLeaderDue();

                                    if (local.term.get() + 100 > remote.term.get()) {
                                        getLeader().term.set(remote.term.get());
                                        local.term.set(getLeader().term.get());
                                    } else {
                                        local.term.addAndGet(100);
                                    }

                                    raftStore.updateTerm(local.term.get());

                                    Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                        newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term);

                                } catch (Throwable e) {
                                    Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
                                } finally {
                                    OPERATE_LOCK.unlock();
                                }
                            }
                            TimeUnit.MILLISECONDS.sleep(200);
                            return 0;
                        }
                    });

                    batch.clear();

                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
                }

            }

            List<String> deadKeys = new ArrayList<>();
            for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
                if (entry.getValue() == 0) {
                    deadKeys.add(entry.getKey());
                }
            }

            for (String deadKey : deadKeys) {
                try {
                    deleteDatum(deadKey);
                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
                }
            }

        }

        return local;
    }

    //......
}
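After verifying that the sender's state is LEADER and that the local term is not greater than the sender's term, the receivedBeat method calls peers.makeLeader(remote) to update the leader.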
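Two pieces of receivedBeat are easy to isolate: the checks that reject a heartbeat, and the bookkeeping that finds local keys the leader no longer reports. The sketch below is hypothetical (the class BeatCheckSketch and its method names are not part of Nacos), uses plain strings instead of RaftPeer and Datum, and skips the key translation done by KeyBuilder.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical extract of the checks in receivedBeat; not Nacos code. */
final class BeatCheckSketch {

    /** Rejects a beat whose sender is not a leader or whose term is behind ours. */
    static void validate(String remoteState, long remoteTerm, long localTerm) {
        if (!"LEADER".equals(remoteState)) {
            throw new IllegalArgumentException("invalid state from master, state: " + remoteState);
        }
        if (localTerm > remoteTerm) {
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remoteTerm
                + ", beat-to-term: " + localTerm);
        }
    }

    /**
     * Marks every local key with 0, flips the ones present in the beat to 1,
     * and returns the keys still at 0, which the follower would delete.
     */
    static List<String> deadKeys(Set<String> localKeys, List<String> beatKeys) {
        Map<String, Integer> receivedKeysMap = new HashMap<>(localKeys.size());
        for (String key : localKeys) {
            receivedKeysMap.put(key, 0);
        }
        for (String key : beatKeys) {
            receivedKeysMap.put(key, 1);
        }
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
            if (entry.getValue() == 0) {
                dead.add(entry.getKey());
            }
        }
        return dead;
    }

    public static void main(String[] args) {
        validate("LEADER", 5L, 3L); // accepted: sender is a leader with a newer term
        Set<String> local = new java.util.HashSet<>(java.util.Arrays.asList("svc-a", "svc-b"));
        System.out.println(deadKeys(local, java.util.Arrays.asList("svc-a")));
    }
}
```

Only a beat that passes both checks reaches the point where the follower resets its timers and calls makeLeader, so a stale or demoted node cannot overwrite the leader record.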
Summary
RaftPeerSet provides methods including remove, update, isLeader, allServersIncludeMyself, allServersWithoutMySelf, allPeers, decideLeader, makeLeader, majorityCount, and reset.
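The decideLeader method iterates over peers and uses a TreeBag to count the peer.voteFor values. When maxApproveCount is greater than or equal to majorityCount(), it sets the state of the corresponding peer to RaftPeer.State.LEADER, then checks whether the leader has changed; if it has, it publishes a LeaderElectFinishedEvent.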
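The makeLeader method checks whether candidate is the same as the current leader; if not, it sets leader to candidate and publishes a MakeLeaderEvent. It then iterates over peers and sends an API_GET_PEER request to every peer other than candidate whose state is LEADER, updating the local record of that peer from the response; if the request fails, it sets that peer's state to RaftPeer.State.FOLLOWER.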