  • CAP theorem

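  1. Consistency: every read sees the most recent write.

  2. Availability: every request to a non-failing node receives a response.

  3. Partition tolerance: the system keeps operating even when the network splits the nodes into groups that cannot communicate with each other.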



  • The core of the Raft algorithm



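  1. Writes are recorded in a log instead of modifying the state directly. This keeps the replication requests sent to Follower servers in order and lets the current state be recomputed from the log. This is the log state machine model.

  2. A write succeeds as a whole only when more than half of the servers have written it successfully. This is the Quorum mechanism.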

  • Log state machine model

(log index: operation → state after applying the log up to that index)

1: X=1 → {X:1}
2: Y=2 → {X:1, Y:2}
3: X=3 → {X:3, Y:2}
4: Z=4 → {X:3, Y:2, Z:4}
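Replaying the log in index order is enough to recompute the state at any point. Below is a minimal sketch. It assumes each entry sets one key to one integer value, as in the example above. `LogStateMachine`, `LogEntry` and `replay` are hypothetical names, and records need Java 16 or later.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the log state machine model: state is never edited in place,
// it is rebuilt by replaying log entries in index order.
// All names here are hypothetical, for illustration only.
final class LogStateMachine {
    // One log entry: its index, the key it writes and the value written
    record LogEntry(int index, String key, int value) {}

    // Applies every entry in order; a later entry for the same key overwrites an earlier one
    static Map<String, Integer> replay(List<LogEntry> log) {
        Map<String, Integer> state = new LinkedHashMap<>();
        for (LogEntry entry : log) {
            state.put(entry.key(), entry.value());
        }
        return state;
    }
}
```

Replaying only a prefix of the log, for example `log.subList(0, 2)`, gives the state as of that index, here `{X:1, Y:2}`.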


  • Writes based on the Quorum mechanism



  1. Nodes A and B: 2 and 1

  2. Nodes A and C: 2 and 1

  3. Nodes B and C: 1 and 1




  1. Nodes A and B: 2 and 2

  2. Nodes A and C: 2 and 1

  3. Nodes B and C: 2 and 1


  • Election based on log comparison








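Let N be the number of nodes and M the number of Leaders elected in the same term. Let majority(N) be the number of votes a node needs to win. The M Leaders then need M * majority(N) votes in total.

When N is odd, majority(N) = (N + 1) / 2.

When N is even, majority(N) = N / 2 + 1.

Each node votes at most once per term, so M * majority(N) <= N must hold.

With M = 1 (a single Leader), majority(N) <= N holds. With M = 2:

When N is odd, 2 * (N + 1) / 2 = N + 1, and N + 1 <= N does not hold.

When N is even, 2 * (N / 2 + 1) = N + 2, and N + 2 <= N does not hold either.

By the same reasoning, M * majority(N) <= N fails for every M >= 2, so at most one Leader can be elected in a term.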
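The arithmetic above can be checked mechanically. The sketch assumes Java integer division. Under it, N / 2 + 1 equals (N + 1) / 2 for odd N, so one formula covers both cases. `SingleLeaderCheck` and its methods are hypothetical names.

```java
// Checks the single-Leader argument: the number of disjoint majorities that
// fit into N nodes is N / majority(N), which is 1 for every N >= 1.
final class SingleLeaderCheck {
    // Votes needed to win: (N + 1) / 2 for odd N and N / 2 + 1 for even N,
    // which integer division expresses as N / 2 + 1 in both cases
    static int majority(int n) {
        return n / 2 + 1;
    }

    // Largest M such that M * majority(N) <= N
    static int maxLeadersPerTerm(int n) {
        return n / majority(n);
    }
}
```

For example, `majority(5)` is 3 and `majority(6)` is 4. In both cases two majorities would need more votes than there are nodes.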


  • Elections in the Raft algorithm


  1. Leader

  2. Candidate (a candidate for Leader)

  3. Follower



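  1. RequestVote: asks the other nodes to vote for the sender. It is normally sent by a Candidate.

  2. AppendEntries: used for log replication to append entries. With zero entries it serves as a heartbeat. It is normally sent only by the Leader.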

  • The logical clock: term

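Server clocks may disagree, so Raft does not rely on them. Instead, elections use an integer term parameter, which lets the system advance logical time safely. The term is a logical clock value that increases monotonically across the cluster, and it is a variant of the Lamport Timestamp algorithm.

When several processes need to maintain a global time, each process first keeps a local copy of that time. The Lamport Timestamp algorithm works as follows:

  1. Each process increments its local copy of the time (adds 1) whenever an event occurs.

  2. When a process sends a message, it attaches its local time.

  3. When a process receives a message, it compares the time in the message with its local time, takes the larger value, adds 1, and stores the result as its new local time.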
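The three steps above fit in a very small class. Below is a minimal sketch; `LamportClock` and its methods are hypothetical names. It treats sending as an event, so the clock ticks before the time is attached to the message.

```java
// Sketch of the Lamport Timestamp rules listed above; all names are hypothetical.
final class LamportClock {
    private long time = 0;

    // Step 1: a local event advances the clock by 1
    long tick() {
        return ++time;
    }

    // Step 2: sending is an event; the returned value is attached to the message
    long send() {
        return tick();
    }

    // Step 3: on receipt, take the larger of the two times and add 1
    long receive(long messageTime) {
        time = Math.max(time, messageTime) + 1;
        return time;
    }

    long current() {
        return time;
    }
}
```

Raft's term is only a variant of this scheme. A Candidate increments its term when it starts an election, and a node that sees a larger term in a message adopts that term as-is rather than adding 1.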

  • Terms and role transitions during an election



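  1. When the system starts, every node is a Follower.

  2. When a Follower receives no heartbeat from the Leader (the heartbeat times out), it becomes a Candidate and nominates itself for the election.

  3. A Candidate that receives votes from a majority becomes the Leader.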
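The transitions above can be sketched as a small state machine. This sketch assumes the usual Raft behaviour when an election starts: the Candidate increments its term and votes for itself. The self-vote matches the default vote count of 1 in `CandidateNodeRole` later in this post. `RoleTransitions` and its methods are hypothetical.

```java
// Sketch of the Follower -> Candidate -> Leader transitions; names are hypothetical.
final class RoleTransitions {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    private final int clusterSize;
    private Role role = Role.FOLLOWER; // 1. every node starts as a Follower
    private int term = 0;
    private int votes = 0;

    RoleTransitions(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    // 2. heartbeat timeout: become a Candidate in a new term and vote for itself
    void onElectionTimeout() {
        if (role == Role.LEADER) return; // a Leader sends heartbeats, it does not wait for them
        role = Role.CANDIDATE;
        term++;
        votes = 1;
    }

    // 3. count a vote granted in the current term; a majority makes the node Leader
    void onVoteGranted(int voteTerm) {
        if (role != Role.CANDIDATE || voteTerm != term) return;
        votes++;
        if (votes >= clusterSize / 2 + 1) {
            role = Role.LEADER;
        }
    }

    Role role() { return role; }

    int term() { return term; }
}
```

In a 5-node cluster the Candidate becomes Leader after two granted votes, since its own vote makes three.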



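When the votes are split so that no Candidate wins, Raft calls it a split vote. It can happen whenever several Candidates run at the same time. It is most obvious in a cluster with an even number of nodes, where two Candidates can each receive exactly half of the votes. Raft uses randomized election timeouts to reduce the probability of split votes.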

  • Election timeout





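  1. The node starts as a Follower and randomly picks an election timeout of 3.3 seconds, so it will start an election 3.3 seconds later.

  2. One second after startup, the node receives a heartbeat from the Leader. It picks a new random election timeout (say 3.4 seconds) and moves the next election to 3.4 seconds from now.

  3. Two seconds after startup, the node receives another heartbeat from the Leader. It picks another random election timeout (say 4 seconds) and moves the next election to 4 seconds from now.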
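The bookkeeping in the steps above can be sketched with plain timestamps instead of a real scheduler. The 3000 to 4000 ms range matches the `NodeConfig` defaults later in this post. `ElectionDeadline` and its methods are hypothetical, and time is passed in explicitly so the logic can be tested.

```java
import java.util.Random;

// Sketch of randomized election timeouts: every heartbeat re-draws a timeout
// and pushes the next election forward. All names are hypothetical.
final class ElectionDeadline {
    private final int minTimeoutMs;
    private final int maxTimeoutMs;
    private final Random random;
    private long nextElectionAtMs;

    ElectionDeadline(int minTimeoutMs, int maxTimeoutMs, Random random, long nowMs) {
        this.minTimeoutMs = minTimeoutMs;
        this.maxTimeoutMs = maxTimeoutMs;
        this.random = random;
        reset(nowMs); // a Follower starts with a fresh random timeout
    }

    // Draws a timeout uniformly from [min, max] and schedules the election relative to now
    void reset(long nowMs) {
        int timeout = minTimeoutMs + random.nextInt(maxTimeoutMs - minTimeoutMs + 1);
        nextElectionAtMs = nowMs + timeout;
    }

    // A heartbeat from the Leader postpones the election
    void onHeartbeat(long nowMs) {
        reset(nowMs);
    }

    boolean shouldStartElection(long nowMs) {
        return nowMs >= nextElectionAtMs;
    }

    long nextElectionAtMs() {
        return nextElectionAtMs;
    }
}
```

Each node draws its timeouts independently. Followers therefore rarely time out at the same moment, which is what makes split votes less likely.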


  • Log replication in the Raft algorithm


  1. Appended but not yet persisted (committed)

  2. Persisted (committed)


  • Replication progress






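  1. Initially node A is the Leader and all other nodes are Followers.

  2. At some point, a network partition separates nodes A and B from nodes C, D and E. During the partition, node A can communicate only with node B.

  3. Node B still receives A's heartbeats, so it does not become a Candidate.

  4. Nodes C, D and E stop receiving heartbeats from node A and hold an election. Assume node C becomes the new Leader.

  5. Clients connect to nodes A and C and write to each. The Leader does not wait for a majority to confirm a write, so A and C each append different entries to their logs.

  6. When the partition heals, the logs of A and B conflict with the logs of C, D and E, so they cannot be merged.

Suppose instead that, in the scenario above, the Leader advanced commitIndex only after a majority had appended the entry. Node A would then never persist (commit) its entries. After the partition heals, the logs from C, D and E could be replicated correctly to A and B, and the data would stay consistent.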
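One common way for a Leader to find the highest index appended by a majority is to sort every node's matchIndex (its own last log index included) and take the lower median. Below is a minimal sketch; `CommitIndexCalculator` is a hypothetical name. Raft additionally requires the entry at that index to belong to the Leader's current term before it can be committed. That check is left out here.

```java
import java.util.Arrays;

// Sketch: the largest log index that a majority of nodes has appended.
// Hypothetical helper; Raft's current-term check is omitted.
final class CommitIndexCalculator {
    // matchIndexes holds one value per node, the Leader's own last index included
    static int majorityMatchIndex(int[] matchIndexes) {
        int[] sorted = matchIndexes.clone();
        Arrays.sort(sorted);
        // In ascending order, the element at (n - 1) / 2 is <= the values of
        // at least n / 2 + 1 nodes, i.e. a majority has reached it
        return sorted[(sorted.length - 1) / 2];
    }
}
```

In the partition example, Leader A and Follower B could each reach index 10 while C, D and E stay at 0. The input `{10, 10, 0, 0, 0}` yields 0, so A cannot move commitIndex forward.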



/**
 * Node ID
 */
@AllArgsConstructor
@Getter
public class NodeId implements Serializable {
    // The node's ID value; it never changes once assigned.
    // It can be as simple as A, B, C...
    @NonNull
    private final String value;

    public static NodeId of(String value) {
        return new NodeId(value);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof NodeId)) return false;
        NodeId nodeId = (NodeId) o;
        return value.equals(nodeId.value);
    }

    @Override
    public int hashCode() {
        return value.hashCode();
    }

    @Override
    public String toString() {
        return value;
    }
}


/**
 * Cluster member table
 */
public class NodeGroup {
    // ID of the current node
    private final NodeId selfId;
    // Member map, keyed by node ID
    private final Map<NodeId, GroupMember> memberMap;

    /**
     * Single-node constructor
     */
    public NodeGroup(NodeEndpoint endpoint) {
        this(Collections.singleton(endpoint), endpoint.getId());
    }

    /**
     * Multi-node constructor
     */
    public NodeGroup(Collection<NodeEndpoint> endpoints, NodeId selfId) {
        this.memberMap = buildMemberMap(endpoints);
        this.selfId = selfId;
    }

    /**
     * Builds the member map from the list of endpoints
     */
    private Map<NodeId, GroupMember> buildMemberMap(Collection<NodeEndpoint> endpoints) {
        Map<NodeId, GroupMember> map = new HashMap<>();
        endpoints.forEach(endpoint -> map.put(endpoint.getId(), new GroupMember(endpoint)));
        if (map.isEmpty()) {
            throw new IllegalArgumentException("endpoints is empty");
        }
        return map;
    }

    /**
     * Resets the replication progress of every node except this one
     * @param nextLogIndex index of the next log entry to replicate
     */
    public void resetReplicatingStates(int nextLogIndex) {
        memberMap.values().stream()
                .filter(member -> !member.idEquals(selfId))
                .forEach(member -> member.setReplicatingState(new ReplicatingState(nextLogIndex)));
    }

    /**
     * Finds a member by node ID; returns null if there is none
     */
    public GroupMember getMember(NodeId id) {
        return memberMap.get(id);
    }

    /**
     * Finds a member by node ID; throws an exception if there is none
     */
    public GroupMember findMember(NodeId id) {
        GroupMember member = getMember(id);
        if (member == null) {
            throw new IllegalArgumentException("no such node " + id);
        }
        return member;
    }

    /**
     * Returns the number of major members
     */
    public int getCountOfMajor() {
        return (int) memberMap.values().stream().filter(GroupMember::isMajor).count();
    }

    /**
     * Lists the log replication targets: every member except this node
     */
    public Collection<GroupMember> listReplicationTarget() {
        return memberMap.values().stream()
                .filter(m -> !m.idEquals(selfId))
                .collect(Collectors.toList());
    }

    /**
     * Returns the endpoints of all nodes other than this one
     */
    public Set<NodeEndpoint> listEndpointExceptSelf() {
        Set<NodeEndpoint> endpoints = new HashSet<>();
        memberMap.values().stream()
                .filter(member -> !member.idEquals(selfId))
                .forEach(member -> endpoints.add(member.getEndpoint()));
        return endpoints;
    }
}



/**
 * Server address
 */
@AllArgsConstructor
@Getter
@ToString
public class Address {
    @NonNull
    private final String host; // IP
    private final int port;    // port
}

/**
 * Node endpoint
 */
@AllArgsConstructor
@Getter
public class NodeEndpoint {
    @NonNull
    private final NodeId id;
    @NonNull
    private final Address address;

    public NodeEndpoint(NodeId id, String host, int port) {
        this(id, new Address(host, port));
    }
}

/**
 * Cluster member
 */
@AllArgsConstructor
public class GroupMember {
    // Endpoint of the member
    @Getter
    private final NodeEndpoint endpoint;
    // Replication progress
    @Setter
    private ReplicatingState replicatingState;
    // Whether this is a major member
    @Getter
    @Setter
    private boolean major;

    public GroupMember(NodeEndpoint endpoint) {
        this(endpoint, null, true);
    }

    // Index of the next log entry to replicate
    public int getNextIndex() {
        return ensureReplicatingState().getNextIndex();
    }

    // Index of the highest matched log entry
    public int getMatchIndex() {
        return ensureReplicatingState().getMatchIndex();
    }

    /**
     * Returns the replication progress, failing if it has not been set
     */
    private ReplicatingState ensureReplicatingState() {
        if (replicatingState == null) {
            throw new IllegalArgumentException("replicating state not set");
        }
        return replicatingState;
    }

    /**
     * Checks whether this member has the given node ID
     */
    public boolean idEquals(NodeId id) {
        return endpoint.getId().equals(id);
    }
}

/**
 * Log replication progress
 */
@ToString
public class ReplicatingState {
    // Index of the next log entry to replicate
    @Getter
    private int nextIndex;
    // Index of the highest log entry known to match on the Follower
    @Getter
    private int matchIndex;
    // Whether replication is in progress
    @Getter
    @Setter
    private boolean replicating = false;
    // Time of the last replication
    @Getter
    @Setter
    private long lastReplicatedAt = 0;

    public ReplicatingState(int nextIndex, int matchIndex) {
        this.nextIndex = nextIndex;
        this.matchIndex = matchIndex;
    }

    public ReplicatingState(int nextIndex) {
        this(nextIndex, 0);
    }

    /**
     * Moves nextIndex back by one
     * @return false if nextIndex is already 1
     */
    public boolean backOffNextIndex() {
        if (nextIndex > 1) {
            nextIndex--;
            return true;
        }
        return false;
    }

    /**
     * Advances matchIndex and nextIndex after a successful replication
     * @param lastEntryIndex index of the last entry the Follower accepted
     * @return whether either index changed
     */
    public boolean advice(int lastEntryIndex) {
        boolean result = matchIndex != lastEntryIndex || nextIndex != lastEntryIndex + 1;
        matchIndex = lastEntryIndex;
        nextIndex = lastEntryIndex + 1;
        return result;
    }
}
  • Implementing elections


public enum RoleName {
    FOLLOWER,   // follower node
    CANDIDATE,  // candidate node
    LEADER      // leader node
}


@AllArgsConstructor
@Getter
public abstract class AbstractNodeRole {
    // Role name
    private final RoleName name;
    // Election term
    protected final int term;

    /**
     * Cancels the election timeout or log replication task owned by this role
     */
    public abstract void cancelTimeoutOrTask();

    public abstract NodeId getLeaderId(NodeId selfId);
}


/**
 * Election timeout
 */
@RequiredArgsConstructor
public class ElectionTimeout {
    // Result of the scheduled task
    private final ScheduledFuture<?> scheduledFuture;

    public static final ElectionTimeout NONE = new ElectionTimeout(new NullScheduledFuture());

    /**
     * Cancels the election timeout
     */
    public void cancel() {
        scheduledFuture.cancel(false);
    }

    @Override
    public String toString() {
        if (scheduledFuture.isCancelled()) {
            return "ElectionTimeout(state=cancelled)";
        }
        if (scheduledFuture.isDone()) {
            return "ElectionTimeout(state=done)";
        }
        return "ElectionTimeout(delay=" + scheduledFuture.getDelay(TimeUnit.MILLISECONDS) + "ms)";
    }
}


/**
 * Follower role
 */
@ToString
public class FollowerNodeRole extends AbstractNodeRole {
    // Node this Follower voted for
    @Getter
    private final NodeId votedFor;
    // Current Leader
    @Getter
    private final NodeId leaderId;
    // Election timeout
    private final ElectionTimeout electionTimeout;

    public FollowerNodeRole(int term, NodeId votedFor, NodeId leaderId, ElectionTimeout electionTimeout) {
        super(RoleName.FOLLOWER, term);
        this.votedFor = votedFor;
        this.leaderId = leaderId;
        this.electionTimeout = electionTimeout;
    }

    @Override
    public void cancelTimeoutOrTask() {
        electionTimeout.cancel();
    }

    @Override
    public NodeId getLeaderId(NodeId selfId) {
        return leaderId;
    }
}


/**
 * Candidate role
 */
@ToString
public class CandidateNodeRole extends AbstractNodeRole {
    // Number of votes received
    @Getter
    private final int votesCount;
    // Election timeout
    private final ElectionTimeout electionTimeout;

    /**
     * Constructor with an arbitrary vote count
     */
    public CandidateNodeRole(int term, int votesCount, ElectionTimeout electionTimeout) {
        super(RoleName.CANDIDATE, term);
        this.votesCount = votesCount;
        this.electionTimeout = electionTimeout;
    }

    /**
     * Constructor with one vote (the Candidate's own)
     */
    public CandidateNodeRole(int term, ElectionTimeout electionTimeout) {
        this(term, 1, electionTimeout);
    }

    /**
     * Adds one vote: cancels the current timeout and returns a new role with the new timeout
     */
    public CandidateNodeRole increaseVotesCount(ElectionTimeout electionTimeout) {
        this.electionTimeout.cancel();
        return new CandidateNodeRole(term, votesCount + 1, electionTimeout);
    }

    @Override
    public void cancelTimeoutOrTask() {
        electionTimeout.cancel();
    }

    @Override
    public NodeId getLeaderId(NodeId selfId) {
        return null;
    }
}


/**
 * Log replication task
 */
@Slf4j
@RequiredArgsConstructor
public class LogReplicationTask {
    private final ScheduledFuture<?> scheduledFuture;

    public static final LogReplicationTask NONE = new LogReplicationTask(new NullScheduledFuture());

    public void cancel() {
        log.debug("cancel log replication task");
        scheduledFuture.cancel(false);
    }

    @Override
    public String toString() {
        return "LogReplicationTask{delay=" + scheduledFuture.getDelay(TimeUnit.MILLISECONDS) + "}";
    }
}

/**
 * Leader role
 */
@ToString
public class LeaderNodeRole extends AbstractNodeRole {
    // Log replication task
    private final LogReplicationTask logReplicationTask;

    public LeaderNodeRole(int term, LogReplicationTask logReplicationTask) {
        super(RoleName.LEADER, term);
        this.logReplicationTask = logReplicationTask;
    }

    @Override
    public void cancelTimeoutOrTask() {
        logReplicationTask.cancel();
    }

    @Override
    public NodeId getLeaderId(NodeId selfId) {
        return selfId;
    }
}


/**
 * Scheduler
 */
public interface Scheduler {
    /**
     * Creates the periodic log replication task
     */
    LogReplicationTask scheduleLogReplicationTask(Runnable task);

    /**
     * Creates an election timeout
     */
    ElectionTimeout scheduleElectionTimeout(Runnable task);

    /**
     * Stops the scheduler
     * @throws InterruptedException if interrupted while waiting for tasks to finish
     */
    void stop() throws InterruptedException;
}


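/**
 * Node configuration
 */
@Data
public class NodeConfig {
    // Minimum election timeout (ms)
    private int minElectionTimeout = 3000;
    // Maximum election timeout (ms)
    private int maxElectionTimeout = 4000;
    // Delay before the first log replication (ms)
    private int logReplicationDelay = 0;
    // Interval between log replications (ms)
    private int logReplicationInterval = 1000;
}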
@Slf4j
public class DefaultScheduler implements Scheduler {
    // Minimum election timeout (ms)
    private final int minElectionTimeout;
    // Maximum election timeout (ms)
    private final int maxElectionTimeout;
    // Delay before the first log replication (ms)
    private final int logReplicationDelay;
    // Interval between log replications (ms)
    private final int logReplicationInterval;
    // Random number generator for election timeouts
    private final Random electionTimeoutRandom;
    // Single-threaded executor that runs all scheduled tasks
    private final ScheduledExecutorService scheduledExecutorService;

    public DefaultScheduler(int minElectionTimeout, int maxElectionTimeout, int logReplicationDelay,
                            int logReplicationInterval) {
        if (minElectionTimeout <= 0 || maxElectionTimeout <= 0
                || minElectionTimeout > maxElectionTimeout) {
            throw new IllegalArgumentException("election timeout should not be 0 or min > max");
        }
        if (logReplicationDelay < 0 || logReplicationInterval <= 0) {
            throw new IllegalArgumentException("log replication delay < 0 or log replication interval <= 0");
        }
        this.minElectionTimeout = minElectionTimeout;
        this.maxElectionTimeout = maxElectionTimeout;
        this.logReplicationDelay = logReplicationDelay;
        this.logReplicationInterval = logReplicationInterval;
        electionTimeoutRandom = new Random();
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "scheduler"));
    }

    public DefaultScheduler(NodeConfig config) {
        this(config.getMinElectionTimeout(), config.getMaxElectionTimeout(), config.getLogReplicationDelay(),
                config.getLogReplicationInterval());
    }

    @Override
    public LogReplicationTask scheduleLogReplicationTask(Runnable task) {
        ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(task,
                logReplicationDelay, logReplicationInterval, TimeUnit.MILLISECONDS);
        return new LogReplicationTask(scheduledFuture);
    }

    @Override
    public ElectionTimeout scheduleElectionTimeout(Runnable task) {
        // Random timeout in [min, max]; the +1 also keeps nextInt's bound positive when min == max
        int timeout = electionTimeoutRandom.nextInt(maxElectionTimeout - minElectionTimeout + 1)
                + minElectionTimeout;
        ScheduledFuture<?> scheduledFuture = scheduledExecutorService.schedule(task, timeout, TimeUnit.MILLISECONDS);
        return new ElectionTimeout(scheduledFuture);
    }

    @Override
    public void stop() throws InterruptedException {
        log.debug("stop scheduler");
        scheduledExecutorService.shutdown();
        scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
    }
}
/**
 * No-op scheduled future
 */
public class NullScheduledFuture implements ScheduledFuture<Object> {
    @Override
    public long getDelay(TimeUnit unit) {
        return 0;
    }

    @Override
    public int compareTo(Delayed o) {
        return 0;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return false;
    }

    @Override
    public Object get() throws InterruptedException, ExecutionException {
        return null;
    }

    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return null;
    }
}

/**
 * Scheduler for tests: logs the request and schedules nothing
 */
@Slf4j
public class NullScheduler implements Scheduler {
    @Override
    public LogReplicationTask scheduleLogReplicationTask(Runnable task) {
        log.debug("schedule log replication task");
        return LogReplicationTask.NONE;
    }

    @Override
    public ElectionTimeout scheduleElectionTimeout(Runnable task) {
        log.debug("schedule election timeout");
        return ElectionTimeout.NONE;
    }

    @Override
    public void stop() throws InterruptedException {
    }
}


  • Messages exchanged between nodes


/**
 * RequestVote request message
 */
@Data
@ToString
public class RequestVoteRpc {
    // Election term
    private int term;
    // ID of the Candidate, normally the sender itself
    private NodeId candidateId;
    // Index of the Candidate's last log entry
    private int lastLogIndex = 0;
    // Term of the Candidate's last log entry
    private int lastLogTerm = 0;
}

/**
 * RequestVote response message
 */
@AllArgsConstructor
@Getter
@ToString
public class RequestVoteResult {
    // Election term
    private final int term;
    // Whether the vote was granted
    private final boolean voteGranted;
}
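The `lastLogIndex` and `lastLogTerm` fields carry the log comparison used when a node decides whether to vote. The sketch below assumes the up-to-date rule from the Raft paper: a Candidate's log is acceptable if its last term is newer, or if the terms are equal and its log is at least as long. `VoteCheck` is a hypothetical name. A real voter would also compare terms and check whether it has already voted (`votedFor`) in this term.

```java
// Sketch of the log comparison a voter applies to a RequestVoteRpc.
// Hypothetical helper; term and votedFor checks are omitted.
final class VoteCheck {
    static boolean isCandidateLogUpToDate(int candidateLastLogTerm, int candidateLastLogIndex,
                                          int localLastLogTerm, int localLastLogIndex) {
        // A newer last term wins regardless of log length
        if (candidateLastLogTerm != localLastLogTerm) {
            return candidateLastLogTerm > localLastLogTerm;
        }
        // Same last term: the Candidate's log must be at least as long
        return candidateLastLogIndex >= localLastLogIndex;
    }
}
```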


/**
 * AppendEntries (log replication) request message
 */
@Data
public class AppendEntriesRpc {
    // Message ID
    private String messageId;
    // Election term
    private int term;
    // ID of the Leader
    private NodeId leaderId;
    // Index of the log entry preceding the new entries
    private int prevLogIndex = 0;
    // Term of the log entry preceding the new entries
    private int prevLogTerm;
    // Log entries to replicate (empty for a heartbeat)
    private List<Entry> entries = Collections.emptyList();
    // The Leader's commitIndex
    private int leaderCommit;

    @Override
    public String toString() {
        return "AppendEntriesRpc{" +
                "term=" + term +
                ", leaderId=" + leaderId +
                ", prevLogIndex=" + prevLogIndex +
                ", prevLogTerm=" + prevLogTerm +
                ", entries.size=" + entries.size() +
                ", leaderCommit=" + leaderCommit +
                '}';
    }
}

/**
 * AppendEntries response message
 */
@AllArgsConstructor
@Getter
@ToString
public class AppendEntriesResult {
    // ID of the request message this result answers
    private final String rpcMessageId;
    // Election term
    private final int term;
    // Whether the entries were appended
    private final boolean success;
}

/**
 * Abstract RPC message
 * @param <T> type of the RPC message body
 */
@AllArgsConstructor
@Getter
public abstract class AbstractRpcMessage<T> {
    // Message body
    private final T rpc;
    // ID of the sending node
    private final NodeId sourceNodeId;
    // Channel the message arrived on (backed by Netty)
    private final Channel channel;
}

/**
 * RequestVote message
 */
public class RequestVoteRpcMessage extends AbstractRpcMessage<RequestVoteRpc> {
    public RequestVoteRpcMessage(RequestVoteRpc rpc, NodeId sourceNodeId, Channel channel) {
        super(rpc, sourceNodeId, channel);
    }
}

/**
 * AppendEntries message
 */
public class AppendEntriesRpcMessage extends AbstractRpcMessage<AppendEntriesRpc> {
    public AppendEntriesRpcMessage(AppendEntriesRpc rpc, NodeId sourceNodeId, Channel channel) {
        super(rpc, sourceNodeId, channel);
    }
}

/**
 * AppendEntries response message, paired with the request it answers
 */
@AllArgsConstructor
@Getter
public class AppendEntriesResultMessage {
    private final AppendEntriesResult result;
    private final NodeId sourceNodeId;
    @NonNull
    private final AppendEntriesRpc rpc;
}
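`prevLogIndex` and `prevLogTerm`, together with `ReplicatingState.backOffNextIndex()`, let a Leader find where a Follower's log diverges. The sketch below assumes the standard Raft consistency check. A Follower accepts only if its log has an entry at `prevLogIndex` with the same term. After each rejection, the Leader moves nextIndex back by one and retries. Logs are modelled as lists of terms, with position 0 holding index 1. `ConsistencyCheck` and its methods are hypothetical.

```java
import java.util.List;

// Sketch of the AppendEntries consistency check plus nextIndex back-off.
// Logs are lists of entry terms (list position 0 = log index 1); names are hypothetical.
final class ConsistencyCheck {
    // Follower side: prevLogIndex 0 means "before the first entry" and always matches
    static boolean matches(List<Integer> followerTerms, int prevLogIndex, int prevLogTerm) {
        if (prevLogIndex == 0) return true;
        if (prevLogIndex > followerTerms.size()) return false;
        return followerTerms.get(prevLogIndex - 1) == prevLogTerm;
    }

    // Leader side: start just after its last entry and back off until the Follower accepts
    static int findNextIndex(List<Integer> leaderTerms, List<Integer> followerTerms) {
        int nextIndex = leaderTerms.size() + 1;
        while (nextIndex > 1) {
            int prevLogIndex = nextIndex - 1;
            if (matches(followerTerms, prevLogIndex, leaderTerms.get(prevLogIndex - 1))) {
                break;
            }
            nextIndex--; // same effect as ReplicatingState.backOffNextIndex()
        }
        return nextIndex;
    }
}
```

Suppose the Leader's terms are `[1, 1, 2, 3]` and the Follower's are `[1, 1, 2, 2, 2]`. The logs agree up to index 3, so replication resumes at nextIndex 4.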


/**
 * Connector: sends and replies to messages between nodes
 */
public interface Connector {
    /**
     * Initializes the connector
     */
    void initialize();

    /**
     * Sends a RequestVote request
     * @param rpc RequestVote request
     * @param destinationEndpoints target nodes
     */
    void sendRequestVote(RequestVoteRpc rpc, Collection<NodeEndpoint> destinationEndpoints);

    /**
     * Replies to a RequestVote request
     * @param result RequestVote response
     * @param destinationEndpoint target node
     */
    void replyRequestVote(RequestVoteResult result, NodeEndpoint destinationEndpoint);

    /**
     * Sends an AppendEntries request
     * @param rpc AppendEntries request
     * @param destinationEndpoint target node
     */
    void sendAppendEntries(AppendEntriesRpc rpc, NodeEndpoint destinationEndpoint);

    /**
     * Replies to an AppendEntries request
     * @param result AppendEntries response
     * @param destinationEndpoint target node
     */
    void replyAppendEntries(AppendEntriesResult result, NodeEndpoint destinationEndpoint);

    /**
     * Resets the channels
     */
    void resetChannels();

    /**
     * Closes the connector
     */
    void close();
}


/**
 * Connector adapter: every method is a no-op
 */
public abstract class ConnectorAdapter implements Connector {
    @Override
    public void initialize() {
    }

    @Override
    public void sendRequestVote(RequestVoteRpc rpc, Collection<NodeEndpoint> destinationEndpoints) {
    }

    @Override
    public void replyRequestVote(RequestVoteResult result, NodeEndpoint destinationEndpoint) {
    }

    @Override
    public void sendAppendEntries(AppendEntriesRpc rpc, NodeEndpoint destinationEndpoint) {
    }

    @Override
    public void replyAppendEntries(AppendEntriesResult result, NodeEndpoint destinationEndpoint) {
    }

    @Override
    public void resetChannels() {
    }

    @Override
    public void close() {
    }
}


/**
 * Mock connector for tests: records every message instead of sending it
 */
public class MockConnector extends ConnectorAdapter {
    @ToString
    @Getter
    public static class Message {
        private Object rpc;
        private NodeId destinationNodeId;
        private Object result;
    }

    // No Lombok @Getter here: getMessages() below returns a defensive copy
    private final LinkedList<Message> messages = new LinkedList<>();

    @Override
    public void sendRequestVote(RequestVoteRpc rpc, Collection<NodeEndpoint> destinationEndpoints) {
        Message m = new Message();
        m.rpc = rpc;
        messages.add(m);
    }

    @Override
    public void replyRequestVote(RequestVoteResult result, NodeEndpoint destinationEndpoint) {
        Message m = new Message();
        m.result = result;
        m.destinationNodeId = destinationEndpoint.getId();
        messages.add(m);
    }

    @Override
    public void sendAppendEntries(AppendEntriesRpc rpc, NodeEndpoint destinationEndpoint) {
        Message m = new Message();
        m.rpc = rpc;
        m.destinationNodeId = destinationEndpoint.getId();
        messages.add(m);
    }

    @Override
    public void replyAppendEntries(AppendEntriesResult result, NodeEndpoint destinationEndpoint) {
        Message m = new Message();
        m.result = result;
        m.destinationNodeId = destinationEndpoint.getId();
        messages.add(m);
    }

    public Message getLastMessage() {
        return messages.isEmpty() ? null : messages.getLast();
    }

    private Message getLastMessageOrDefault() {
        return messages.isEmpty() ? new Message() : messages.getLast();
    }

    public Object getRpc() {
        return getLastMessageOrDefault().rpc;
    }

    public Object getResult() {
        return getLastMessageOrDefault().result;
    }

    public NodeId getDestinationNodeId() {
        return getLastMessageOrDefault().destinationNodeId;
    }

    public int getMessageCount() {
        return messages.size();
    }

    public List<Message> getMessages() {
        return new ArrayList<>(messages);
    }

    public void clearMessage() {
        messages.clear();
    }
}


/**
 * Message channel
 */
public interface Channel {
    /**
     * Writes a RequestVote request
     */
    void writeRequestVoteRpc(RequestVoteRpc rpc);

    /**
     * Writes a RequestVote response
     */
    void writeRequestVoteResult(RequestVoteResult result);

    /**
     * Writes an AppendEntries request
     */
    void writeAppendEntriesRpc(AppendEntriesRpc rpc);

    /**
     * Writes an AppendEntries response
     */
    void writeAppendEntriesResult(AppendEntriesResult result);

    /**
     * Closes the channel
     */
    void close();
}

/**
 * Channel exception
 */
public class ChannelException extends RuntimeException {
    public ChannelException(Throwable cause) {
        super(cause);
    }

    public ChannelException(String message, Throwable cause) {
        super(message, cause);
    }
}

/**
 * Channel connection exception
 */
public class ChannelConnectException extends ChannelException {
    public ChannelConnectException(Throwable cause) {
        super(cause);
    }

    public ChannelConnectException(String message, Throwable cause) {
        super(message, cause);
    }
}

/**
 * Channel backed by a Netty channel
 */
@AllArgsConstructor
@Getter
public class NioChannel implements Channel {
    private final io.netty.channel.Channel nettyChannel;

    @Override
    public void writeRequestVoteRpc(RequestVoteRpc rpc) {
        nettyChannel.writeAndFlush(rpc);
    }

    @Override
    public void writeRequestVoteResult(RequestVoteResult result) {
        nettyChannel.writeAndFlush(result);
    }

    @Override
    public void writeAppendEntriesRpc(AppendEntriesRpc rpc) {
        nettyChannel.writeAndFlush(rpc);
    }

    @Override
    public void writeAppendEntriesResult(AppendEntriesResult result) {
        nettyChannel.writeAndFlush(result);
    }

    @Override
    public void close() {
        try {
            nettyChannel.close().sync();
        } catch (InterruptedException e) {
            throw new ChannelException("failed to close", e);
        }
    }
}
/**
 * Netty-based connector
 */
@Slf4j
public class NioConnector implements Connector {
    // Boss (acceptor) thread group
    private final NioEventLoopGroup bossNioEventLoopGroup = new NioEventLoopGroup(1);
    // Worker thread group
    private final NioEventLoopGroup workerNioEventLoopGroup;
    // Whether the Netty server and client share the worker thread group
    private final boolean workerGroupShared;
    // Publish/subscribe event bus (observer pattern)
    private final EventBus eventBus;
    // Listening port
    private final int port;
    // Inbound channel group
    private final InboundChannelGroup inboundChannelGroup = new InboundChannelGroup();
    // Outbound channel group
    private final OutboundChannelGroup outboundChannelGroup;

    public NioConnector(NodeId selfNodeId, EventBus eventBus, int port) {
        this(new NioEventLoopGroup(), false, selfNodeId, eventBus, port);
    }

    public NioConnector(NioEventLoopGroup workerNioEventLoopGroup, NodeId selfNodeId, EventBus eventBus, int port) {
        this(workerNioEventLoopGroup, true, selfNodeId, eventBus, port);
    }

    public NioConnector(NioEventLoopGroup workerNioEventLoopGroup, boolean workerGroupShared,
                        NodeId selfNodeId, EventBus eventBus, int port) {
        this.workerNioEventLoopGroup = workerNioEventLoopGroup;
        this.workerGroupShared = workerGroupShared;
        this.eventBus = eventBus;
        this.port = port;
        outboundChannelGroup = new OutboundChannelGroup(workerNioEventLoopGroup, eventBus, selfNodeId);
    }

    /**
     * Starts the Netty server
     */
    @Override
    public void initialize() {
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                .group(bossNioEventLoopGroup, workerNioEventLoopGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new Decoder());
                        pipeline.addLast(new Encoder());
                        pipeline.addLast(new FromRemoteHandler(eventBus, inboundChannelGroup));
                    }
                });
        log.debug("node listen on port {}", port);
        try {
            serverBootstrap.bind(port).sync();
        } catch (InterruptedException e) {
            throw new ConnectorException("failed to bind port", e);
        }
    }

    @Override
    public void sendRequestVote(RequestVoteRpc rpc, Collection<NodeEndpoint> destinationEndpoints) {
        destinationEndpoints.forEach(endpoint -> {
            log.debug("send {} to node {}", rpc, endpoint.getId());
            try {
                getChannel(endpoint).writeRequestVoteRpc(rpc);
            } catch (Exception e) {
                logException(e);
            }
        });
    }

    private void logException(Exception e) {
        if (e instanceof ChannelConnectException) {
            log.warn(e.getMessage());
        } else {
            log.warn("failed to process channel", e);
        }
    }

    @Override
    public void replyRequestVote(RequestVoteResult result, NodeEndpoint destinationEndpoint) {
        log.debug("reply {} to node {}", result, destinationEndpoint.getId());
        try {
            getChannel(destinationEndpoint).writeRequestVoteResult(result);
        } catch (Exception e) {
            logException(e);
        }
    }

    @Override
    public void sendAppendEntries(AppendEntriesRpc rpc, NodeEndpoint destinationEndpoint) {
        log.debug("send {} to node {}", rpc, destinationEndpoint.getId());
        try {
            getChannel(destinationEndpoint).writeAppendEntriesRpc(rpc);
        } catch (Exception e) {
            logException(e);
        }
    }

    @Override
    public void replyAppendEntries(AppendEntriesResult result, NodeEndpoint destinationEndpoint) {
        log.debug("reply {} to node {}", result, destinationEndpoint.getId());
        try {
            getChannel(destinationEndpoint).writeAppendEntriesResult(result);
        } catch (Exception e) {
            logException(e);
        }
    }

    /**
     * Returns the outbound channel to a node, connecting as a Netty client if necessary
     */
    private Channel getChannel(NodeEndpoint endpoint) {
        return outboundChannelGroup.getOrConnect(endpoint.getId(), endpoint.getAddress());
    }

    @Override
    public void resetChannels() {
        inboundChannelGroup.closeAll();
    }

    @Override
    public void close() {
        log.debug("close connector");
        inboundChannelGroup.closeAll();
        outboundChannelGroup.closeAll();
        bossNioEventLoopGroup.shutdownGracefully();
        if (!workerGroupShared) {
            workerNioEventLoopGroup.shutdownGracefully();
        }
    }
}

/**
 * Custom decoder
 */
public class Decoder extends ByteToMessageDecoder {
    // Factory that creates log entries
    private final EntryFactory entryFactory = new EntryFactory();

    /**
     * Decodes one frame of at least 8 bytes. The first 8 bytes are two ints: the message type,
     * then the length of the payload that follows. The payload is the Protocol Buffers
     * serialized message.
     * @param in  read buffer
     * @param out objects deserialized from Protocol Buffers
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int availableBytes = in.readableBytes();
        if (availableBytes < 8) return;
        // Mark the read position so an incomplete frame can be retried later
        in.markReaderIndex();
        int messageType = in.readInt();
        int payloadLength = in.readInt();
        if (in.readableBytes() < payloadLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] payload = new byte[payloadLength];
        in.readBytes(payload);
        switch (messageType) {
            case MessageConstants.MSG_TYPE_NODE_ID:
                out.add(new NodeId(new String(payload, StandardCharsets.UTF_8)));
                break;
            case MessageConstants.MSG_TYPE_REQUEST_VOTE_RPC:
                Protos.RequestVoteRpc protoRVRpc = Protos.RequestVoteRpc.parseFrom(payload);
                RequestVoteRpc rpc = new RequestVoteRpc();
                rpc.setTerm(protoRVRpc.getTerm());
                rpc.setCandidateId(new NodeId(protoRVRpc.getCandidateId()));
                rpc.setLastLogIndex(protoRVRpc.getLastLogIndex());
                rpc.setLastLogTerm(protoRVRpc.getLastLogTerm());
                out.add(rpc);
                break;
            case MessageConstants.MSG_TYPE_REQUEST_VOTE_RESULT:
                Protos.RequestVoteResult protoRVResult = Protos.RequestVoteResult.parseFrom(payload);
                out.add(new RequestVoteResult(protoRVResult.getTerm(), protoRVResult.getVoteGranted()));
                break;
            case MessageConstants.MSG_TYPE_APPEND_ENTRIES_RPC:
                Protos.AppendEntriesRpc protoAERpc = Protos.AppendEntriesRpc.parseFrom(payload);
                AppendEntriesRpc aeRpc = new AppendEntriesRpc();
                aeRpc.setMessageId(protoAERpc.getMessageId());
                aeRpc.setTerm(protoAERpc.getTerm());
                aeRpc.setLeaderId(new NodeId(protoAERpc.getLeaderId()));
                aeRpc.setLeaderCommit(protoAERpc.getLeaderCommit());
                aeRpc.setPrevLogIndex(protoAERpc.getPrevLogIndex());
                aeRpc.setPrevLogTerm(protoAERpc.getPrevLogTerm());
                aeRpc.setEntries(protoAERpc.getEntriesList().stream().map(e ->
                        entryFactory.create(e.getKind(), e.getIndex(), e.getTerm(), e.getCommand().toByteArray())
                ).collect(Collectors.toList()));
                out.add(aeRpc);
                break;
            case MessageConstants.MSG_TYPE_APPEND_ENTRIES_RESULT:
                Protos.AppendEntriesResult protoAEResult = Protos.AppendEntriesResult.parseFrom(payload);
                out.add(new AppendEntriesResult(protoAEResult.getRpcMessageId(), protoAEResult.getTerm(), protoAEResult.getSuccess()));
                break;
        }
    }
}
/**
 * Custom encoder
 */
public class Encoder extends MessageToByteEncoder<Object> {
    /**
     * Serializes the message with Protocol Buffers and writes it to the ByteBuf
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if (msg instanceof NodeId) {
            this.writeMessage(out, MessageConstants.MSG_TYPE_NODE_ID, ((NodeId) msg).getValue().getBytes(StandardCharsets.UTF_8));
        } else if (msg instanceof RequestVoteRpc) {
            RequestVoteRpc rpc = (RequestVoteRpc) msg;
            Protos.RequestVoteRpc protoRpc = Protos.RequestVoteRpc.newBuilder()
                    .setTerm(rpc.getTerm())
                    .setCandidateId(rpc.getCandidateId().getValue())
                    .setLastLogIndex(rpc.getLastLogIndex())
                    .setLastLogTerm(rpc.getLastLogTerm())
                    .build();
            this.writeMessage(out, MessageConstants.MSG_TYPE_REQUEST_VOTE_RPC, protoRpc);
        } else if (msg instanceof RequestVoteResult) {
            RequestVoteResult result = (RequestVoteResult) msg;
            Protos.RequestVoteResult protoResult = Protos.RequestVoteResult.newBuilder()
                    .setTerm(result.getTerm())
                    .setVoteGranted(result.isVoteGranted())
                    .build();
            this.writeMessage(out, MessageConstants.MSG_TYPE_REQUEST_VOTE_RESULT, protoResult);
        } else if (msg instanceof AppendEntriesRpc) {
            AppendEntriesRpc rpc = (AppendEntriesRpc) msg;
            Protos.AppendEntriesRpc protoRpc = Protos.AppendEntriesRpc.newBuilder()
                    .setMessageId(rpc.getMessageId())
                    .setTerm(rpc.getTerm())
                    .setLeaderId(rpc.getLeaderId().getValue())
                    .setLeaderCommit(rpc.getLeaderCommit())
                    .setPrevLogIndex(rpc.getPrevLogIndex())
                    .setPrevLogTerm(rpc.getPrevLogTerm())
                    .addAllEntries(
                            rpc.getEntries().stream().map(e ->
                                    Protos.AppendEntriesRpc.Entry.newBuilder()
                                            .setKind(e.getKind())
                                            .setIndex(e.getIndex())
                                            .setTerm(e.getTerm())
                                            .setCommand(ByteString.copyFrom(e.getCommandBytes()))
                                            .build()
                            ).collect(Collectors.toList())
                    ).build();
            this.writeMessage(out, MessageConstants.MSG_TYPE_APPEND_ENTRIES_RPC, protoRpc);
        } else if (msg instanceof AppendEntriesResult) {
            AppendEntriesResult result = (AppendEntriesResult) msg;
            Protos.AppendEntriesResult protoResult = Protos.AppendEntriesResult.newBuilder()
                    .setRpcMessageId(result.getRpcMessageId())
                    .setTerm(result.getTerm())
                    .setSuccess(result.isSuccess())
                    .build();
            this.writeMessage(out, MessageConstants.MSG_TYPE_APPEND_ENTRIES_RESULT, protoResult);
        }
    }

    private void writeMessage(ByteBuf out, int messageType, MessageLite message) throws IOException {
        ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
        message.writeTo(byteOutput);
        out.writeInt(messageType);
        this.writeBytes(out, byteOutput.toByteArray());
    }

    private void writeMessage(ByteBuf out, int messageType, byte[] bytes) {
        // 4 bytes type + 4 bytes length + variable-length payload
        out.writeInt(messageType);
        this.writeBytes(out, bytes);
    }

    private void writeBytes(ByteBuf out, byte[] bytes) {
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}
import com.google.common.eventbus.EventBus;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.Objects;

/**
 * Abstract message handler.
 * ChannelDuplexHandler implements both ChannelInboundHandler
 * and ChannelOutboundHandler.
 */
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractHandler extends ChannelDuplexHandler {

    // Publish/subscribe utility (observer pattern)
    protected final EventBus eventBus;
    // Id of the remote node
    protected NodeId remoteId;
    // Channel used to reply to the remote node (the project's own Channel type)
    protected Channel channel;
    // Most recent AppendEntries RPC written to this connection
    private AppendEntriesRpc lastAppendEntriesRpc;

    /**
     * Post messages read from the channel to the EventBus
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Assertions only run when the JVM is started with -ea
        assert remoteId != null;
        assert channel != null;

        if (msg instanceof RequestVoteRpc) {
            RequestVoteRpc rpc = (RequestVoteRpc) msg;
            eventBus.post(new RequestVoteRpcMessage(rpc, remoteId, channel));
        } else if (msg instanceof RequestVoteResult) {
            eventBus.post(msg);
        } else if (msg instanceof AppendEntriesRpc) {
            AppendEntriesRpc rpc = (AppendEntriesRpc) msg;
            eventBus.post(new AppendEntriesRpcMessage(rpc, remoteId, channel));
        } else if (msg instanceof AppendEntriesResult) {
            AppendEntriesResult result = (AppendEntriesResult) msg;
            if (lastAppendEntriesRpc == null) {
                log.warn("no last append entries rpc");
            } else {
                // Only accept the result that answers the RPC we actually sent last
                if (!Objects.equals(result.getRpcMessageId(), lastAppendEntriesRpc.getMessageId())) {
                    log.warn("incorrect append entries rpc message id {}, expected {}",
                            result.getRpcMessageId(), lastAppendEntriesRpc.getMessageId());
                } else {
                    eventBus.post(new AppendEntriesResultMessage(result, remoteId, lastAppendEntriesRpc));
                    lastAppendEntriesRpc = null;
                }
            }
        }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Remember the outgoing AppendEntries RPC so its result can be matched by message id
        if (msg instanceof AppendEntriesRpc) {
            lastAppendEntriesRpc = (AppendEntriesRpc) msg;
        }
        super.write(ctx, msg, promise);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.warn(cause.getMessage(), cause);
        ctx.close();
    }
}
import com.google.common.eventbus.EventBus;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

/**
 * Handler for inbound (server-side) connections
 */
@Slf4j
public class FromRemoteHandler extends AbstractHandler {

    // Group of inbound channels
    private final InboundChannelGroup channelGroup;

    public FromRemoteHandler(EventBus eventBus, InboundChannelGroup channelGroup) {
        super(eventBus);
        this.channelGroup = channelGroup;
    }

    /**
     * Override channelRead to handle the NodeId that the connecting peer sends first
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof NodeId) {
            remoteId = (NodeId) msg;
            NioChannel nioChannel = new NioChannel(ctx.channel());
            channel = nioChannel;
            channelGroup.add(remoteId, nioChannel);
            return;
        }
        log.debug("receive {} from {}", msg, remoteId);
        super.channelRead(ctx, msg);
    }
}
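import io.netty.channel.ChannelFutureListener;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Group of inbound channels
 */
@Slf4j
public class InboundChannelGroup {

    // All inbound channels
    private final List<NioChannel> channels = new CopyOnWriteArrayList<>();

    /**
     * Register a newly connected inbound channel
     */
    public void add(NodeId remoteId, NioChannel channel) {
        log.debug("channel INBOUND-{} connected", remoteId);
        // Track the channel so that closeAll() can close it later
        channels.add(channel);
        // A ChannelFuture represents an I/O operation that has not completed yet (all Netty I/O is asynchronous).
        // closeFuture() completes when the channel closes; the listener (non-blocking) then removes the channel.
        channel.getNettyChannel().closeFuture().addListener((ChannelFutureListener) future -> {
            log.debug("channel INBOUND-{} disconnected", remoteId);
            remove(channel);
        });
    }

    private void remove(NioChannel channel) {
        channels.remove(channel);
    }

    /**
     * Close all inbound channels
     */
    public void closeAll() {
        log.debug("close all inbound channels");
        channels.forEach(NioChannel::close);
    }
}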
import com.google.common.eventbus.EventBus;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.net.ConnectException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
 * Group of outbound channels
 */
@Slf4j
@RequiredArgsConstructor
public class OutboundChannelGroup {

    // Worker thread group
    private final EventLoopGroup workerGroup;
    // Publish/subscribe utility (observer pattern)
    private final EventBus eventBus;
    // Id of this node
    private final NodeId selfNodeId;
    // Maps a node id to the (possibly still pending) result of connecting to it
    private final Map<NodeId, Future<NioChannel>> channelMap = new ConcurrentHashMap<>();

    /**
     * Get the channel to a node, connecting first if there is none yet
     */
    public NioChannel getOrConnect(NodeId nodeId, Address address) {
        Future<NioChannel> future = channelMap.get(nodeId);
        if (future == null) {
            FutureTask<NioChannel> newFuture = new FutureTask<>(() -> connect(nodeId, address));
            future = channelMap.putIfAbsent(nodeId, newFuture);
            if (future == null) {
                // This thread registered the task, so it performs the connect
                future = newFuture;
                newFuture.run();
            }
        }
        try {
            return future.get();
        } catch (Exception e) {
            channelMap.remove(nodeId);
            if (e instanceof ExecutionException) {
                Throwable cause = e.getCause();
                if (cause instanceof ConnectException) {
                    throw new ChannelConnectException("failed to get channel to node " + nodeId
                            + ", cause " + cause.getMessage(), cause);
                }
            }
            throw new ChannelException("failed to get channel to node " + nodeId, e);
        }
    }

    /**
     * Create a client Bootstrap and connect to the remote node
     */
    private NioChannel connect(NodeId nodeId, Address address) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap()
                .group(workerGroup)
                .channel(NioSocketChannel.class)
                // Disable Nagle's algorithm so small RPC frames are sent immediately
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new Decoder());
                        pipeline.addLast(new Encoder());
                        pipeline.addLast(new ToRemoteHandler(eventBus, nodeId, selfNodeId));
                    }
                });
        // sync() waits for the connection attempt and rethrows its failure
        ChannelFuture future = bootstrap.connect(address.getHost(), address.getPort()).sync();
        if (!future.isSuccess()) {
            throw new ChannelException("failed to connect", future.cause());
        }
        log.debug("channel OUTBOUND-{} connected", nodeId);
        Channel nettyChannel = future.channel();
        // When the connection closes, drop the node's entry so the next call reconnects
        nettyChannel.closeFuture().addListener((ChannelFutureListener) cf -> {
            log.debug("channel OUTBOUND-{} disconnected", nodeId);
            channelMap.remove(nodeId);
        });
        return new NioChannel(nettyChannel);
    }

    /**
     * Close all outbound channels
     */
    public void closeAll() {
        log.debug("close all outbound channels");
        channelMap.forEach((nodeId, nioChannelFuture) -> {
            try {
                nioChannelFuture.get().close();
            } catch (Exception e) {
                log.warn("failed to close", e);
            }
        });
    }
}
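`getOrConnect` places a `FutureTask` into the map with `putIfAbsent`. Concurrent callers asking for the same node therefore share a single connection attempt, and the slow connect runs outside any map lock. The sketch below pulls that pattern out into a generic class. `OnceCache` is a hypothetical name, and its `factory` function stands in for `connect(nodeId, address)`. One deliberate difference: a failed attempt is removed with `remove(key, future)`, so a late failure cannot evict a newer entry that another thread has since registered.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.function.Function;

// Hypothetical generic version of the getOrConnect pattern: the factory runs
// at most once per key, even when several threads ask for the same key at once.
final class OnceCache<K, V> {
    private final ConcurrentMap<K, Future<V>> map = new ConcurrentHashMap<>();
    private final Function<K, V> factory;

    OnceCache(Function<K, V> factory) {
        this.factory = factory;
    }

    V get(K key) {
        Future<V> future = map.get(key);
        if (future == null) {
            FutureTask<V> task = new FutureTask<>(() -> factory.apply(key));
            future = map.putIfAbsent(key, task);
            if (future == null) {
                // This thread won the race: run the factory in the calling thread
                future = task;
                task.run();
            }
        }
        try {
            return future.get(); // other callers block here until the winner finishes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for " + key, e);
        } catch (ExecutionException e) {
            // Forget the failed attempt so a later call can retry
            map.remove(key, future);
            throw new IllegalStateException("failed to create value for " + key, e.getCause());
        }
    }

    // Drop a cached value, e.g. when the underlying channel has been closed
    void invalidate(K key) {
        map.remove(key);
    }
}
```

`computeIfAbsent` would be shorter, but it runs the factory while holding a lock on part of the map, which is a poor fit for a blocking network connect.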
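import com.google.common.eventbus.EventBus;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

/**
 * Handler for outbound (client-side) connections
 */
@Slf4j
class ToRemoteHandler extends AbstractHandler {

    // Id of this node
    private final NodeId selfNodeId;

    ToRemoteHandler(EventBus eventBus, NodeId remoteId, NodeId selfNodeId) {
        super(eventBus);
        this.remoteId = remoteId;
        this.selfNodeId = selfNodeId;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Send our node id first so the server side (FromRemoteHandler) knows who connected;
        // flush so it goes out immediately instead of waiting for the next flushed write
        ctx.writeAndFlush(selfNodeId);
        channel = new NioChannel(ctx.channel());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("receive {} from {}", msg, remoteId);
        super.channelRead(ctx, msg);
    }
}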
  • Task execution

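import java.util.concurrent.Callable;
import java.util.concurrent.Future;

/**
 * Task executor
 */
public interface TaskExecutor {

    /**
     * Submit a task
     * @param task the task to run
     * @return a Future for the task
     */
    Future<?> submit(Runnable task);

    /**
     * Submit a task that returns a value
     * @param task the task to run
     * @param <V> the result type
     * @return a Future holding the result
     */
    <V> Future<V> submit(Callable<V> task);

    /**
     * Shut down the task executor
     * @throws InterruptedException if interrupted while waiting for running tasks
     */
    void shutdown() throws InterruptedException;
}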


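import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * Asynchronous single-thread task executor
 */
public class SingleThreadTaskExecutor implements TaskExecutor {

    private final ExecutorService executorService;

    public SingleThreadTaskExecutor() {
        this(Executors.defaultThreadFactory());
    }

    // Name the worker thread, which helps when reading logs and thread dumps
    public SingleThreadTaskExecutor(String name) {
        this(r -> new Thread(r, name));
    }

    public SingleThreadTaskExecutor(ThreadFactory threadFactory) {
        executorService = Executors.newSingleThreadExecutor(threadFactory);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return executorService.submit(task);
    }

    @Override
    public <V> Future<V> submit(Callable<V> task) {
        return executorService.submit(task);
    }

    @Override
    public void shutdown() throws InterruptedException {
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
    }
}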


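import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
 * Direct task executor: runs each task immediately in the calling thread (used by the unit tests)
 */
public class DirectTaskExecutor implements TaskExecutor {

    @Override
    public Future<?> submit(Runnable task) {
        FutureTask<?> futureTask = new FutureTask<>(task, null);
        futureTask.run();
        return futureTask;
    }

    @Override
    public <V> Future<V> submit(Callable<V> task) {
        FutureTask<V> futureTask = new FutureTask<>(task);
        futureTask.run();
        return futureTask;
    }

    @Override
    public void shutdown() {
        // Nothing to shut down: no threads are owned by this executor
    }
}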
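Every handler in `NodeImpl` (election timeout, RequestVote, AppendEntries and their results) is submitted to `context.getTaskExecutor()`. With `SingleThreadTaskExecutor` those tasks run one at a time on one thread, so fields such as `role` can be read and written without locks. `DirectTaskExecutor` preserves that one-at-a-time ordering in tests by running each task inline. The sketch below shows the idea with a plain `ExecutorService`; `SerializedCounter` is a hypothetical stand-in for the node's mutable state.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Minimal sketch: all reads and writes of an unlocked field are funnelled through
// one single-thread executor, the way NodeImpl funnels every state change.
final class SerializedCounter {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private int value = 0; // only ever touched by the executor's thread

    Future<?> increment() {
        return executor.submit(() -> { value++; });
    }

    // Reading also goes through the executor, so it sees every earlier submitted increment
    int read() {
        try {
            return executor.submit(() -> value).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Because the executor's queue is FIFO, a `read()` submitted after all increments observes all of them, even though several threads submitted work concurrently.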


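/**
 * Node store: persists currentTerm and votedFor
 */
public interface NodeStore {

    /**
     * Get currentTerm
     * @return the current term
     */
    int getTerm();

    /**
     * Set currentTerm
     * @param term the new term
     */
    void setTerm(int term);

    /**
     * Get votedFor
     * @return id of the node voted for in the current term, or null
     */
    NodeId getVotedFor();

    /**
     * Set votedFor
     * @param votedFor id of the node voted for, or null
     */
    void setVotedFor(NodeId votedFor);

    /**
     * Close the store and its underlying file, if any
     */
    void close();
}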


import lombok.AllArgsConstructor;

/**
 * In-memory node store, convenient for tests
 */
@AllArgsConstructor
public class MemoryNodeStore implements NodeStore {

    private int term;
    private NodeId votedFor;

    public MemoryNodeStore() {
        this(0, null);
    }

    @Override
    public int getTerm() {
        return term;
    }

    @Override
    public void setTerm(int term) {
        this.term = term;
    }

    @Override
    public NodeId getVotedFor() {
        return votedFor;
    }

    @Override
    public void setVotedFor(NodeId votedFor) {
        this.votedFor = votedFor;
    }

    @Override
    public void close() {
    }
}



/**
 * Node store exception
 */
public class NodeStoreException extends RuntimeException {

    public NodeStoreException(Throwable cause) {
        super(cause);
    }

    public NodeStoreException(String message, Throwable cause) {
        super(message, cause);
    }
}
import java.io.IOException;
import java.io.InputStream;

/**
 * Seekable file interface
 */
public interface SeekableFile {

    /**
     * Get the current position in the file
     */
    long position() throws IOException;

    /**
     * Move the file pointer to the given position
     */
    void seek(long position) throws IOException;

    /**
     * Write an int
     */
    void writeInt(int i) throws IOException;

    /**
     * Write a long
     */
    void writeLong(long l) throws IOException;

    /**
     * Write a byte array
     */
    void write(byte[] b) throws IOException;

    /**
     * Read an int
     */
    int readInt() throws IOException;

    /**
     * Read a long
     */
    long readLong() throws IOException;

    /**
     * Read into a byte array
     * @return the number of bytes read, or -1 at end of file
     */
    int read(byte[] b) throws IOException;

    /**
     * Get the file length
     */
    long size() throws IOException;

    /**
     * Set the file length
     */
    void truncate(long size) throws IOException;

    /**
     * Open an input stream that starts at the given position
     */
    InputStream inputStream(long start) throws IOException;

    /**
     * Flush pending writes
     */
    void flush() throws IOException;

    /**
     * Close the file
     */
    void close() throws IOException;
}


import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;

/**
 * SeekableFile implementation backed by java.io.RandomAccessFile
 */
public class RandomAccessFileAdapter implements SeekableFile {

    private final File file;
    private final RandomAccessFile randomAccessFile;

    public RandomAccessFileAdapter(File file) throws FileNotFoundException {
        this(file, "rw");
    }

    public RandomAccessFileAdapter(File file, String mode) throws FileNotFoundException {
        this.file = file;
        randomAccessFile = new RandomAccessFile(file, mode);
    }

    @Override
    public void seek(long position) throws IOException {
        randomAccessFile.seek(position);
    }

    @Override
    public void writeInt(int i) throws IOException {
        randomAccessFile.writeInt(i);
    }

    @Override
    public void writeLong(long l) throws IOException {
        randomAccessFile.writeLong(l);
    }

    @Override
    public void write(byte[] b) throws IOException {
        randomAccessFile.write(b);
    }

    @Override
    public int readInt() throws IOException {
        return randomAccessFile.readInt();
    }

    @Override
    public long readLong() throws IOException {
        return randomAccessFile.readLong();
    }

    @Override
    public int read(byte[] b) throws IOException {
        return randomAccessFile.read(b);
    }

    @Override
    public long size() throws IOException {
        return randomAccessFile.length();
    }

    @Override
    public void truncate(long size) throws IOException {
        randomAccessFile.setLength(size);
    }

    @Override
    public InputStream inputStream(long start) throws IOException {
        // A separate stream with its own position, independent of randomAccessFile
        FileInputStream input = new FileInputStream(file);
        if (start > 0) {
            input.skip(start);
        }
        return input;
    }

    @Override
    public long position() throws IOException {
        return randomAccessFile.getFilePointer();
    }

    @Override
    public void flush() throws IOException {
        // RandomAccessFile has no user-space buffer, so there is nothing to flush
    }

    @Override
    public void close() throws IOException {
        randomAccessFile.close();
    }
}
import java.io.File;
import java.io.IOException;

/**
 * File-based node store
 */
public class FileNodeStore implements NodeStore {

    // File name
    private static final String FILE_NAME = "node.bin";
    // Offset of currentTerm in the file
    private static final long OFFSET_TERM = 0;
    // Offset of votedFor (a length prefix followed by the id bytes) in the file
    private static final long OFFSET_VOTED_FOR = 4;
    // File access interface
    private final SeekableFile seekableFile;
    // currentTerm
    private int term = 0;
    // Id of the node voted for in the current term
    private NodeId votedFor = null;

    public FileNodeStore(File file) {
        try {
            if (!file.exists()) {
                Files.touch(file);
            }
            seekableFile = new RandomAccessFileAdapter(file);
            initializeOrLoad();
        } catch (IOException e) {
            throw new NodeStoreException(e);
        }
    }

    public FileNodeStore(SeekableFile seekableFile) {
        this.seekableFile = seekableFile;
        try {
            initializeOrLoad();
        } catch (IOException e) {
            throw new NodeStoreException(e);
        }
    }

    /**
     * Initialize a new file, or load term and votedFor from an existing one
     */
    private void initializeOrLoad() throws IOException {
        if (seekableFile.size() == 0) {
            // New file: term = 0 and votedFor length = 0 (8 bytes in total)
            seekableFile.truncate(8);
            seekableFile.seek(0);
            seekableFile.writeInt(0);
            seekableFile.writeInt(0);
        } else {
            // Existing file: read from the start, whatever the current position is
            seekableFile.seek(0);
            term = seekableFile.readInt();
            int length = seekableFile.readInt();
            if (length > 0) {
                byte[] bytes = new byte[length];
                seekableFile.read(bytes);
                votedFor = new NodeId(new String(bytes));
            }
        }
    }

    @Override
    public int getTerm() {
        return term;
    }

    @Override
    public void setTerm(int term) {
        try {
            seekableFile.seek(OFFSET_TERM);
            seekableFile.writeInt(term);
        } catch (IOException e) {
            throw new NodeStoreException(e);
        }
        this.term = term;
    }

    @Override
    public NodeId getVotedFor() {
        return votedFor;
    }

    @Override
    public void setVotedFor(NodeId votedFor) {
        try {
            seekableFile.seek(OFFSET_VOTED_FOR);
            if (votedFor == null) {
                // No vote: length 0 and drop any old id bytes
                seekableFile.writeInt(0);
                seekableFile.truncate(8);
            } else {
                byte[] bytes = votedFor.getValue().getBytes();
                seekableFile.writeInt(bytes.length);
                seekableFile.write(bytes);
            }
        } catch (IOException e) {
            throw new NodeStoreException(e);
        }
        this.votedFor = votedFor;
    }

    @Override
    public void close() {
        try {
            seekableFile.close();
        } catch (IOException e) {
            throw new NodeStoreException(e);
        }
    }
}
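`FileNodeStore` keeps `currentTerm` at offset 0 and `votedFor` at offset 4 as a length-prefixed byte string, where a length of 0 means "no vote". The sketch below reproduces that layout in memory with `ByteBuffer`, which uses big-endian order like `RandomAccessFile.writeInt`. It is a convenient way to check what a given `node.bin` should contain. `NodeFileLayout` is a hypothetical helper, and it assumes UTF-8 for the id bytes, whereas `FileNodeStore` uses the platform default charset.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical in-memory model of the node.bin layout used by FileNodeStore:
//   offset 0: currentTerm (int)
//   offset 4: length of votedFor in bytes (int, 0 means "not voted")
//   offset 8: votedFor bytes
final class NodeFileLayout {
    private NodeFileLayout() {
    }

    static byte[] encode(int term, String votedFor) {
        byte[] id = votedFor == null ? new byte[0] : votedFor.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + id.length);
        buf.putInt(term);      // OFFSET_TERM = 0
        buf.putInt(id.length); // OFFSET_VOTED_FOR = 4
        buf.put(id);
        return buf.array();
    }

    static int decodeTerm(byte[] file) {
        return ByteBuffer.wrap(file).getInt(0);
    }

    // Returns null when the stored length is 0, i.e. no vote in the current term
    static String decodeVotedFor(byte[] file) {
        ByteBuffer buf = ByteBuffer.wrap(file);
        int length = buf.getInt(4);
        if (length == 0) {
            return null;
        }
        byte[] id = new byte[length];
        buf.position(8);
        buf.get(id);
        return new String(id, StandardCharsets.UTF_8);
    }
}
```

For example, term 3 with a vote for node `B` is 9 bytes: `00 00 00 03 | 00 00 00 01 | 42`.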
import java.io.File;
import java.io.IOException;

/**
 * File utilities
 */
public class Files {

    /**
     * Create the file if it does not exist, otherwise update its last-modified time (like Unix touch)
     */
    public static void touch(File file) throws IOException {
        if (!file.createNewFile() && !file.setLastModified(System.currentTimeMillis())) {
            throw new IOException("failed to touch file " + file);
        }
    }
}
  • Core election algorithm

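import com.google.common.eventbus.EventBus;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

/**
 * Node context: the components a node depends on
 */
@Data
@AllArgsConstructor
@Builder
public class NodeContext {
    // Id of this node
    private NodeId selfId;
    // Cluster member group
    private NodeGroup group;
    // Connector that sends and receives messages
    private Connector connector;
    // Scheduler for timeouts and periodic tasks
    private Scheduler scheduler;
    // Publish/subscribe utility (observer pattern): com.google.common.eventbus.EventBus
    private EventBus eventBus;
    // Task executor
    private TaskExecutor taskExecutor;
    // Raft log
    private Log log;
    // Node store (currentTerm, votedFor)
    private NodeStore store;
}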
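/**
 * Consensus (core) component
 */
public interface Node {

    /**
     * Start the node
     */
    void start();

    /**
     * Stop the node
     * @throws InterruptedException if interrupted while waiting for the task executor to shut down
     */
    void stop() throws InterruptedException;
}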


import com.google.common.eventbus.Subscribe;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.Objects;

/**
 * Consensus (core) component implementation
 */
@Slf4j
@RequiredArgsConstructor
public class NodeImpl implements Node {

    // Context holding the node's dependencies
    @Getter
    private final NodeContext context;
    // Whether the node has been started
    private boolean started;
    // Current role of this node
    @Getter
    private AbstractNodeRole role;

    @Override
    public synchronized void start() {
        if (started) {
            return;
        }
        // Register with the EventBus so the @Subscribe methods receive messages
        context.getEventBus().register(this);
        context.getConnector().initialize();
        // Every node starts as a follower, restoring term and votedFor from the store
        NodeStore store = context.getStore();
        changeToRole(new FollowerNodeRole(store.getTerm(), store.getVotedFor(),
                null, scheduleElectionTimeout()));
        started = true;
    }

    @Override
    public synchronized void stop() throws InterruptedException {
        if (!started) {
            throw new IllegalStateException("node not started");
        }
        context.getScheduler().stop();
        context.getConnector().close();
        context.getTaskExecutor().shutdown();
        started = false;
    }

    /**
     * Switch to a new role, persisting its term (and votedFor for a follower)
     */
    private void changeToRole(AbstractNodeRole newRole) {
        log.debug("node {}, role state changed -> {}", context.getSelfId(), newRole);
        NodeStore store = context.getStore();
        store.setTerm(newRole.getTerm());
        if (newRole.getName() == RoleName.FOLLOWER) {
            store.setVotedFor(((FollowerNodeRole) newRole).getVotedFor());
        }
        role = newRole;
    }

    /**
     * Schedule an election timeout
     */
    private ElectionTimeout scheduleElectionTimeout() {
        return context.getScheduler().scheduleElectionTimeout(this::electionTimeout);
    }

    /**
     * Called when the election timeout fires; the work runs on the task executor
     */
    public void electionTimeout() {
        context.getTaskExecutor().submit(this::doProcessElectionTimeout);
    }

    /**
     * Schedule the periodic log replication (heartbeat) task
     */
    public LogReplicationTask scheduleLogReplicationTask() {
        return context.getScheduler().scheduleLogReplicationTask(this::replicateLog);
    }

    /**
     * Called by the log replication task; the work runs on the task executor
     */
    public void replicateLog() {
        context.getTaskExecutor().submit(this::doReplicateLog);
    }

    /**
     * Send AppendEntries to every node except this one
     */
    private void doReplicateLog() {
        log.debug("replicate log");
        context.getGroup().listReplicationTarget()
                .forEach(this::doMemberReplicateLog);
    }

    /**
     * Send an AppendEntries RPC to one member.
     * No entries are attached yet, so for now it acts as a heartbeat.
     */
    private void doMemberReplicateLog(GroupMember member) {
        AppendEntriesRpc rpc = new AppendEntriesRpc();
        rpc.setTerm(role.getTerm());
        rpc.setLeaderId(context.getSelfId());
        rpc.setPrevLogIndex(0);
        rpc.setPrevLogTerm(0);
        rpc.setLeaderCommit(0);
        context.getConnector().sendAppendEntries(rpc, member.getEndpoint());
    }

    /**
     * Handle an election timeout: start a new election
     */
    private void doProcessElectionTimeout() {
        // A leader never starts an election
        if (role.getName() == RoleName.LEADER) {
            log.warn("node {}, current role is leader, ignore election timeout", context.getSelfId());
            return;
        }
        // Term of the new election
        int newTerm = role.getTerm() + 1;
        role.cancelTimeoutOrTask();
        log.info("start election");
        // Become a candidate (counting its own vote) with a fresh election timeout
        changeToRole(new CandidateNodeRole(newTerm, scheduleElectionTimeout()));
        RequestVoteRpc rpc = new RequestVoteRpc();
        rpc.setTerm(newTerm);
        rpc.setCandidateId(context.getSelfId());
        rpc.setLastLogIndex(0);
        rpc.setLastLogTerm(0);
        // Ask every node except this one for a vote
        context.getConnector().sendRequestVote(rpc, context.getGroup().listEndpointExceptSelf());
    }

    /**
     * Handle an incoming RequestVote RPC
     */
    @Subscribe
    public void onReceiveRequestVoteRpc(RequestVoteRpcMessage rpcMessage) {
        // Decide on the task executor, then reply to the candidate through the connector
        context.getTaskExecutor().submit(() -> context.getConnector().replyRequestVote(
                doProcessRequestVoteRpc(rpcMessage),
                context.getGroup().getMember(rpcMessage.getSourceNodeId()).getEndpoint()
        ));
    }

    /**
     * Decide whether to grant a vote
     */
    private RequestVoteResult doProcessRequestVoteRpc(RequestVoteRpcMessage rpcMessage) {
        RequestVoteRpc rpc = rpcMessage.getRpc();
        // Candidate's term is older than ours: reject
        if (rpc.getTerm() < role.getTerm()) {
            log.debug("term from rpc < current term, don't vote ({} < {})",
                    rpc.getTerm(), role.getTerm());
            return new RequestVoteResult(role.getTerm(), false);
        }
        // Placeholder: the log comparison is not implemented yet, so every candidate qualifies
        boolean voteForCandidate = true;
        // Newer term: become a follower of that term and vote
        if (rpc.getTerm() > role.getTerm()) {
            becomeFollower(rpc.getTerm(), voteForCandidate ? rpc.getCandidateId() : null, null, true);
            return new RequestVoteResult(rpc.getTerm(), voteForCandidate);
        }
        // Same term: the decision depends on the current role
        switch (role.getName()) {
            case FOLLOWER:
                FollowerNodeRole follower = (FollowerNodeRole) role;
                // Node this follower already voted for in this term, if any
                NodeId votedFor = follower.getVotedFor();
                // Vote if we have not voted yet, or already voted for this same candidate
                if ((votedFor == null && voteForCandidate) || Objects.equals(votedFor, rpc.getCandidateId())) {
                    // Record the vote and reset the election timeout
                    becomeFollower(role.getTerm(), rpc.getCandidateId(), null, true);
                    return new RequestVoteResult(rpc.getTerm(), true);
                }
                return new RequestVoteResult(rpc.getTerm(), false);
            case CANDIDATE: // a candidate has voted for itself; a leader never votes
            case LEADER:
                return new RequestVoteResult(role.getTerm(), false);
            default:
                throw new IllegalArgumentException("unexpected node role [" + role.getName() + "]");
        }
    }

    /**
     * Become a follower
     * @param term                    the term
     * @param votedFor                id of the node voted for (may be null)
     * @param leaderId                id of the leader (may be null)
     * @param scheduleElectionTimeout whether to schedule a new election timeout
     */
    private void becomeFollower(int term, NodeId votedFor, NodeId leaderId, boolean scheduleElectionTimeout) {
        // Cancel the current role's timeout or task
        role.cancelTimeoutOrTask();
        // Log when a new leader becomes known
        if (leaderId != null && !leaderId.equals(role.getLeaderId(context.getSelfId()))) {
            log.info("current leader is {}, term {}", leaderId, term);
        }
        ElectionTimeout electionTimeout = scheduleElectionTimeout ? scheduleElectionTimeout() : ElectionTimeout.NONE;
        changeToRole(new FollowerNodeRole(term, votedFor, leaderId, electionTimeout));
    }

    /**
     * Handle a RequestVote result (processed on the task executor)
     */
    @Subscribe
    public void onReceiveRequestVoteResult(RequestVoteResult result) {
        context.getTaskExecutor().submit(() -> doProcessRequestVoteResult(result));
    }

    /**
     * Count a vote and become leader once a majority is reached
     */
    private void doProcessRequestVoteResult(RequestVoteResult result) {
        // A newer term in the reply: step down to follower
        if (result.getTerm() > role.getTerm()) {
            becomeFollower(result.getTerm(), null, null, true);
            return;
        }
        // Ignore late replies from an earlier election
        if (result.getTerm() < role.getTerm()) {
            return;
        }
        // Only a candidate counts votes
        if (role.getName() != RoleName.CANDIDATE) {
            log.debug("receive request vote result and current role is not candidate, ignore");
            return;
        }
        if (!result.isVoteGranted()) {
            return;
        }
        // Votes so far, including this one
        int currentVotesCount = ((CandidateNodeRole) role).getVotesCount() + 1;
        // Number of voting members in the cluster
        int countOfMajor = context.getGroup().getCountOfMajor();
        log.debug("votes count {}, major node count {}", currentVotesCount, countOfMajor);
        role.cancelTimeoutOrTask();
        // Strictly more than half of the voting members: this candidate wins
        if (currentVotesCount > countOfMajor / 2) {
            log.info("become leader, term {}", role.getTerm());
            // Reset every follower's replication progress
            resetReplicatingStates();
            // Become leader and start sending AppendEntries (heartbeats)
            changeToRole(new LeaderNodeRole(role.getTerm(), scheduleLogReplicationTask()));
            // Append the no-op entry a new leader writes at the start of its term
            context.getLog().appendEntry(role.getTerm());
            // Reset all inbound channels
            context.getConnector().resetChannels();
        } else {
            // Not enough votes yet: stay candidate with the new count and a fresh election timeout
            changeToRole(new CandidateNodeRole(role.getTerm(), currentVotesCount, scheduleElectionTimeout()));
        }
    }

    /**
     * Reset every member's replication progress to start at the log's next index
     */
    private void resetReplicatingStates() {
        context.getGroup().resetReplicatingStates(context.getLog().getNextIndex());
    }

    /**
     * Handle an incoming AppendEntries RPC (log replication or heartbeat)
     */
    @Subscribe
    public void onReceiveAppendEntriesRpc(AppendEntriesRpcMessage rpcMessage) {
        context.getTaskExecutor().submit(() -> context.getConnector().replyAppendEntries(
                doProcessAppendEntriesRpc(rpcMessage),
                context.getGroup().getMember(rpcMessage.getSourceNodeId()).getEndpoint()));
    }

    /**
     * Process an AppendEntries RPC and build the result
     */
    private AppendEntriesResult doProcessAppendEntriesRpc(AppendEntriesRpcMessage rpcMessage) {
        AppendEntriesRpc rpc = rpcMessage.getRpc();
        // Sender's term is older than ours: reject
        if (rpc.getTerm() < role.getTerm()) {
            return new AppendEntriesResult(rpc.getMessageId(), role.getTerm(), false);
        }
        // Newer term: follow the sender
        if (rpc.getTerm() > role.getTerm()) {
            becomeFollower(rpc.getTerm(), null, rpc.getLeaderId(), true);
            return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), appendEntries(rpc));
        }
        assert rpc.getTerm() == role.getTerm();
        switch (role.getName()) {
            case FOLLOWER:
                // Keep votedFor, record the leader and reset the election timeout
                becomeFollower(rpc.getTerm(), ((FollowerNodeRole) role).getVotedFor(), rpc.getLeaderId(), true);
                return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), appendEntries(rpc));
            case CANDIDATE:
                // Another node already won this term's election: step down
                becomeFollower(rpc.getTerm(), null, rpc.getLeaderId(), true);
                return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), appendEntries(rpc));
            case LEADER:
                // Two leaders in the same term should never happen
                log.warn("receive append entries rpc from another leader {}, ignore", rpc.getLeaderId());
                return new AppendEntriesResult(rpc.getMessageId(), rpc.getTerm(), false);
            default:
                throw new IllegalArgumentException("unexpected node role [" + role.getName() + "]");
        }
    }

    /**
     * Append the entries carried by the RPC (not implemented yet: always succeeds)
     */
    private boolean appendEntries(AppendEntriesRpc rpc) {
        return true;
    }

    /**
     * Handle an AppendEntries result (processed on the task executor)
     */
    @Subscribe
    public void onReceiveAppendEntriesResult(AppendEntriesResultMessage resultMessage) {
        context.getTaskExecutor().submit(() -> doProcessAppendEntriesResult(resultMessage));
    }

    /**
     * Process an AppendEntries result
     */
    private void doProcessAppendEntriesResult(AppendEntriesResultMessage resultMessage) {
        AppendEntriesResult result = resultMessage.getResult();
        // A newer term in the reply: step down to follower
        if (result.getTerm() > role.getTerm()) {
            becomeFollower(result.getTerm(), null, null, true);
            return;
        }
        // Only the leader processes replication results
        if (role.getName() != RoleName.LEADER) {
            log.warn("receive append entries result from node {} but current node is not leader, ignore",
                    resultMessage.getSourceNodeId());
        }
    }
}
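`NodeImpl` spreads the election decisions over several methods. The sketch below collects them into pure functions so each rule can be checked in isolation. `ElectionRules` and its methods are hypothetical names. `hasMajority` mirrors the `currentVotesCount > countOfMajor / 2` test, and `grantVote` mirrors `doProcessRequestVoteRpc`. `candidateLogIsUpToDate` is the standard Raft log comparison that the `voteForCandidate = true` placeholder stands in for; it is an assumption here, not part of the code above.

```java
// Hypothetical pure-function sketch of the decisions NodeImpl makes during an election.
final class ElectionRules {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    private ElectionRules() {
    }

    // Majority test used when counting votes: strictly more than half of the voting members
    static boolean hasMajority(int votes, int clusterSize) {
        return votes > clusterSize / 2;
    }

    // Standard Raft check: a higher last term wins; with equal terms, the longer log wins
    static boolean candidateLogIsUpToDate(int candLastTerm, int candLastIndex,
                                          int myLastTerm, int myLastIndex) {
        if (candLastTerm != myLastTerm) {
            return candLastTerm > myLastTerm;
        }
        return candLastIndex >= myLastIndex;
    }

    // votedFor == null means "has not voted in currentTerm"
    static boolean grantVote(int rpcTerm, String candidateId, int currentTerm,
                             Role role, String votedFor, boolean logUpToDate) {
        if (rpcTerm < currentTerm) {
            return false; // stale candidate
        }
        if (rpcTerm > currentTerm) {
            return logUpToDate; // the voter first steps down to follower of rpcTerm
        }
        if (role != Role.FOLLOWER) {
            return false; // a candidate voted for itself; a leader never votes
        }
        return (votedFor == null && logUpToDate) || candidateId.equals(votedFor);
    }
}
```

With 3 voting members, 2 votes suffice (2 > 1). With 4, a candidate needs 3, because 2 > 2 is false. This matches the majority sizes (N + 1) / 2 and N / 2 + 1 derived earlier.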


import com.google.common.eventbus.EventBus;

import java.io.File;
import java.util.Collection;
import java.util.Collections;

/**
 * Builder for the consensus (core) component
 */
public class NodeBuilder {

    private final NodeGroup group;
    private final NodeId selfId;
    private final EventBus eventBus;
    private Scheduler scheduler = null;
    // Has no default: a connector must be set before build()
    private Connector connector = null;
    private TaskExecutor taskExecutor = null;
    private NodeStore nodeStore = null;
    private Log log = null;
    private NodeConfig config = new NodeConfig();

    public NodeBuilder(Collection<NodeEndpoint> endpoints, NodeId selfId) {
        group = new NodeGroup(endpoints, selfId);
        this.selfId = selfId;
        eventBus = new EventBus(selfId.getValue());
    }

    // Single-node cluster
    public NodeBuilder(NodeEndpoint endpoint) {
        this(Collections.singletonList(endpoint), endpoint.getId());
    }

    /**
     * Set the connector (network component)
     */
    public NodeBuilder setConnector(Connector connector) {
        this.connector = connector;
        return this;
    }

    /**
     * Set the scheduler
     */
    public NodeBuilder setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
        return this;
    }

    /**
     * Set the task executor
     */
    public NodeBuilder setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
        return this;
    }

    /**
     * Set the node store
     */
    public NodeBuilder setNodeStore(NodeStore nodeStore) {
        this.nodeStore = nodeStore;
        return this;
    }

    /**
     * Set the log
     */
    public NodeBuilder setLog(Log log) {
        this.log = log;
        return this;
    }

    /**
     * Build the Node instance
     */
    public Node build() {
        return new NodeImpl(buildContext());
    }

    /**
     * Build the context, falling back to defaults for components that were not set
     */
    private NodeContext buildContext() {
        return NodeContext.builder()
                .group(group)
                .selfId(selfId)
                .eventBus(eventBus)
                .scheduler(scheduler != null ? scheduler : new DefaultScheduler(config))
                .connector(connector)
                .taskExecutor(taskExecutor != null ? taskExecutor : new SingleThreadTaskExecutor("node"))
                .store(nodeStore != null ? nodeStore : new FileNodeStore(new File("./example/node.bin")))
                .log(log != null ? log : new MemoryLog())
                .build();
    }
}


  • Unit tests

import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

public class NodeImplTest {

    private NodeBuilder newNodeBuilder(NodeId selfId, NodeEndpoint... endpoints) {
        return new NodeBuilder(Arrays.asList(endpoints), selfId)
                .setScheduler(new NullScheduler())
                .setConnector(new MockConnector())
                .setTaskExecutor(new DirectTaskExecutor())
                .setNodeStore(new MemoryNodeStore());
    }

    /**
     * Start: the node comes up as a follower with term 0 and no vote
     */
    @Test
    public void testStart() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333))
                .build();
        node.start();
        FollowerNodeRole role = (FollowerNodeRole) node.getRole();
        assertEquals(0, role.getTerm());
        assertNull(role.getVotedFor());
    }

    /**
     * Receiving a granted vote: in a 3-node cluster the candidate becomes leader
     */
    @Test
    public void testOnReceiveRequestVoteResult() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        node.electionTimeout();
        node.onReceiveRequestVoteResult(new RequestVoteResult(1, true));
        LeaderNodeRole role = (LeaderNodeRole) node.getRole();
        assertEquals(1, role.getTerm());
    }

    /**
     * Log replication: the leader sends AppendEntries to B and C
     */
    @Test
    public void testReplicateLog() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        node.electionTimeout();
        node.onReceiveRequestVoteResult(new RequestVoteResult(1, true));
        node.replicateLog();
        MockConnector mockConnector = (MockConnector) node.getContext().getConnector();
        // 1 RequestVote message + 2 AppendEntries messages (to B and C)
        assertEquals(3, mockConnector.getMessageCount());
        List<MockConnector.Message> messages = mockConnector.getMessages();
        Set<NodeId> destinationNodeIds = messages.subList(1, 3).stream()
                .map(MockConnector.Message::getDestinationNodeId)
                .collect(Collectors.toSet());
        assertEquals(2, destinationNodeIds.size());
        assertTrue(destinationNodeIds.contains(NodeId.of("B")));
        assertTrue(destinationNodeIds.contains(NodeId.of("C")));
        AppendEntriesRpc rpc = (AppendEntriesRpc) messages.get(2).getRpc();
        assertEquals(1, rpc.getTerm());
    }

    /**
     * A follower receiving AppendEntries from the leader of a newer term
     */
    @Test
    public void testOnReceiveAppendEntriesRpcFollower() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        AppendEntriesRpc rpc = new AppendEntriesRpc();
        rpc.setTerm(1);
        rpc.setLeaderId(NodeId.of("B"));
        node.onReceiveAppendEntriesRpc(new AppendEntriesRpcMessage(rpc, NodeId.of("B"), null));
        MockConnector connector = (MockConnector) node.getContext().getConnector();
        AppendEntriesResult result = (AppendEntriesResult) connector.getResult();
        assertEquals(1, result.getTerm());
        assertTrue(result.isSuccess());
        FollowerNodeRole role = (FollowerNodeRole) node.getRole();
        assertEquals(1, role.getTerm());
        assertEquals(NodeId.of("B"), role.getLeaderId());
    }

    /**
     * The leader processing a successful AppendEntries result
     */
    @Test
    public void testOnReceiveAppendEntriesNormal() {
        NodeImpl node = (NodeImpl) newNodeBuilder(NodeId.of("A"),
                new NodeEndpoint(new NodeId("A"), "localhost", 2333),
                new NodeEndpoint(new NodeId("B"), "localhost", 2334),
                new NodeEndpoint(new NodeId("C"), "localhost", 2335))
                .build();
        node.start();
        node.electionTimeout();
        node.onReceiveRequestVoteResult(new RequestVoteResult(1, true));
        node.replicateLog();
        node.onReceiveAppendEntriesResult(new AppendEntriesResultMessage(new AppendEntriesResult("", 1, true),
                NodeId.of("B"),
                new AppendEntriesRpc()));
        // A same-term success result leaves the leader in place
        assertTrue(node.getRole() instanceof LeaderNodeRole);
    }
}






  • Log implementation

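The log has always been a fundamental building block of distributed consensus algorithms, whether in Paxos (the algorithm Raft is usually compared against) or in the Paxos variants. The log these algorithms need differs from an ordinary database WAL (Write-Ahead Log), which is append-only: an entry written at runtime may later be discarded, or overwritten, because it conflicts with the Leader's log. The log does not care what the upper-layer service is, and what it stores is independent of that service. A service request can be converted into a generic storage form, for example serialized to binary, and stored as is.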
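A minimal sketch of that last point, assuming a hypothetical key-value service whose `SetCommand` (not part of the original code) is serialized with `DataOutputStream`. The resulting `byte[]` is what a general entry would carry as its payload; the log itself never looks inside it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical upper-layer command; the log only stores its serialized bytes.
final class SetCommand {
    final String key;
    final String value;

    SetCommand(String key, String value) {
        this.key = key;
        this.value = value;
    }

    // Encode as two length-prefixed strings (DataOutputStream.writeUTF).
    byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(key);
            out.writeUTF(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Decode the payload of a log entry back into a command when it is applied.
    static SetCommand fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new SetCommand(in.readUTF(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On the write path the bytes would be passed to `Log.appendEntry(term, command)`; on the apply path `SetCommand.fromBytes(entry.getCommandBytes())` recovers the request. Any other serialization format would work just as well, since the log only sees opaque bytes.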


/**
 * Log entry
 */
public interface Entry {

    // Entry kinds
    int KIND_NO_OP = 0;    // empty entry appended by a newly elected Leader as its first entry
    int KIND_GENERAL = 1;  // general entry produced by the upper-layer service

    /**
     * @return the entry kind
     */
    int getKind();

    /**
     * @return the entry index
     */
    int getIndex();

    /**
     * @return the term in which the entry was created
     */
    int getTerm();

    /**
     * @return the metadata (kind, index and term)
     */
    EntryMeta getMeta();

    /**
     * @return the payload
     */
    byte[] getCommandBytes();
}
/**
 * Log entry metadata
 */
@AllArgsConstructor
@Getter
public class EntryMeta {

    private final int kind;
    private final int index;
    private final int term;
}


/**
 * Base class for log entries
 */
@AllArgsConstructor
public abstract class AbstractEntry implements Entry {

    // entry kind
    private final int kind;
    // entry index
    protected final int index;
    // term in which the entry was created
    protected final int term;

    @Override
    public int getKind() {
        return this.kind;
    }

    @Override
    public int getIndex() {
        return index;
    }

    @Override
    public int getTerm() {
        return term;
    }

    @Override
    public EntryMeta getMeta() {
        return new EntryMeta(kind, index, term);
    }
}


/**
 * General log entry
 */
public class GeneralEntry extends AbstractEntry {

    // payload
    private final byte[] commandBytes;

    public GeneralEntry(int index, int term, byte[] commandBytes) {
        super(KIND_GENERAL, index, term);
        this.commandBytes = commandBytes;
    }

    @Override
    public byte[] getCommandBytes() {
        return this.commandBytes;
    }

    @Override
    public String toString() {
        return "GeneralEntry{" + "index=" + index + ", term=" + term + '}';
    }
}


/**
 * Empty (NO-OP) log entry
 */
public class NoOpEntry extends AbstractEntry {

    public NoOpEntry(int index, int term) {
        super(KIND_NO_OP, index, term);
    }

    @Override
    public byte[] getCommandBytes() {
        return new byte[0];
    }

    @Override
    public String toString() {
        return "NoOpEntry{" + "index=" + index + ", term=" + term + '}';
    }
}
  • Log


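/**
 * Log
 */
public interface Log {

    // pass as maxEntries to mean "all entries"
    int ALL_ENTRIES = -1;

    /**
     * Get the metadata of the last entry.
     * Typically used when an election starts and when sending messages.
     */
    EntryMeta getLastEntryMeta();

    /**
     * Create an AppendEntries message.
     * Used when the Leader sends a log replication message to a Follower.
     *
     * @param term       current term
     * @param selfId     id of this node
     * @param nextIndex  index of the next entry to send
     * @param maxEntries maximum number of entries, or ALL_ENTRIES
     */
    AppendEntriesRpc createAppendEntriesRpc(int term, NodeId selfId, int nextIndex, int maxEntries);

    /**
     * Get the index of the next log entry.
     */
    int getNextIndex();

    /**
     * Get the current commit index.
     */
    int getCommitIndex();

    /**
     * Check whether this node's log is newer than the given lastLogIndex and lastLogTerm
     * (typically those of a candidate asking for a vote).
     *
     * @param lastLogIndex index of the other party's last entry
     * @param lastLogTerm  term of the other party's last entry
     */
    boolean isNewerThan(int lastLogIndex, int lastLogTerm);

    /**
     * Append a NO-OP entry.
     * Used for the first, empty entry after this node becomes Leader.
     *
     * @param term current term
     */
    NoOpEntry appendEntry(int term);

    /**
     * Append a general entry.
     * Used for operations coming from the upper-layer service.
     *
     * @param term    current term
     * @param command serialized command
     */
    GeneralEntry appendEntry(int term, byte[] command);

    /**
     * Append entries received from the Leader.
     * Used when a log replication request arrives from the Leader.
     *
     * @param prevLogIndex index of the entry just before the new entries
     * @param prevLogTerm  term of the entry just before the new entries
     * @param entries      entries to append
     * @return true if success, false if previous log check failed
     */
    boolean appendEntriesFromLeader(int prevLogIndex, int prevLogTerm, List<Entry> entries);

    /**
     * Advance the commit index,
     * e.g. when a log replication request arrives from the Leader.
     *
     * @param newCommitIndex new commit index
     * @param currentTerm    current term
     */
    void advanceCommitIndex(int newCommitIndex, int currentTerm);

    /**
     * Close the log.
     */
    void close();
}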
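To make the parameters of `createAppendEntriesRpc` concrete, here is a small model of the window it computes, assuming a log that starts at index 1 and is represented only by the terms of its entries. `AppendWindow` and `compute` are hypothetical names; the real method builds an `AppendEntriesRpc` rather than returning a window.

```java
// Hypothetical model of the range createAppendEntriesRpc selects.
// The log starts at index 1 and entry i has term terms[i - 1].
final class AppendWindow {
    static final int ALL_ENTRIES = -1;

    final int prevLogIndex;
    final int prevLogTerm;
    final int fromIndex; // first entry to send (inclusive)
    final int toIndex;   // end of the range (exclusive)

    private AppendWindow(int prevLogIndex, int prevLogTerm, int fromIndex, int toIndex) {
        this.prevLogIndex = prevLogIndex;
        this.prevLogTerm = prevLogTerm;
        this.fromIndex = fromIndex;
        this.toIndex = toIndex;
    }

    static AppendWindow compute(int[] terms, int nextIndex, int maxEntries) {
        int nextLogIndex = terms.length + 1;
        if (nextIndex < 1 || nextIndex > nextLogIndex) {
            throw new IllegalArgumentException("illegal next index " + nextIndex);
        }
        int prevLogIndex = nextIndex - 1;
        // index 0 stands for "before the first entry" and has term 0
        int prevLogTerm = prevLogIndex == 0 ? 0 : terms[prevLogIndex - 1];
        int toIndex = maxEntries == ALL_ENTRIES
                ? nextLogIndex
                : Math.min(nextLogIndex, nextIndex + maxEntries);
        return new AppendWindow(prevLogIndex, prevLogTerm, nextIndex, toIndex);
    }

    int entryCount() {
        return toIndex - fromIndex;
    }
}
```

For a follower with `nextIndex = 4` and `maxEntries = 1`, the Leader sends entry 4 together with `prevLogIndex = 3` and the term of entry 3. When `nextIndex` equals the next log index the range is empty, which is the case where AppendEntries doubles as a heartbeat.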


/**
 * Log entry sequence
 */
public interface EntrySequence {

    /**
     * @return whether the sequence is empty
     */
    boolean isEmpty();

    /**
     * @return index of the first entry
     */
    int getFirstLogIndex();

    /**
     * @return index of the last entry
     */
    int getLastLogIndex();

    /**
     * @return index of the next entry
     */
    int getNextLogIndex();

    /**
     * Get a sub-view from fromIndex up to the last entry.
     */
    List<Entry> subView(int fromIndex);

    /**
     * Get a sub-view of the range [fromIndex, toIndex).
     */
    List<Entry> subList(int fromIndex, int toIndex);

    /**
     * Check whether the entry at index exists.
     */
    boolean isEntryPresent(int index);

    /**
     * Get the metadata of the entry at index.
     */
    EntryMeta getEntryMeta(int index);

    /**
     * Get the entry at index.
     */
    Entry getEntry(int index);

    /**
     * Get the last entry.
     */
    Entry getLastEntry();

    /**
     * Append one entry.
     */
    void append(Entry entry);

    /**
     * Append several entries.
     */
    void append(List<Entry> entries);

    /**
     * Advance commitIndex.
     */
    void commit(int index);

    /**
     * Get the current commitIndex.
     */
    int getCommitIndex();

    /**
     * Remove the entries after index.
     */
    void removeAfter(int index);

    /**
     * Close the sequence.
     */
    void close();
}


/**
 * Base class for log entry sequences
 */
public abstract class AbstractEntrySequence implements EntrySequence {

    // log index offset: index of the first entry this sequence can hold
    protected int logIndexOffset;
    // index of the next entry to append
    protected int nextLogIndex;

    public AbstractEntrySequence(int logIndexOffset) {
        this.logIndexOffset = logIndexOffset;
        this.nextLogIndex = logIndexOffset;
    }

    /**
     * The log index offset is needed because a sequence does not always start at index 1;
     * it is the index of the first slot whether or not that entry exists yet.
     * Initially logIndexOffset == nextLogIndex (both 1 for a fresh log),
     * so the sequence is empty exactly when the two are equal.
     */
    @Override
    public boolean isEmpty() {
        return logIndexOffset == nextLogIndex;
    }

    @Override
    public int getFirstLogIndex() {
        if (isEmpty()) {
            throw new EmptySequenceException();
        }
        return doGetFirstLogIndex();
    }

    /**
     * Index of the first entry (the log index offset).
     */
    protected int doGetFirstLogIndex() {
        return logIndexOffset;
    }

    @Override
    public int getLastLogIndex() {
        if (isEmpty()) {
            throw new EmptySequenceException();
        }
        return doGetLastLogIndex();
    }

    /**
     * Index of the last entry.
     */
    protected int doGetLastLogIndex() {
        return nextLogIndex - 1;
    }

    @Override
    public boolean isEntryPresent(int index) {
        return !isEmpty() && index >= doGetFirstLogIndex() && index <= doGetLastLogIndex();
    }

    @Override
    public Entry getEntry(int index) {
        if (!isEntryPresent(index)) {
            return null;
        }
        return doGetEntry(index);
    }

    @Override
    public EntryMeta getEntryMeta(int index) {
        Entry entry = getEntry(index);
        return entry != null ? entry.getMeta() : null;
    }

    /**
     * Get the entry at the given index (the index is known to be present).
     */
    protected abstract Entry doGetEntry(int index);

    @Override
    public Entry getLastEntry() {
        return isEmpty() ? null : doGetEntry(doGetLastLogIndex());
    }

    @Override
    public List<Entry> subView(int fromIndex) {
        if (isEmpty() || fromIndex > doGetLastLogIndex()) {
            return Collections.emptyList();
        }
        return subList(Math.max(fromIndex, doGetFirstLogIndex()), nextLogIndex);
    }

    // [fromIndex, toIndex)
    @Override
    public List<Entry> subList(int fromIndex, int toIndex) {
        if (isEmpty()) {
            throw new EmptySequenceException();
        }
        if (fromIndex < doGetFirstLogIndex() || toIndex > doGetLastLogIndex() + 1 || fromIndex > toIndex) {
            throw new IllegalArgumentException("illegal from index " + fromIndex + " or to index " + toIndex);
        }
        return doSubList(fromIndex, toIndex);
    }

    protected abstract List<Entry> doSubList(int fromIndex, int toIndex);

    @Override
    public int getNextLogIndex() {
        return nextLogIndex;
    }

    @Override
    public void append(List<Entry> entries) {
        for (Entry entry : entries) {
            append(entry);
        }
    }

    @Override
    public void append(Entry entry) {
        // entries must be appended with consecutive indices
        if (entry.getIndex() != nextLogIndex) {
            throw new IllegalArgumentException("entry index must be " + nextLogIndex);
        }
        doAppend(entry);
        nextLogIndex++;
    }

    protected abstract void doAppend(Entry entry);

    @Override
    public void removeAfter(int index) {
        if (isEmpty() || index >= doGetLastLogIndex()) {
            return;
        }
        doRemoveAfter(index);
    }

    protected abstract void doRemoveAfter(int index);
}
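The offset bookkeeping above matters once a sequence no longer starts at 1. The sketch below, a hypothetical `OffsetSequence` that stores only terms, starts at index 101 (as it might after entries up to 100 were dropped) and shows that every log index maps to a list position by subtracting `logIndexOffset`, with the same `removeAfter` contract.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, term-only model of AbstractEntrySequence's index bookkeeping.
final class OffsetSequence {
    private final int logIndexOffset;                 // index of the first slot
    private int nextLogIndex;                         // index the next entry must have
    private final List<Integer> terms = new ArrayList<>();

    OffsetSequence(int logIndexOffset) {
        this.logIndexOffset = logIndexOffset;
        this.nextLogIndex = logIndexOffset;           // empty: offset == next
    }

    boolean isEmpty() {
        return logIndexOffset == nextLogIndex;
    }

    int nextLogIndex() {
        return nextLogIndex;
    }

    int lastLogIndex() {
        if (isEmpty()) {
            throw new IllegalStateException("empty sequence");
        }
        return nextLogIndex - 1;
    }

    void append(int term) {
        terms.add(term);
        nextLogIndex++;
    }

    // Log index -> list position is a subtraction of the offset.
    int termAt(int index) {
        if (isEmpty() || index < logIndexOffset || index >= nextLogIndex) {
            throw new IllegalArgumentException("no entry at " + index);
        }
        return terms.get(index - logIndexOffset);
    }

    // Keep entries up to and including index.
    void removeAfter(int index) {
        if (isEmpty() || index >= lastLogIndex()) {
            return;
        }
        if (index < logIndexOffset) {
            terms.clear();
            nextLogIndex = logIndexOffset;
        } else {
            terms.subList(index - logIndexOffset + 1, terms.size()).clear();
            nextLogIndex = index + 1;
        }
    }
}
```

With offset 101, appending three entries yields indices 101 to 103; `removeAfter(101)` leaves one entry and `removeAfter(50)` empties the sequence without moving its offset.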


/**
 * In-memory log entry sequence
 */
public class MemoryEntrySequence extends AbstractEntrySequence {

    private final List<Entry> entries = new ArrayList<>();
    private int commitIndex = 0;

    public MemoryEntrySequence() {
        this(1);
    }

    public MemoryEntrySequence(int logIndexOffset) {
        super(logIndexOffset);
    }

    @Override
    protected List<Entry> doSubList(int fromIndex, int toIndex) {
        return entries.subList(fromIndex - logIndexOffset, toIndex - logIndexOffset);
    }

    @Override
    protected Entry doGetEntry(int index) {
        return entries.get(index - logIndexOffset);
    }

    @Override
    protected void doAppend(Entry entry) {
        entries.add(entry);
    }

    @Override
    public void commit(int index) {
        commitIndex = index;
    }

    @Override
    public int getCommitIndex() {
        return commitIndex;
    }

    @Override
    protected void doRemoveAfter(int index) {
        if (index < doGetFirstLogIndex()) {
            // remove everything
            entries.clear();
            nextLogIndex = logIndexOffset;
        } else {
            // keep entries up to and including index
            entries.subList(index - logIndexOffset + 1, entries.size()).clear();
            nextLogIndex = index + 1;
        }
    }

    @Override
    public void close() {
        // nothing to release for an in-memory sequence
    }

    @Override
    public String toString() {
        return "MemoryEntrySequence{" +
                "logIndexOffset=" + logIndexOffset +
                ", nextLogIndex=" + nextLogIndex +
                ", entries.size=" + entries.size() +
                '}';
    }
}




/**
 * Log entries file.
 * Each record is: kind(int), index(int), term(int), payload length(int), payload bytes.
 */
@AllArgsConstructor
public class EntriesFile {

    // seekable file
    private final SeekableFile seekableFile;

    public EntriesFile(File file) throws FileNotFoundException {
        this(new RandomAccessFileAdapter(file));
    }

    /**
     * Append a log entry at the end of the file.
     *
     * @param entry entry to append
     * @return offset at which the record starts
     * @throws IOException on I/O failure
     */
    public long appendEntry(Entry entry) throws IOException {
        long offset = seekableFile.size();
        seekableFile.seek(offset);
        seekableFile.writeInt(entry.getKind());
        seekableFile.writeInt(entry.getIndex());
        seekableFile.writeInt(entry.getTerm());
        byte[] commandBytes = entry.getCommandBytes();
        seekableFile.writeInt(commandBytes.length);
        seekableFile.write(commandBytes);
        return offset;
    }

    /**
     * Load the entry whose record starts at the given offset.
     *
     * @param offset  start offset of the record
     * @param factory factory used to build the entry object
     * @return the entry
     * @throws IOException on I/O failure
     */
    public Entry loadEntry(long offset, EntryFactory factory) throws IOException {
        // a record cannot start at or beyond the end of the file
        if (offset >= seekableFile.size()) {
            throw new IllegalArgumentException("offset >= size");
        }
        seekableFile.seek(offset);
        int kind = seekableFile.readInt();
        int index = seekableFile.readInt();
        int term = seekableFile.readInt();
        int length = seekableFile.readInt();
        byte[] bytes = new byte[length];
        seekableFile.read(bytes);
        return factory.create(kind, index, term, bytes);
    }

    public long size() throws IOException {
        return seekableFile.size();
    }

    public void clear() throws IOException {
        truncate(0L);
    }

    public void truncate(long offset) throws IOException {
        seekableFile.truncate(offset);
    }

    public void close() throws IOException {
        seekableFile.close();
    }
}
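The record layout written by `EntriesFile.appendEntry` can be reproduced in memory with `java.nio.ByteBuffer`. This sketch (the class `EntryRecordLayout` is hypothetical) writes two records and reads one back by its start offset, which is the value the index file keeps for each entry.

```java
import java.nio.ByteBuffer;

// In-memory model of the EntriesFile record layout:
// [kind:int][index:int][term:int][length:int][command: length bytes]
final class EntryRecordLayout {
    static final int HEADER_BYTES = 4 * Integer.BYTES; // 16

    // Append one record at the buffer's current position and return its start offset.
    static int append(ByteBuffer buffer, int kind, int index, int term, byte[] command) {
        int offset = buffer.position();
        buffer.putInt(kind).putInt(index).putInt(term).putInt(command.length).put(command);
        return offset;
    }

    // Absolute read of the term field; the buffer position is left untouched.
    static int readTerm(ByteBuffer buffer, int offset) {
        return buffer.getInt(offset + 2 * Integer.BYTES);
    }

    // Absolute read of the payload of the record starting at offset.
    static byte[] readCommand(ByteBuffer buffer, int offset) {
        int length = buffer.getInt(offset + 3 * Integer.BYTES);
        byte[] command = new byte[length];
        for (int i = 0; i < length; i++) {
            command[i] = buffer.get(offset + HEADER_BYTES + i);
        }
        return command;
    }
}
```

A NO-OP record is exactly 16 bytes (four ints and an empty payload), so a record appended right after it starts at offset 16; a general record adds its payload length on top of the 16-byte header.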
/**
 * Log entry factory
 */
public class EntryFactory {

    /**
     * Create a log entry object from its stored fields.
     *
     * @param kind         entry kind
     * @param index        entry index
     * @param term         entry term
     * @param commandBytes payload
     * @return the entry
     */
    public Entry create(int kind, int index, int term, byte[] commandBytes) {
        switch (kind) {
            case Entry.KIND_NO_OP:
                return new NoOpEntry(index, term);
            case Entry.KIND_GENERAL:
                return new GeneralEntry(index, term, commandBytes);
            default:
                throw new IllegalArgumentException("unexpected entry kind " + kind);
        }
    }
}



/**
 * Log entry index file.
 * Layout: minEntryIndex(int), maxEntryIndex(int), then one 16-byte item per entry:
 * offset in the entries file(long), kind(int), term(int).
 */
public class EntryIndexFile implements Iterable<EntryIndexItem> {

    // position of maxEntryIndex in the file (right after minEntryIndex)
    private static final long OFFSET_MAX_ENTRY_INDEX = Integer.BYTES;
    // length of one entry index item
    private static final int LENGTH_ENTRY_INDEX_ITEM = 16;
    // seekable file
    private final SeekableFile seekableFile;
    // number of entries
    @Getter
    private int entryIndexCount;
    // smallest entry index
    private int minEntryIndex;
    // largest entry index
    private int maxEntryIndex;
    // entry index items keyed by entry index
    private Map<Integer, EntryIndexItem> entryIndexMap = new HashMap<>();

    public EntryIndexFile(File file) throws IOException {
        this(new RandomAccessFileAdapter(file));
    }

    public EntryIndexFile(SeekableFile seekableFile) throws IOException {
        this.seekableFile = seekableFile;
        load();
    }

    /**
     * Load the metadata of all entries.
     */
    private void load() throws IOException {
        if (seekableFile.size() == 0L) {
            entryIndexCount = 0;
            return;
        }
        minEntryIndex = seekableFile.readInt();
        maxEntryIndex = seekableFile.readInt();
        updateEntryIndexCount();
        // load the entry metadata item by item into the map
        long offset;
        int kind;
        int term;
        for (int i = minEntryIndex; i <= maxEntryIndex; i++) {
            offset = seekableFile.readLong();
            kind = seekableFile.readInt();
            term = seekableFile.readInt();
            entryIndexMap.put(i, new EntryIndexItem(i, offset, kind, term));
        }
    }

    /**
     * Recompute the number of entries.
     */
    private void updateEntryIndexCount() {
        entryIndexCount = maxEntryIndex - minEntryIndex + 1;
    }

    /**
     * @return whether the file holds no entries
     */
    public boolean isEmpty() {
        return entryIndexCount == 0;
    }

    public int getMinEntryIndex() {
        checkEmpty();
        return minEntryIndex;
    }

    private void checkEmpty() {
        if (isEmpty()) {
            throw new IllegalStateException("no entry index");
        }
    }

    public int getMaxEntryIndex() {
        checkEmpty();
        return maxEntryIndex;
    }

    /**
     * Append the index item of one entry.
     *
     * @param index  entry index
     * @param offset offset of the entry in the entries file
     * @param kind   entry kind
     * @param term   entry term
     */
    public void appendEntryIndex(int index, long offset, int kind, int term) throws IOException {
        if (seekableFile.size() == 0L) {
            // empty file: write minEntryIndex first
            seekableFile.writeInt(index);
            minEntryIndex = index;
        } else {
            // indices must be consecutive
            if (index != maxEntryIndex + 1) {
                throw new IllegalArgumentException("index must be " + (maxEntryIndex + 1) + ", but was " + index);
            }
            // skip minEntryIndex
            seekableFile.seek(OFFSET_MAX_ENTRY_INDEX);
        }
        // write maxEntryIndex
        seekableFile.writeInt(index);
        maxEntryIndex = index;
        updateEntryIndexCount();
        // move to the item's position (the end of the file) and write the item
        seekableFile.seek(getOffsetOfEntryIndexItem(index));
        seekableFile.writeLong(offset);
        seekableFile.writeInt(kind);
        seekableFile.writeInt(term);
        entryIndexMap.put(index, new EntryIndexItem(index, offset, kind, term));
    }

    /**
     * Position of the index item of the given entry:
     * header (two ints) plus the preceding items, computed as long to avoid int overflow.
     */
    private long getOffsetOfEntryIndexItem(int index) {
        return (long) (index - minEntryIndex) * LENGTH_ENTRY_INDEX_ITEM + Integer.BYTES * 2;
    }

    /**
     * Remove everything.
     */
    public void clear() throws IOException {
        seekableFile.truncate(0L);
        entryIndexCount = 0;
        entryIndexMap.clear();
    }

    /**
     * Remove the items after the given index.
     *
     * @param newMaxEntryIndex new largest entry index
     */
    public void removeAfter(int newMaxEntryIndex) throws IOException {
        // nothing to do if empty or nothing lies after newMaxEntryIndex
        if (isEmpty() || newMaxEntryIndex >= maxEntryIndex) {
            return;
        }
        // if the new maxEntryIndex is below minEntryIndex, remove everything
        if (newMaxEntryIndex < minEntryIndex) {
            clear();
            return;
        }
        // update maxEntryIndex in the header
        seekableFile.seek(OFFSET_MAX_ENTRY_INDEX);
        seekableFile.writeInt(newMaxEntryIndex);
        // truncate the file
        seekableFile.truncate(getOffsetOfEntryIndexItem(newMaxEntryIndex + 1));
        // remove the metadata from the map
        for (int i = newMaxEntryIndex + 1; i <= maxEntryIndex; i++) {
            entryIndexMap.remove(i);
        }
        maxEntryIndex = newMaxEntryIndex;
        entryIndexCount = newMaxEntryIndex - minEntryIndex + 1;
    }

    public long getOffset(int entryIndex) {
        return get(entryIndex).getOffset();
    }

    public EntryIndexItem get(int entryIndex) {
        checkEmpty();
        if (entryIndex < minEntryIndex || entryIndex > maxEntryIndex) {
            throw new IllegalArgumentException("index < min or index > max");
        }
        return entryIndexMap.get(entryIndex);
    }

    /**
     * Iterate over the metadata of all entries in the file.
     */
    @Override
    public Iterator<EntryIndexItem> iterator() {
        if (isEmpty()) {
            return Collections.emptyIterator();
        }
        return new EntryIndexIterator(entryIndexCount, minEntryIndex);
    }

    public void close() throws IOException {
        seekableFile.close();
    }

    /**
     * Entry index iterator; fails fast if entries are added or removed during iteration.
     */
    @AllArgsConstructor
    private class EntryIndexIterator implements Iterator<EntryIndexItem> {

        // entry count when the iterator was created
        private final int entryIndexCount;
        // current entry index
        private int currentEntryIndex;

        @Override
        public boolean hasNext() {
            checkModification();
            return currentEntryIndex <= maxEntryIndex;
        }

        /**
         * Check whether the file was modified since the iterator was created.
         */
        private void checkModification() {
            if (this.entryIndexCount != EntryIndexFile.this.entryIndexCount) {
                throw new IllegalStateException("entry index count changed");
            }
        }

        @Override
        public EntryIndexItem next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return entryIndexMap.get(currentEntryIndex++);
        }
    }
}
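The offsets used by `EntryIndexFile` follow directly from its layout: an 8-byte header (`minEntryIndex`, `maxEntryIndex`) followed by one 16-byte item (`long` offset, `int` kind, `int` term) per entry. The hypothetical helper below spells out that arithmetic and builds a whole file image in a `ByteBuffer` so the numbers can be checked against real bytes; computing in `long` avoids the `int` overflow a very long log could hit.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for EntryIndexFile's layout:
// [minEntryIndex:int][maxEntryIndex:int] then per entry [offset:long][kind:int][term:int]
final class EntryIndexLayout {
    static final int HEADER_BYTES = 2 * Integer.BYTES;            // 8
    static final int ITEM_BYTES = Long.BYTES + 2 * Integer.BYTES; // 16

    // Byte position of the item describing entry 'index'.
    static long itemOffset(int minEntryIndex, int index) {
        return (long) (index - minEntryIndex) * ITEM_BYTES + HEADER_BYTES;
    }

    // File length when the file covers [minEntryIndex, maxEntryIndex].
    static long fileSize(int minEntryIndex, int maxEntryIndex) {
        return itemOffset(minEntryIndex, maxEntryIndex + 1);
    }

    // Whole file image for entries minEntryIndex .. minEntryIndex + offsets.length - 1.
    static byte[] encode(int minEntryIndex, long[] offsets, int[] kinds, int[] terms) {
        int count = offsets.length;
        if (count == 0) {
            return new byte[0]; // an empty index file has no header either
        }
        int maxEntryIndex = minEntryIndex + count - 1;
        ByteBuffer buffer = ByteBuffer.allocate((int) fileSize(minEntryIndex, maxEntryIndex));
        buffer.putInt(minEntryIndex).putInt(maxEntryIndex);
        for (int i = 0; i < count; i++) {
            buffer.putLong(offsets[i]).putInt(kinds[i]).putInt(terms[i]);
        }
        return buffer.array();
    }
}
```

For a file covering entries 1 to 3, the item for entry 3 starts at byte 40 and the file is 56 bytes long; `removeAfter(2)` therefore truncates the file to `itemOffset(1, 3)`, i.e. 40 bytes.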


/**
 * Log directory
 */
public interface LogDir {

    /**
     * Initialize the directory.
     */
    void initialize();

    /**
     * @return whether the directory exists
     */
    boolean exists();

    /**
     * @return the file backing EntriesFile
     */
    File getEntriesFile();

    /**
     * @return the file backing EntryIndexFile
     */
    File getEntryOffsetIndexFile();

    /**
     * @return the directory itself
     */
    File get();

    /**
     * Rename this directory to the given one.
     */
    boolean renameTo(LogDir logDir);
}
/**
 * File-based log entry sequence.
 * Committed entries are stored in the entries file and the entry index file;
 * uncommitted entries wait in the in-memory buffer pendingEntries.
 */
public class FileEntrySequence extends AbstractEntrySequence {

    // entry factory
    private final EntryFactory entryFactory = new EntryFactory();
    // entries file
    private final EntriesFile entriesFile;
    // entry index file
    private final EntryIndexFile entryIndexFile;
    // buffer of entries not yet committed
    private final LinkedList<Entry> pendingEntries = new LinkedList<>();
    // Raft defines the initial commitIndex as 0, whether or not the log is persistent
    private int commitIndex;

    public FileEntrySequence(LogDir logDir, int logIndexOffset) {
        super(logIndexOffset);
        try {
            this.entriesFile = new EntriesFile(logDir.getEntriesFile());
            this.entryIndexFile = new EntryIndexFile(logDir.getEntryOffsetIndexFile());
            initialize();
        } catch (IOException e) {
            throw new LogException("failed to open entries file or entry index file", e);
        }
    }

    public FileEntrySequence(EntriesFile entriesFile, EntryIndexFile entryIndexFile, int logIndexOffset) {
        super(logIndexOffset);
        this.entriesFile = entriesFile;
        this.entryIndexFile = entryIndexFile;
        initialize();
    }

    /**
     * Initialize the indices from the index file.
     */
    private void initialize() {
        if (entryIndexFile.isEmpty()) {
            commitIndex = logIndexOffset - 1;
            return;
        }
        // use the index file's minEntryIndex as logIndexOffset
        logIndexOffset = entryIndexFile.getMinEntryIndex();
        // use the index file's maxEntryIndex + 1 as nextLogIndex
        nextLogIndex = entryIndexFile.getMaxEntryIndex() + 1;
        // everything in the files has been committed
        commitIndex = entryIndexFile.getMaxEntryIndex();
    }

    @Override
    public int getCommitIndex() {
        return commitIndex;
    }

    /**
     * Get the entries in [fromIndex, toIndex).
     */
    @Override
    protected List<Entry> doSubList(int fromIndex, int toIndex) {
        // the result has two parts: entries from the file and entries from the buffer
        List<Entry> result = new ArrayList<>();
        // entries from the file
        if (!entryIndexFile.isEmpty() && fromIndex <= entryIndexFile.getMaxEntryIndex()) {
            int maxIndex = Math.min(entryIndexFile.getMaxEntryIndex() + 1, toIndex);
            for (int i = fromIndex; i < maxIndex; i++) {
                result.add(getEntryInFile(i));
            }
        }
        // entries from the buffer
        if (!pendingEntries.isEmpty() && toIndex > pendingEntries.getFirst().getIndex()) {
            Iterator<Entry> iterator = pendingEntries.iterator();
            Entry entry;
            int index;
            while (iterator.hasNext()) {
                entry = iterator.next();
                index = entry.getIndex();
                if (index >= toIndex) {
                    break;
                }
                if (index >= fromIndex) {
                    result.add(entry);
                }
            }
        }
        return result;
    }

    /**
     * Get the entry at the given index, from the buffer if it is uncommitted.
     */
    @Override
    protected Entry doGetEntry(int index) {
        if (!pendingEntries.isEmpty()) {
            int firstPendingEntryIndex = pendingEntries.getFirst().getIndex();
            if (index >= firstPendingEntryIndex) {
                return pendingEntries.get(index - firstPendingEntryIndex);
            }
        }
        assert !entryIndexFile.isEmpty();
        return getEntryInFile(index);
    }

    /**
     * Get entry metadata. Entries at or after the first buffered entry exist only in the
     * buffer, so the buffer is checked before the index file.
     */
    @Override
    public EntryMeta getEntryMeta(int index) {
        if (!isEntryPresent(index)) {
            return null;
        }
        if (!pendingEntries.isEmpty()) {
            int firstPendingEntryIndex = pendingEntries.getFirst().getIndex();
            if (index >= firstPendingEntryIndex) {
                return pendingEntries.get(index - firstPendingEntryIndex).getMeta();
            }
        }
        return entryIndexFile.get(index).toEntryMeta();
    }

    /**
     * Load the entry with the given index from the file.
     */
    private Entry getEntryInFile(int index) {
        long offset = entryIndexFile.getOffset(index);
        try {
            return entriesFile.loadEntry(offset, entryFactory);
        } catch (IOException e) {
            throw new LogException("failed to load entry " + index, e);
        }
    }

    /**
     * Get the last entry.
     */
    @Override
    public Entry getLastEntry() {
        if (isEmpty()) {
            return null;
        }
        if (!pendingEntries.isEmpty()) {
            return pendingEntries.getLast();
        }
        assert !entryIndexFile.isEmpty();
        return getEntryInFile(entryIndexFile.getMaxEntryIndex());
    }

    /**
     * Append an entry to the buffer.
     */
    @Override
    protected void doAppend(Entry entry) {
        pendingEntries.add(entry);
    }

    /**
     * Commit up to index: move entries from the buffer into the files.
     */
    @Override
    public void commit(int index) {
        // validate the new commitIndex
        if (index < commitIndex) {
            throw new IllegalArgumentException("commit index < " + commitIndex);
        }
        if (index == commitIndex) {
            return;
        }
        if (pendingEntries.isEmpty() || pendingEntries.getLast().getIndex() < index) {
            throw new IllegalArgumentException("no entry to commit or commit index exceed");
        }
        long offset;
        Entry entry = null;
        try {
            for (int i = commitIndex + 1; i <= index; i++) {
                entry = pendingEntries.removeFirst();
                offset = entriesFile.appendEntry(entry);
                entryIndexFile.appendEntryIndex(i, offset, entry.getKind(), entry.getTerm());
                commitIndex = i;
            }
        } catch (IOException e) {
            throw new LogException("failed to commit entry " + entry, e);
        }
    }

    /**
     * Remove the entries after the given index.
     */
    @Override
    protected void doRemoveAfter(int index) {
        // only buffered entries need to be removed
        if (!pendingEntries.isEmpty() && index >= pendingEntries.getFirst().getIndex() - 1) {
            // remove (lastLogIndex - index) entries from the tail:
            // the loop counts upward, but each step removes the last entry
            for (int i = index + 1; i <= doGetLastLogIndex(); i++) {
                pendingEntries.removeLast();
            }
            nextLogIndex = index + 1;
            return;
        }
        try {
            if (index >= doGetFirstLogIndex()) {
                // index lies before the buffer: clear the buffer and truncate the files
                pendingEntries.clear();
                entriesFile.truncate(entryIndexFile.getOffset(index + 1));
                entryIndexFile.removeAfter(index);
                nextLogIndex = index + 1;
                commitIndex = index;
            } else {
                // index is before the first entry: clear all data
                pendingEntries.clear();
                entriesFile.clear();
                entryIndexFile.clear();
                nextLogIndex = logIndexOffset;
                commitIndex = logIndexOffset - 1;
            }
        } catch (IOException e) {
            throw new LogException(e);
        }
    }

    /**
     * Close the files.
     */
    @Override
    public void close() {
        try {
            entriesFile.close();
            entryIndexFile.close();
        } catch (IOException e) {
            throw new LogException("failed to close", e);
        }
    }
}
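`FileEntrySequence` keeps uncommitted entries in `pendingEntries` and writes them to the two files only in `commit`. A simplified model of that split, assuming the log starts at index 1 and representing each entry by its term (the class `BufferedSequence` is hypothetical), shows how a lookup chooses between the buffer and the "file" and how `commit` moves entries across in order.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical term-only model of FileEntrySequence's storage split.
final class BufferedSequence {
    private final List<Integer> fileTerms = new ArrayList<>();           // indices 1..commitIndex
    private final LinkedList<Integer> pendingTerms = new LinkedList<>(); // commitIndex+1..last
    private int commitIndex = 0;

    void append(int term) {
        pendingTerms.addLast(term);
    }

    int lastLogIndex() {
        return commitIndex + pendingTerms.size();
    }

    int commitIndex() {
        return commitIndex;
    }

    int persistedCount() {
        return fileTerms.size();
    }

    // Move entries (commitIndex, index] from the buffer to the "file", oldest first.
    void commit(int index) {
        if (index < commitIndex || index > lastLogIndex()) {
            throw new IllegalArgumentException("cannot commit " + index);
        }
        while (commitIndex < index) {
            fileTerms.add(pendingTerms.removeFirst());
            commitIndex++;
        }
    }

    // Uncommitted indices come from the buffer, committed ones from the "file".
    int termAt(int index) {
        if (index < 1 || index > lastLogIndex()) {
            throw new IllegalArgumentException("no entry at " + index);
        }
        if (index > commitIndex) {
            return pendingTerms.get(index - commitIndex - 1);
        }
        return fileTerms.get(index - 1);
    }
}
```

One consequence of this design: entries that the Leader later overwrites are usually still in the buffer and never touch the disk, while the trade-off is that buffered entries are not persisted until they are committed.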
  • Log interface implementation


/**
 * Base class for logs
 */
@Slf4j
public abstract class AbstractLog implements Log {

    // log entry sequence
    protected EntrySequence entrySequence;

    @Override
    public EntryMeta getLastEntryMeta() {
        if (entrySequence.isEmpty()) {
            // empty log: index 0, term 0
            return new EntryMeta(Entry.KIND_NO_OP, 0, 0);
        }
        return entrySequence.getLastEntry().getMeta();
    }

    @Override
    public AppendEntriesRpc createAppendEntriesRpc(int term, NodeId selfId, int nextIndex, int maxEntries) {
        int nextLogIndex = entrySequence.getNextLogIndex();
        if (nextIndex > nextLogIndex) {
            throw new IllegalArgumentException("illegal next index " + nextIndex);
        }
        AppendEntriesRpc rpc = new AppendEntriesRpc();
        rpc.setMessageId(UUID.randomUUID().toString());
        rpc.setTerm(term);
        rpc.setLeaderId(selfId);
        rpc.setLeaderCommit(entrySequence.getCommitIndex());
        // entry just before nextIndex; if it does not exist (e.g. nextIndex == 1),
        // prevLogIndex and prevLogTerm are left unset
        Entry entry = entrySequence.getEntry(nextIndex - 1);
        if (entry != null) {
            rpc.setPrevLogIndex(entry.getIndex());
            rpc.setPrevLogTerm(entry.getTerm());
        }
        if (!entrySequence.isEmpty()) {
            int maxIndex = (maxEntries == ALL_ENTRIES ? nextLogIndex : Math.min(nextLogIndex, nextIndex + maxEntries));
            rpc.setEntries(entrySequence.subList(nextIndex, maxIndex));
        }
        return rpc;
    }

    @Override
    public int getNextIndex() {
        return entrySequence.getNextLogIndex();
    }

    @Override
    public int getCommitIndex() {
        return entrySequence.getCommitIndex();
    }

    @Override
    public boolean isNewerThan(int lastLogIndex, int lastLogTerm) {
        EntryMeta lastEntryMeta = getLastEntryMeta();
        log.debug("last entry ({}, {}), candidate ({}, {})", lastEntryMeta.getIndex(), lastEntryMeta.getTerm(), lastLogIndex, lastLogTerm);
        // Raft's up-to-date rule: compare the last terms first;
        // only when they are equal does the longer log count as newer
        return lastEntryMeta.getTerm() > lastLogTerm
                || (lastEntryMeta.getTerm() == lastLogTerm && lastEntryMeta.getIndex() > lastLogIndex);
    }

    @Override
    public NoOpEntry appendEntry(int term) {
        NoOpEntry entry = new NoOpEntry(entrySequence.getNextLogIndex(), term);
        entrySequence.append(entry);
        return entry;
    }

    @Override
    public GeneralEntry appendEntry(int term, byte[] command) {
        GeneralEntry entry = new GeneralEntry(entrySequence.getNextLogIndex(), term, command);
        entrySequence.append(entry);
        return entry;
    }

    /**
     * Append entries received from the Leader.
     * Inconsistent entries must be removed first: starting after the last matching entry,
     * every conflicting entry is removed.
     *
     * @param prevLogIndex  index of the entry just before the new entries
     * @param prevLogTerm   term of the entry just before the new entries
     * @param leaderEntries entries sent by the Leader
     * @return true if success, false if the previous entry does not match
     */
    @Override
    public boolean appendEntriesFromLeader(int prevLogIndex, int prevLogTerm, List<Entry> leaderEntries) {
        // check that the previous entry matches
        if (!checkIfPreviousLogMatches(prevLogIndex, prevLogTerm)) {
            return false;
        }
        // the Leader sent no entries (heartbeat)
        if (leaderEntries.isEmpty()) {
            return true;
        }
        assert prevLogIndex + 1 == leaderEntries.get(0).getIndex();
        // remove conflicting entries and get the entries still to append (if any)
        EntrySequenceView newEntries = removeUnmatchedLog(new EntrySequenceView(leaderEntries));
        // append only
        appendEntriesFromLeader(newEntries);
        return true;
    }

    /**
     * Append all entries of the view.
     */
    private void appendEntriesFromLeader(EntrySequenceView leaderEntries) {
        if (leaderEntries.isEmpty()) {
            return;
        }
        log.debug("append entries from leader from {} to {}", leaderEntries.getFirstLogIndex(), leaderEntries.getLastLogIndex());
        Iterator<Entry> leaderEntriesIterator = leaderEntries.iterator();
        while (leaderEntriesIterator.hasNext()) {
            entrySequence.append(leaderEntriesIterator.next());
        }
    }

    /**
     * Remove conflicting entries.
     *
     * @return the Leader entries that still need to be appended
     */
    private EntrySequenceView removeUnmatchedLog(EntrySequenceView leaderEntries) {
        // entries from the Leader must not be empty here
        assert !leaderEntries.isEmpty();
        // find the first unmatched index
        int firstUnmatched = findFirstUnmatchedLog(leaderEntries);
        // no unmatched entries
        if (firstUnmatched < 0) {
            return new EntrySequenceView(Collections.emptyList());
        }
        // remove everything from the first unmatched index on
        removeEntriesAfter(firstUnmatched - 1);
        // return the entries to append
        return leaderEntries.subView(firstUnmatched);
    }

    /**
     * Find the first entry that does not match the local log.
     *
     * @return its index, or -1 if all entries match
     */
    private int findFirstUnmatchedLog(EntrySequenceView leaderEntries) {
        // entries from the Leader must not be empty here
        assert !leaderEntries.isEmpty();
        int logIndex;
        EntryMeta followerEntryMeta;
        Iterator<Entry> entryIterator = leaderEntries.iterator();
        while (entryIterator.hasNext()) {
            Entry leaderEntry = entryIterator.next();
            logIndex = leaderEntry.getIndex();
            // look up the local entry metadata by index
            followerEntryMeta = entrySequence.getEntryMeta(logIndex);
            // the entry is missing or its term differs
            if (followerEntryMeta == null || followerEntryMeta.getTerm() != leaderEntry.getTerm()) {
                return logIndex;
            }
        }
        return -1;
    }

    /**
     * Check whether the previous entry matches.
     */
    private boolean checkIfPreviousLogMatches(int prevLogIndex, int prevLogTerm) {
        // prevLogIndex 0 means the new entries start at the beginning of the log: nothing to check
        if (prevLogIndex == 0) {
            return true;
        }
        // look up the entry at prevLogIndex
        Entry entry = entrySequence.getEntry(prevLogIndex);
        // the entry does not exist
        if (entry == null) {
            log.debug("previous log {} not found", prevLogIndex);
            return false;
        }
        int term = entry.getTerm();
        if (term != prevLogTerm) {
            log.debug("different term of previous log, local {}, remote {}", term, prevLogTerm);
            return false;
        }
        return true;
    }

    /**
     * Remove the entries after the given index.
     */
    private void removeEntriesAfter(int index) {
        if (entrySequence.isEmpty() || index >= entrySequence.getLastLogIndex()) {
            return;
        }
        log.debug("remove entries after {}", index);
        entrySequence.removeAfter(index);
    }

    /**
     * Advance commitIndex.
     *
     * @param newCommitIndex new commitIndex
     * @param currentTerm    current term
     */
    @Override
    public void advanceCommitIndex(int newCommitIndex, int currentTerm) {
        if (!validateNewCommitIndex(newCommitIndex, currentTerm)) {
            return;
        }
        log.debug("advance commit index from {} to {}", entrySequence.getCommitIndex(), newCommitIndex);
        entrySequence.commit(newCommitIndex);
    }

    /**
     * Validate the new commitIndex.
     */
    private boolean validateNewCommitIndex(int newCommitIndex, int currentTerm) {
        // not larger than the current commitIndex
        if (newCommitIndex <= entrySequence.getCommitIndex()) {
            return false;
        }
        Entry entry = entrySequence.getEntry(newCommitIndex);
        if (entry == null) {
            log.debug("log of new commit index {} not found", newCommitIndex);
            return false;
        }
        // commitIndex may only advance to an entry created in the current term
        if (entry.getTerm() != currentTerm) {
            log.debug("log term of new commit index != current term ({} != {})", entry.getTerm(), currentTerm);
            return false;
        }
        return true;
    }

    @Override
    public void close() {
        entrySequence.close();
    }

    /**
     * View over a list of log entries
     */
    private static class EntrySequenceView implements Iterable<Entry> {

        private final List<Entry> entries;
        @Getter
        private int firstLogIndex;
        @Getter
        private int lastLogIndex;

        EntrySequenceView(List<Entry> entries) {
            this.entries = entries;
            if (!entries.isEmpty()) {
                firstLogIndex = entries.get(0).getIndex();
                lastLogIndex = entries.get(entries.size() - 1).getIndex();
            }
        }

        Entry get(int index) {
            if (entries.isEmpty() || index < firstLogIndex || index > lastLogIndex) {
                return null;
            }
            return entries.get(index - firstLogIndex);
        }

        boolean isEmpty() {
            return entries.isEmpty();
        }

        EntrySequenceView subView(int fromIndex) {
            if (entries.isEmpty() || fromIndex > lastLogIndex) {
                return new EntrySequenceView(Collections.emptyList());
            }
            return new EntrySequenceView(entries.subList(fromIndex - firstLogIndex, entries.size()));
        }

        @Override
        public Iterator<Entry> iterator() {
            return entries.iterator();
        }
    }
}
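The two follower-side rules in `AbstractLog` can be checked in isolation on a log modelled as a list of terms (entry i has term `terms.get(i - 1)`). In this hypothetical `FollowerLog`, `isNewerThan` follows the up-to-date rule from section 5.4.1 of the Raft paper (compare last terms first, then last indices), and `appendFromLeader` performs the `prevLogIndex`/`prevLogTerm` check, removes entries from the first conflict onward and appends the rest, treating `prevLogIndex == 0` as the start of the log.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical term-only model of the Follower's log rules.
final class FollowerLog {
    // entry i (1-based) has term terms.get(i - 1)
    final List<Integer> terms = new ArrayList<>();

    int lastIndex() {
        return terms.size();
    }

    int lastTerm() {
        return terms.isEmpty() ? 0 : terms.get(terms.size() - 1);
    }

    // True when OUR log is more up to date than the candidate's.
    boolean isNewerThan(int candidateLastIndex, int candidateLastTerm) {
        int myLastTerm = lastTerm();
        return myLastTerm > candidateLastTerm
                || (myLastTerm == candidateLastTerm && lastIndex() > candidateLastIndex);
    }

    // leaderTerms.get(k) is the term of entry prevLogIndex + 1 + k.
    boolean appendFromLeader(int prevLogIndex, int prevLogTerm, List<Integer> leaderTerms) {
        // consistency check; index 0 means "before the first entry" and always matches
        if (prevLogIndex > 0
                && (prevLogIndex > lastIndex() || terms.get(prevLogIndex - 1) != prevLogTerm)) {
            return false;
        }
        for (int k = 0; k < leaderTerms.size(); k++) {
            int index = prevLogIndex + 1 + k;
            if (index <= lastIndex() && terms.get(index - 1).equals(leaderTerms.get(k))) {
                continue; // already have this entry, keep it
            }
            // first conflict (or end of our log): drop our tail, append the Leader's remainder
            terms.subList(index - 1, terms.size()).clear();
            terms.addAll(leaderTerms.subList(k, leaderTerms.size()));
            break;
        }
        return true;
    }
}
```

Two edge cases are worth noting: a candidate whose last term is higher is more up to date even if its log is shorter, and re-sending entries the follower already holds must not truncate anything after them.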


/**
 * In-memory log
 */
public class MemoryLog extends AbstractLog {

    public MemoryLog(EntrySequence entrySequence) {
        this.entrySequence = entrySequence;
    }

    public MemoryLog() {
        this(new MemoryEntrySequence());
    }
}


import com.google.common.io.Files;
import lombok.AllArgsConstructor;

import java.io.File;
import java.io.IOException;

/**
 * Abstract log directory.
 */
@AllArgsConstructor
public abstract class AbstractLogDir implements LogDir {

    protected final File dir;

    @Override
    public void initialize() {
        if (!dir.exists() && !dir.mkdir()) {
            throw new LogException("failed to create directory " + dir);
        }
        try {
            // create the entry file and the entry index file (Guava's touch also updates existing files' timestamps)
            Files.touch(getEntriesFile());
            Files.touch(getEntryOffsetIndexFile());
        } catch (IOException e) {
            throw new LogException("failed to create file", e);
        }
    }

    @Override
    public boolean exists() {
        return dir.exists();
    }

    @Override
    public File getEntriesFile() {
        return new File(dir, RootDir.FILE_NAME_ENTRIES);
    }

    @Override
    public File getEntryOffsetIndexFile() {
        return new File(dir, RootDir.FILE_NAME_ENTRY_OFFSET_INDEX);
    }

    @Override
    public File get() {
        return dir;
    }

    @Override
    public boolean renameTo(LogDir logDir) {
        return dir.renameTo(logDir.get());
    }
}
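import lombok.Getter;

import java.io.File;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Log generation.
 * The log root directory contains several generations of the log:
 * log-root
 *      |-log-1
 *      |   |-entries.bin
 *      |   /-entries.idx
 *      /-log-100
 *          |-entries.bin
 *          /-entries.idx
 * log-1 and log-100 above are two log generations; the number in the name is the log index offset, lastIncludedIndex.
 */
public class LogGeneration extends AbstractLogDir implements Comparable<LogGeneration> {

    // pattern that generation directory names must match, e.g. "log-100"
    private static final Pattern DIR_NAME_PATTERN = Pattern.compile("log-(\\d+)");

    // index offset of this log generation
    @Getter
    private final int lastIncludedIndex;

    public LogGeneration(File baseDir, int lastIncludedIndex) {
        super(new File(baseDir, generateDirName(lastIncludedIndex)));
        this.lastIncludedIndex = lastIncludedIndex;
    }

    public LogGeneration(File dir) {
        super(dir);
        Matcher matcher = DIR_NAME_PATTERN.matcher(dir.getName());
        if (!matcher.matches()) {
            throw new IllegalArgumentException("not a directory name of log generation, [" + dir.getName() + "]");
        }
        lastIncludedIndex = Integer.parseInt(matcher.group(1));
    }

    /**
     * Checks whether a directory name is a valid log generation name.
     *
     * @param dirName directory name
     * @return true if the name has the form log-N
     */
    public static boolean isValidDirName(String dirName) {
        return DIR_NAME_PATTERN.matcher(dirName).matches();
    }

    /**
     * Builds the directory name of a log generation.
     *
     * @param lastIncludedIndex index offset of the generation
     * @return directory name, e.g. "log-100"
     */
    private static String generateDirName(int lastIncludedIndex) {
        return "log-" + lastIncludedIndex;
    }

    /**
     * Compares two log generations by lastIncludedIndex.
     *
     * @param o the other generation
     * @return a negative number, zero, or a positive number if this generation is older than, the same as, or newer than o
     */
    @Override
    public int compareTo(LogGeneration o) {
        return Integer.compare(lastIncludedIndex, o.lastIncludedIndex);
    }
}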
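compareTo orders generations by the parsed integer lastIncludedIndex rather than by directory name. A minimal sketch of why that matters, using a hypothetical helper class GenerationNames with the same regular expression as LogGeneration: compared as strings, "log-99" sorts after "log-100", while the parsed indices give the intended order.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper mirroring LogGeneration's name handling; not part of the project. */
final class GenerationNames {

    // same pattern as LogGeneration.DIR_NAME_PATTERN
    private static final Pattern DIR_NAME_PATTERN = Pattern.compile("log-(\\d+)");

    // parses lastIncludedIndex from a name such as "log-100"; returns -1 if the name is not a generation
    static int lastIncludedIndex(String dirName) {
        Matcher matcher = DIR_NAME_PATTERN.matcher(dirName);
        return matcher.matches() ? Integer.parseInt(matcher.group(1)) : -1;
    }

    public static void main(String[] args) {
        // as plain strings, "log-99" compares greater than "log-100" ('9' > '1')
        System.out.println("log-99".compareTo("log-100") > 0); // true
        // by parsed index, log-100 is the newer generation
        System.out.println(Integer.compare(lastIncludedIndex("log-99"), lastIncludedIndex("log-100")) < 0); // true
        System.out.println(lastIncludedIndex("generating")); // -1
    }
}
```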
import lombok.ToString;

import java.io.File;

/**
 * Normal log directory (not a log generation).
 */
@ToString
public class NormalLogDir extends AbstractLogDir {

    public NormalLogDir(File dir) {
        super(dir);
    }
}
import lombok.extern.slf4j.Slf4j;

import java.io.File;

/**
 * Root directory of the log.
 */
@Slf4j
public class RootDir {

    // file name of the log entry file
    public static final String FILE_NAME_ENTRIES = "entries.bin";
    // file name of the log entry index file
    public static final String FILE_NAME_ENTRY_OFFSET_INDEX = "entries.idx";
    // name of the directory in which a new generation is generated
    private static final String DIR_NAME_GENERATING = "generating";
    // root directory
    private final File baseDir;

    public RootDir(File baseDir) {
        if (!baseDir.exists()) {
            throw new IllegalArgumentException("dir " + baseDir + " not exists");
        }
        this.baseDir = baseDir;
    }

    public LogDir getLogDirForGenerating() {
        return getOrCreateNormalLogDir(DIR_NAME_GENERATING);
    }

    /**
     * Gets the normal log directory with the given name, creating it if it does not exist.
     *
     * @param name directory name under the root directory
     * @return the log directory
     */
    private NormalLogDir getOrCreateNormalLogDir(String name) {
        NormalLogDir logDir = new NormalLogDir(new File(baseDir, name));
        if (!logDir.exists()) {
            logDir.initialize();
        }
        return logDir;
    }

    /**
     * Renames a log directory to a log generation.
     *
     * @param dir               the directory to rename
     * @param lastIncludedIndex index offset of the target generation
     * @return the renamed generation directory
     */
    public LogDir rename(LogDir dir, int lastIncludedIndex) {
        LogGeneration destDir = new LogGeneration(baseDir, lastIncludedIndex);
        if (destDir.exists()) {
            throw new IllegalStateException("failed to rename, dest dir " + destDir + " exists");
        }
        log.info("rename dir {} to {}", dir, destDir);
        if (!dir.renameTo(destDir)) {
            throw new IllegalStateException("failed to rename " + dir + " to " + destDir);
        }
        return destDir;
    }

    /**
     * Creates the first log generation (log-0).
     *
     * @return the first generation
     */
    public LogGeneration createFirstGeneration() {
        LogGeneration generation = new LogGeneration(baseDir, 0);
        generation.initialize();
        return generation;
    }

    /**
     * Gets the latest log generation, i.e. the one with the largest lastIncludedIndex.
     *
     * @return the latest generation, or null if there is none
     */
    public LogGeneration getLatestGeneration() {
        File[] files = baseDir.listFiles();
        if (files == null) {
            return null;
        }
        LogGeneration latest = null;
        String fileName;
        LogGeneration generation;
        for (File file : files) {
            if (!file.isDirectory()) {
                continue;
            }
            fileName = file.getName();
            // skip the "generating" directory and any directory that is not a generation
            if (DIR_NAME_GENERATING.equals(fileName) || !LogGeneration.isValidDirName(fileName)) {
                continue;
            }
            generation = new LogGeneration(file);
            if (latest == null || generation.compareTo(latest) > 0) {
                latest = generation;
            }
        }
        return latest;
    }
}
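RootDir's two directory operations can be tried out in isolation. The following sketch uses a hypothetical class RootDirDemo with plain java.io/java.nio only (no Lombok, no project types): promote renames a "generating" directory to log-&lt;lastIncludedIndex&gt; the way rename does, and latestGeneration picks the generation directory with the largest index the way getLatestGeneration does. It creates its directories under the system temp directory and does not clean them up.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical standalone sketch of RootDir#rename and RootDir#getLatestGeneration; not project code. */
final class RootDirDemo {

    private static final Pattern DIR_NAME_PATTERN = Pattern.compile("log-(\\d+)");

    // renames baseDir/generating to baseDir/log-<lastIncludedIndex>
    static File promote(File baseDir, int lastIncludedIndex) {
        File src = new File(baseDir, "generating");
        File dest = new File(baseDir, "log-" + lastIncludedIndex);
        if (dest.exists()) {
            throw new IllegalStateException("dest dir " + dest + " exists");
        }
        if (!src.renameTo(dest)) {
            throw new IllegalStateException("failed to rename " + src + " to " + dest);
        }
        return dest;
    }

    // returns the generation directory with the largest lastIncludedIndex, or null if there is none
    static File latestGeneration(File baseDir) {
        File[] files = baseDir.listFiles();
        if (files == null) {
            return null; // baseDir is not a readable directory
        }
        File latest = null;
        int latestIndex = -1;
        for (File file : files) {
            if (!file.isDirectory()) {
                continue; // regular files are never generations
            }
            Matcher matcher = DIR_NAME_PATTERN.matcher(file.getName());
            if (!matcher.matches()) {
                continue; // e.g. the "generating" directory
            }
            int index = Integer.parseInt(matcher.group(1));
            if (latest == null || index > latestIndex) {
                latest = file;
                latestIndex = index;
            }
        }
        return latest;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("log-root");
        Files.createDirectory(base.resolve("log-0"));
        Files.createDirectory(base.resolve("log-99"));
        Files.createDirectory(base.resolve("generating")); // a generation still being written
        System.out.println(latestGeneration(base.toFile()).getName()); // log-99
        promote(base.toFile(), 100);
        System.out.println(latestGeneration(base.toFile()).getName()); // log-100
    }
}
```

In this sketch, as in RootDir above, the "generating" directory is skipped, so a generation that is still being written is only picked up after it has been renamed.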
import java.io.File;

/**
 * File-based log.
 */
public class FileLog extends AbstractLog {

    private final RootDir rootDir;

    public FileLog(File baseDir) {
        rootDir = new RootDir(baseDir);
        LogGeneration latestGeneration = rootDir.getLatestGeneration();
        if (latestGeneration != null) {
            // entries of generation log-N start right after its lastIncludedIndex N
            entrySequence = new FileEntrySequence(latestGeneration,
                    latestGeneration.getLastIncludedIndex() + 1);
        } else {
            // no generation yet: create log-0, whose first log index is 1
            LogGeneration firstGeneration = rootDir.createFirstGeneration();
            entrySequence = new FileEntrySequence(firstGeneration, 1);
        }
    }
}
