千家信息网

zk中QuorumPeer的原理和使用

发表于:2025-01-28 作者:千家信息网编辑
千家信息网最后更新 2025年01月28日,本篇内容主要讲解"zk中QuorumPeer的原理和使用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"zk中QuorumPeer的原理和使用"吧!内部类 AddressTuple……
千家信息网最后更新 2025年01月28日zk中QuorumPeer的原理和使用

本篇内容主要讲解"zk中QuorumPeer的原理和使用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"zk中QuorumPeer的原理和使用"吧!

内部类

AddressTuple 地址组

QuorumServer

ServerState 服务器状态:LOOKING(寻找 leader)、FOLLOWING(跟随者)、OBSERVING(观察者)、LEADING(领导者)

ZabState ZAB 协议当前所处的状态

SyncMode 同步机制

LearnerType 学习类型

属性

Vote currentVote

当前节点认为的 leader 是谁(即本节点当前持有的选票)

方法

构造函数

/**
 * Creates a peer with its statistics holder, JMX bean map, admin server and
 * X509 utility set up, then runs {@code initialize()}.
 *
 * <p>The superclass is constructed with the name {@code "QuorumPeer"}.
 * NOTE(review): the superclass is not visible here; presumably that string
 * becomes the thread name - confirm.
 *
 * @throws SaslException declared by this constructor; which call raises it
 *         is not visible in this snippet (presumably {@code initialize()})
 */
public QuorumPeer() throws SaslException {
    super("QuorumPeer");
    quorumStats = new QuorumStats(this);
    // Diamond instead of a raw HashMap so the field's declared type
    // arguments are inferred and no unchecked warning is produced.
    jmxRemotePeerBean = new HashMap<>();
    adminServer = AdminServerFactory.createAdminServer();
    x509Util = createX509Util();
    initialize();
}
// Topics covered by the members below: startup, election state, and epoch
// persistence.

/**
 * QuorumServer: records the attributes of one server (id, addresses, learner
 * type).
 *
 * @param id           server id
 * @param addr         stored in {@code this.addr}
 * @param electionAddr stored in {@code this.electionAddr}
 * @param clientAddr   stored in {@code this.clientAddr}
 * @param type         learner type of this server
 */
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
    this.id = id;
    this.addr = addr;
    this.electionAddr = electionAddr;
    this.type = type;
    this.clientAddr = clientAddr;
    setMyAddrs();
}

/**
 * Constructor: builds a peer from explicit configuration values.
 *
 * <p>Delegates to the no-arg constructor first, then stores the supplied
 * settings, creates the transaction/snapshot log and the database on top of
 * it, and installs the quorum verifier.
 *
 * @param quorumPeers  peers used to build a {@code QuorumMaj} when
 *                     {@code quorumConfig} is {@code null}.
 *                     NOTE(review): raw {@code Map}; the type arguments were
 *                     probably lost when the snippet was published - confirm
 *                     against the real declaration.
 * @param dataDir      passed as the second argument of {@code FileTxnSnapLog}
 * @param dataLogDir   passed as the first argument of {@code FileTxnSnapLog}
 * @param quorumConfig verifier to install; a {@code QuorumMaj} over
 *                     {@code quorumPeers} is used when {@code null}
 * @throws IOException declared by this constructor
 */
public QuorumPeer(Map quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException {
    this();
    this.cnxnFactory = cnxnFactory;
    this.electionType = electionType;
    this.myid = myid;
    this.tickTime = tickTime;
    this.initLimit = initLimit;
    this.syncLimit = syncLimit;
    this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
    this.quorumListenOnAllIPs = quorumListenOnAllIPs;
    this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
    this.zkDb = new ZKDatabase(this.logFactory);
    if (quorumConfig == null) {
        quorumConfig = new QuorumMaj(quorumPeers);
    }
    setQuorumVerifier(quorumConfig, false);
    // NOTE(review): the no-arg constructor invoked via this() already
    // assigns adminServer; this creates and assigns a second instance.
    adminServer = AdminServerFactory.createAdminServer();
}

/**
 * Returns the largest (last processed) transaction id, loading the database
 * first if it has not been initialized yet.
 *
 * @return the value of {@code zkDb.getDataTreeLastProcessedZxid()}
 */
public long getLastLoggedZxid() {
    if (!zkDb.isInitialized()) {
        loadDataBase();
    }
    return zkDb.getDataTreeLastProcessedZxid();
}

/**
 * Thread start: loads the database, starts the connection factory and the
 * admin server, begins leader election, starts the JVM pause monitor, and
 * finally calls {@code super.start()}.
 *
 * @throws RuntimeException if {@code myid} is not a key of {@code getView()}
 */
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Load the database.
    loadDataBase();
    // Start the server connection factory.
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        // A failing admin server is logged but does not abort startup. The
        // exception is passed to the logger, so no extra stdout print.
        LOG.warn("Problem starting AdminServer", e);
    }
    // Begin the election.
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}

