golang etcd raft协议是怎样的
发表于:2024-11-26 作者:千家信息网编辑
千家信息网最后更新 2024年11月26日,今天就跟大家聊聊有关golang etcd raft协议是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。raft分布式一致性算法分布式存
千家信息网最后更新 2024年11月26日golang etcd raft协议是怎样的
今天就跟大家聊聊有关golang etcd raft协议是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题--如何保证多个副本的一致性?Raft算法把问题分解成了四个子问题:1. 领袖选举(leader election)、2. 日志复制(log replication)、3. 安全性(safety)4. 成员关系变化(membership changes)这几个子问题。源码gitee地址:https://gitee.com/ioly/learning.gooop
目标
根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 11)
虽然Leader State还有细节没处理完,但应该能启动并提供基本服务了
添加外围功能,为首次"点火"做准备:
config/tRaftConfig:从本地json文件读取集群节点配置,提供IRaftConfig/IRaftNodeConfig的实现
lsm/tRaftLSMImplement: 提供对顶层接口IRaftLSM的实现,将"配置/kv存储/节点通讯"三大块粘合起来
server/IRaftKVServer:server启动器接口
server/tRaftKVServer: server启动器的实现,监听raft rpc和kv rpc
config/tRaftConfig.go
从本地json文件读取集群节点配置,提供IRaftConfig/IRaftNodeConfig的实现
package configimport ( "encoding/json" "os")type tRaftConfig struct { ID string Nodes []*tRaftNodeConfig}type tRaftNodeConfig struct { ID string Endpoint string}func (me *tRaftConfig) GetID() string { return me.ID}func (me *tRaftConfig) GetNodes() []IRaftNodeConfig { a := make([]IRaftNodeConfig, len(me.Nodes)) for i,it := range me.Nodes { a[i] = it } return a}func (me *tRaftNodeConfig) GetID() string { return me.ID}func (me *tRaftNodeConfig) GetEndpoint() string { return me.Endpoint}func LoadJSONFile(file string) IRaftConfig { data, err := os.ReadFile(file) if err != nil { panic(err) } c := new(tRaftConfig) err = json.Unmarshal(data, c) if err != nil { panic(err) } return c}
lsm/tRaftLSMImplement.go
提供对顶层接口IRaftLSM的实现,将"配置/kv存储/节点通讯"三大块粘合起来,并添加诊断日志。
package lsmimport ( "learning/gooop/etcd/raft/common" "learning/gooop/etcd/raft/config" "learning/gooop/etcd/raft/logger" "learning/gooop/etcd/raft/rpc" "learning/gooop/etcd/raft/rpc/client" "learning/gooop/etcd/raft/store" "sync")type tRaftLSMImplement struct { tEventDrivenModel mInitOnce sync.Once mConfig config.IRaftConfig mStore store.ILogStore mClientService client.IRaftClientService mState IRaftState}// trigger: init()// args: emptyconst meInit = "lsm.Init"// trigger: HandleStateChanged()// args: IRaftStateconst meStateChanged = "lsm.StateChnaged"func (me *tRaftLSMImplement) init() { me.mInitOnce.Do(func() { me.initEventHandlers() me.raise(meInit) })}func (me *tRaftLSMImplement) initEventHandlers() { // write only me.hookEventsForConfig() me.hookEventsForStore() me.hookEventsForPeerService() me.hookEventsForState()}func (me *tRaftLSMImplement) hookEventsForConfig() { me.hook(meInit, func(e string, args ...interface{}) { logger.Logf("tRaftLSMImplement.init, ConfigFile = %v", common.ConfigFile) me.mConfig = config.LoadJSONFile(common.ConfigFile) })}func (me *tRaftLSMImplement) hookEventsForStore() { me.hook(meInit, func(e string, args ...interface{}) { logger.Logf("tRaftLSMImplement.init, DataFile = %v", common.DataFile) err, db := store.NewBoltStore(common.DataFile) if err != nil { panic(err) } me.mStore = db })}func (me *tRaftLSMImplement) hookEventsForPeerService() { me.hook(meInit, func(e string, args ...interface{}) { me.mClientService = client.NewRaftClientService(me.mConfig) })}func (me *tRaftLSMImplement) hookEventsForState() { me.hook(meInit, func(e string, args ...interface{}) { me.mState = newFollowerState(me, me.mStore.LastCommittedTerm()) me.mState.Start() }) me.hook(meStateChanged, func(e string, args ...interface{}) { state := args[0].(IRaftState) logger.Logf("tRaftLSMImplement.StateChanged, %v", state.Role()) me.mState = state state.Start() })}func (me *tRaftLSMImplement) Config() config.IRaftConfig { return me.mConfig}func (me *tRaftLSMImplement) Store() store.ILogStore { return me.mStore}func (me *tRaftLSMImplement) HandleStateChanged(state IRaftState) { me.raise(meStateChanged, state)}func (me *tRaftLSMImplement) RaftClientService() client.IRaftClientService { return me.mClientService}func (me *tRaftLSMImplement) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error { state := me.mState e := state.Heartbeat(cmd, ret) logger.Logf("tRaftLSMImplement.Heartbeat, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e) return e}func (me *tRaftLSMImplement) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error { state := me.mState e := state.AppendLog(cmd, ret) logger.Logf("tRaftLSMImplement.AppendLog, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e) return e}func (me *tRaftLSMImplement) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error { state := me.mState e := state.CommitLog(cmd, ret) logger.Logf("tRaftLSMImplement.CommitLog, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e) return e}func (me *tRaftLSMImplement) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { state := me.mState e := state.RequestVote(cmd, ret) logger.Logf("tRaftLSMImplement.RequestVote, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e) return e}func (me *tRaftLSMImplement) ExecuteKVCmd(cmd *rpc.KVCmd, ret *rpc.KVRet) error { state := me.mState e := state.ExecuteKVCmd(cmd, ret) logger.Logf("tRaftLSMImplement.ExecuteKVCmd, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e) return e}func (me *tRaftLSMImplement) State() IRaftState { return me.mState}func NewRaftLSM() IRaftLSM { it := new(tRaftLSMImplement) it.init() return it}
server/IRaftKVServer.go
server启动器接口
package servertype IRaftKVServer interface { BeginServeTCP(port int) error}
server/tRaftKVServer.go
server启动器的实现,监听raft rpc和kv rpc
package serverimport ( "fmt" "learning/gooop/etcd/raft/lsm" rrpc "learning/gooop/etcd/raft/rpc" "learning/gooop/saga/mqs/logger" "net" "net/rpc" "time")type tRaftKVServer intfunc (me *tRaftKVServer) BeginServeTCP(port int) error { logger.Logf("tRaftKVServer.BeginServeTCP, starting, port=%v", port) // resolve address addy, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", port)) if err != nil { return err } // create raft lsm singleton raftLSM := lsm.NewRaftLSM() // register raft rpc server rserver := &RaftRPCServer { mRaftLSM : raftLSM, } err = rpc.Register(rserver) if err != nil { return err } // register kv rpc server kserver := &KVStoreRPCServer{ mRaftLSM: raftLSM, } err = rpc.Register(kserver) if err != nil { return err } inbound, err := net.ListenTCP("tcp", addy) if err != nil { return err } go rpc.Accept(inbound) logger.Logf("tRaftKVServer.BeginServeTCP, service ready at port=%v", port) return nil}// RaftRPCServer exposes a raft rpc servicetype RaftRPCServer struct { mRaftLSM lsm.IRaftLSM}// Heartbeat leader to followerfunc (me *RaftRPCServer) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error { e := me.mRaftLSM.Heartbeat(cmd, ret) logger.Logf("RaftRPCServer.Heartbeat, cmd=%v, ret=%v, e=%v", cmd, ret, e) return e}// AppendLog leader to followerfunc (me *RaftRPCServer) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error { e := me.mRaftLSM.AppendLog(cmd, ret) logger.Logf("RaftRPCServer.AppendLog, cmd=%v, ret=%v, e=%v", cmd, ret, e) return e}// CommitLog leader to followerfunc (me *RaftRPCServer) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error { e := me.mRaftLSM.CommitLog(cmd, ret) logger.Logf("RaftRPCServer.CommitLog, cmd=%v, ret=%v, e=%v", cmd, ret, e) return e}// RequestVote candidate to othersfunc (me *RaftRPCServer) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error { e := me.mRaftLSM.RequestVote(cmd, ret) logger.Logf("RaftRPCServer.RequestVote, cmd=%v, ret=%v, e=%v", cmd, ret, e) return e}// Ping to keep alivefunc (me *RaftRPCServer) Ping(cmd *rrpc.PingCmd, ret *rrpc.PingRet) error { ret.SenderID = me.mRaftLSM.Config().GetID() ret.Timestamp = time.Now().UnixNano() logger.Logf("RaftRPCServer.Ping, cmd=%v, ret=%v", cmd, ret) return nil}// KVStoreRPCServer expose a kv storage servicetype KVStoreRPCServer struct { mRaftLSM lsm.IRaftLSM}// ExecuteKVCmd leader to followerfunc (me *KVStoreRPCServer) ExecuteKVCmd(cmd *rrpc.KVCmd, ret *rrpc.KVRet) error { e := me.mRaftLSM.ExecuteKVCmd(cmd, ret) logger.Logf("KVStoreRPCServer.ExecuteKVCmd, cmd=%v, ret=%v, e=%v", cmd, ret, e) return e}
(未完待续)
看完上述内容,你们对golang etcd raft协议是怎样的有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
存储
分布式
启动器
接口
节点
问题
配置
一致
内容
系统
一致性
三大
个子
副本
多个
文件
日志
目标
算法
通讯
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
工业自动化与网络安全
sql 数据库唯一性约束
服务器南昌公司
大学生网络安全与诈骗观后感
网络技术分析数据
网络安全友商加班
虚拟机配置与管理数字证书服务器
普陀区网络技术服务前景
大学生软件开发毕业论文
计算机二级有没有网络技术
传奇四川服务器
服务器额安全等级
卡通网络安全
中国联通网络安全宣传周
杭州炫方网络技术
关于单位网络安全的知识竞赛
麻将软件开发设计
亚马逊 云服务器 中国
软件工程可以从事网络安全工作吗
软件开发报价目录
云计算和云服务器
软件开发公司年度总结范文
密码编码学与网络安全学习
网络技术专业毕业选题
数据库未打开 仅允许
msmq服务器下载
软件开发部单一项目考核方案
dmp数据库文件
网络安全法第几条网络暴力
法意数据库答案