/**
 * Loads the on-disk database and the persisted current/accepted epochs.
 *
 * <p>When an epoch file is missing, the epoch derived from the last
 * processed zxid is used and written back to that file.
 *
 * @throws RuntimeException wrapping any {@link IOException}, including the
 *         consistency failures "zxid epoch newer than current epoch" and
 *         "accepted epoch less than current epoch"
 */
private void loadDataBase() {
    try {
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(
                "{} not found! Creating with a reasonable default of {}. "
                    + "This should only happen when you are upgrading your installation",
                CURRENT_EPOCH_FILENAME,
                currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, "
                                  + ZxidUtils.zxidToString(currentEpoch)
                                  + ", is older than the last zxid, "
                                  + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(
                "{} not found! Creating with a reasonable default of {}. "
                    + "This should only happen when you are upgrading your installation",
                ACCEPTED_EPOCH_FILENAME,
                acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, "
                                  + ZxidUtils.zxidToString(acceptedEpoch)
                                  + " is less than the current epoch, "
                                  + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch (IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}

/**
 * Creates this peer's initial vote when it is in the LOOKING state, then
 * creates the election algorithm for the configured election type.
 *
 * @throws RuntimeException carrying the message of, and caused by, any
 *         {@link IOException} raised while building the vote
 */
public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // Looking for a leader: create a vote from our own id, last
            // logged zxid and current epoch.
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        // Keep the original exception as the cause instead of copying only
        // its message and stack trace.
        throw new RuntimeException(e.getMessage(), e);
    }
    // Create the election algorithm.
    this.electionAlg = createElectionAlgorithm(electionType);
}

/**
 * Builds a {@code Follower} backed by a new {@code FollowerZooKeeperServer}
 * that shares this peer's database.
 */
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}

/**
 * Builds a {@code Leader} backed by a new {@code LeaderZooKeeperServer}
 * that shares this peer's database.
 */
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
    return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}

/**
 * Builds an {@code Observer} backed by a new {@code ObserverZooKeeperServer}
 * that shares this peer's database.
 */
protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
    return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}

/**
 * Reads a long value from a file in the snapshot directory.
 *
 * @param name file name, resolved against {@code logFactory.getSnapDir()}
 * @return the long parsed from the first line of the file
 * @throws FileNotFoundException if the file cannot be opened (callers use
 *         this to detect a missing epoch file)
 * @throws IOException if reading fails or the first line is not a valid
 *         long (including an empty file, reported as {@code Found null})
 */
private long readLongFromFile(String name) throws IOException {
    File file = new File(logFactory.getSnapDir(), name);
    String line = "";
    // try-with-resources closes the reader on every path.
    try (BufferedReader br = new BufferedReader(new FileReader(file))) {
        line = br.readLine();
        return Long.parseLong(line);
    } catch (NumberFormatException e) {
        throw new IOException("Found " + line + " in " + file, e);
    }
}

/**
 * Writes a long value to a file in the snapshot directory.
 *
 * <p>The value is written as its decimal string through an
 * {@code AtomicFileWritingIdiom}.
 * NOTE(review): the atomicity guarantees come from that helper, which is not
 * visible here - confirm.
 *
 * @param name  file name, resolved against {@code logFactory.getSnapDir()}
 * @param value value to persist
 * @throws IOException if the write fails
 */
private void writeLongToFile(String name, final long value) throws IOException {
    File file = new File(logFactory.getSnapDir(), name);
    new AtomicFileWritingIdiom(file, new WriterStatement() {
        @Override
        public void write(Writer bw) throws IOException {
            bw.write(Long.toString(value));
        }
    });
}


到此,相信大家对"zk中QuorumPeer的原理和使用"有了更深的了解,不妨来实际操作一番吧!这里是千家信息网,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0