如何理解tcpServer 中的IOLoop方法
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,本篇文章为大家展示了如何理解tcpServer 中的IOLoop方法,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。今天我们就分析一下IOLoop这个方法废话不
千家信息网最后更新 2025年01月24日如何理解tcpServer 中的IOLoop方法
本篇文章为大家展示了如何理解tcpServer 中的IOLoop方法,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
今天我们就分析一下IOLoop
这个方法
废话不多说,直接上代码吧(代码位于nsq/nsqlookupd/lookup_protocol_v1.go这个文件中)
//这段代码位于nsq/nsqlookupd/client_v1.go这个文件中type ClientV1 struct { net.Conn //组合net.Conn接口 peerInfo *PeerInfo //client的信息也就是前面所讲的product的信息}//初始化一个ClientV1func NewClientV1(conn net.Conn) *ClientV1 { return &ClientV1{ Conn: conn, }}//实现String接口func (c *ClientV1) String() string { return c.RemoteAddr().String()}//定义ClientV1结束type LookupProtocolV1 struct { ctx *Context //一直贯穿整个代码的Context,具体可翻看前面章节}func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { var err error var line string client := NewClientV1(conn) //新建一个client版本为V1 err = nil reader := bufio.NewReader(client) //由client 创建一个 带有buffer 的Reader 默认 buffer size 为4096,这里的NewReader参数为io.Reader 接口,为何net.Conn接口也能作为参数呢?因为net.Conn 接口其实也是实现了io.Reader接口了,具体概念大家可翻看golang的教程 for { line, err = reader.ReadString('\n') //按行读取 if err != nil { break } line = strings.TrimSpace(line) //去掉这行两头的空格符 params := strings.Split(line, " ") //字符串按一个空格字符串分割,并获取相应的Commad 以及 该command 的相应的params response, err := p.Exec(client, reader, params) //执行相应的Command if err != nil { ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, err, ctx) _, err = protocol.SendResponse(client, []byte(err.Error())) //返回错误给client if err != nil { break } // errors of type FatalClientErr should forceably close the connection if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } if response != nil { _, err = protocol.SendResponse(client, response) //返回命令处理结果给client if err != nil { break } } } //for 循环结束了 说明程序要退出了,调用RegistrationDB 中的 RemoveProducer从producer 的注册数据中删除 producer信息 //这里的RegistrationDB下章再具体讲解 p.ctx.nsqlookupd.logf("CLIENT(%s): closing", client) if client.peerInfo != nil { registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id) for _, r := range registrations { if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, r.Category, r.Key, r.SubKey) } } } return err}//这个方法就是执行相应的命令动作 有 PING IDENTIFY REGISTER UNREGISTER 这四个类型func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { switch params[0] { case "PING": //用于client的心跳处理 return p.PING(client, params) case "IDENTIFY": //用于client端的信息注册,执行PING REGISTER UNREGISTER 命令前必须先执行此命令 return p.IDENTIFY(client, reader, params[1:]) case "REGISTER": //用于client端注册topic以及channel的命令 return p.REGISTER(client, reader, params[1:]) case "UNREGISTER": //执行与REGISTER命令相反的逻辑 return p.UNREGISTER(client, reader, params[1:]) } return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))}//INDENTIFY命令处理逻辑//该命令用于注册client的producer信息,并返回nsqlookupd的TCP 以及 HTTP 端口信息给client//大致的报文如下// V1 INDENTIFY\n 注意了这里前面提过的V1前面是两个空格字符//123\n 这里是后面json数据(producer 信息的json数据的字节长度)//{....}\n 一串表示producer信息的json数据,具体的可参考 nsq/nsqlookupd/registration_db.go文件中的PeerInfo structfunc (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { var err error if client.peerInfo != nil { //如果有该client 的PeerInfo数据则返回错误 return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again") } var bodyLen int32 err = binary.Read(reader, binary.BigEndian, &bodyLen) //获取producer PeerInfo json数据的字节长度 大端二进制格式 if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size") } body := make([]byte, bodyLen) //初始化一个producer PeerInfo json数据长度的bytes _, err = io.ReadFull(reader, body) //读取所有的json数据 if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body") } // body is a json structure with producer information peerInfo := PeerInfo{id: client.RemoteAddr().String()} //实例化一个PeerInfo err = json.Unmarshal(body, &peerInfo) //解析producer PeerInfo json数据到peerInfo if err != nil { //json 数据解析失败 返回错误 return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body") } peerInfo.RemoteAddress = client.RemoteAddr().String() //获取PeerInfo remote address // require all fields //校验获取的PeerInfo 数据 if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" { return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields") } //将当前系统时间(纳秒)更新到PeerInfo 中的lastUpdate中 用于 client的心跳判断 atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano()) p.ctx.nsqlookupd.logf("CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s", client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version) client.peerInfo = &peerInfo //注册producer PeerInfo 信息到 RegistrationDB中 其中Registration的 Category 为client Key 和 SubKey为空 if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "") } // build a response //将nsqlookupd的TCP端口,HTTP端口信息,版本信息,broadcast address信息,以及host name信息 已json数据格式返回给client data := make(map[string]interface{}) data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port data["version"] = version.Binary hostname, err := os.Hostname() if err != nil { log.Fatalf("ERROR: unable to get hostname %s", err) } data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress data["hostname"] = hostname response, err := json.Marshal(data) if err != nil { p.ctx.nsqlookupd.logf("ERROR: marshaling %v", data) return []byte("OK"), nil } return response, nil}//获取params中的topic 以及 channelfunc getTopicChan(command string, params []string) (string, string, error) { if len(params) == 0 { return "", "", protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("%s insufficient number of params", command)) } topicName := params[0] var channelName string if len(params) >= 2 { channelName = params[1] } //校验是否是topic if !protocol.IsValidTopicName(topicName) { return "", "", protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("%s topic name '%s' is not valid", command, topicName)) } //校验是否是channel if channelName != "" && !protocol.IsValidChannelName(channelName) { return "", "", protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL", fmt.Sprintf("%s channel name '%s' is not valid", command, channelName)) } return topicName, channelName, nil}//REGISTER 命令 用于注册client的topic 以及 channel信息//一个topic下可以有多个channel//一个消费者订阅的是一个topic 那么 生成者给这个topic 下的channel的信息 这个消费者也能接收得到这个信息,如果消费者订阅的不是这个channel的信息,那么这个消费者则接受不到这个信息//nsq topic 与channel的关系,大家可以多搜索下资料,我这里感觉讲的也不太清晰,请大家谅解一下//REGISTER 命令必须在INDENTIFY 之后才能调用//具体协议报文如下//REGISTER topic1\n 这个只创建一个名字为topic1的topic//或如下报文//REGISTER topic1 channel1\n 这个只创建一个名字为topic1的topic 且topic1下面创建一个名字为channel1的channelfunc (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { if client.peerInfo == nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } //获取REGISTER 命令时的topic 以及channel名字 topic, channel, err := getTopicChan("REGISTER", params) if err != nil { return nil, err } //如果有channel if channel != "" { //添加channel信息到RegistrationDB中其中Registration的 Category 为"channel"字符串,Key为topic,SubKey为channel key := Registration{"channel", topic, channel} if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } } //添加topic信息到RegistrationDB中其中Registration的 Category 为"topic"字符串,Key为topic,SubKey为空 key := Registration{"topic", topic, ""} if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } //返回OK return []byte("OK"), nil}//UNREGISTER命令用于取消注册topic 或取消注册某个topic下的某一个channel//报文格式如下//UNREGISTER topic1 channel1\n 这个报文表示取消注册名字为topic1的topic下的名字为channel1的channel,这个名字 只取消注册这个channel1,不取消注册topic1下的其他channel 以及这个topic1本身//或这个格式的报文//UNREGISTER topic1\n 这个报文表示取消注册名字为topic1的topic,这个时候topic1以及topic1下的所有channel都取消注册了func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { if client.peerInfo == nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } topic, channel, err := getTopicChan("UNREGISTER", params) //获取topic 以及channel 的名字 if err != nil { return nil, err } if channel != "" { //如果有channel 则只取消注册这个topic下的这个channel key := Registration{"channel", topic, channel} removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id) if removed { p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } // for ephemeral channels, remove the channel as well if it has no producers if left == 0 && strings.HasSuffix(channel, "#ephemeral") { p.ctx.nsqlookupd.DB.RemoveRegistration(key) } } else { // no channel was specified so this is a topic unregistration // remove all of the channel registrations... // normally this shouldn't happen which is why we print a warning message // if anything is actually removed //如果没有channel 这个取消注册这个topic 以及这个topic下的所有channel registrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*") for _, r := range registrations { if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf("WARNING: client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, r.SubKey) } } key := Registration{"topic", topic, ""} if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } } //返回OK return []byte("OK"), nil}//PING 用于client中的心跳处理命令func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error) { if client.peerInfo != nil { // we could get a PING before other commands on the same client connection //获取上一次心跳的时间 cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate)) //获取当前时间 now := time.Now() //这里日志输出两次心跳之间的间隔时间 p.ctx.nsqlookupd.logf("CLIENT(%s): pinged (last ping %s)", client.peerInfo.id, now.Sub(cur)) //更新PeerInfo中的lastUpdate时间为当前时间 atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano()) } //返回OK return []byte("OK"), nil}
nsqlookupd
中的tcpServer
中的主要协议处理的IOLoop
这里基本讲解完了。
上述内容就是如何理解tcpServer 中的IOLoop方法,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。
信息
命令
数据
名字
报文
接口
时间
字符
处理
方法
代码
字符串
格式
消费者
消费
文件
空格
端口
错误
长度
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
信息化是网络安全的目标
苹果手机显示连接伪装服务器
巨飘网络技术
网络安全指挥中心适合女生
美国cdn服务器
cdn服务器运营管理
sql实验数据库设计综合题
司法局网络安全工作会议
数据库端口转发
网络安全工程师需要学编程吗
计算机网络技术南炯
dos操作系统下的数据库语言
smartdns设置加密服务器
今日头条网络安全概念股
数据库json字段查询条件
汽车软件开发公司排名
数码兽数据库木偶兽
.net有数据库游标吗
数据库查询求行的和
建立用户数据库
东北大学网络安全人才
终端和服务器什么意思
南通创新软件开发常见问题
cistrome数据库创始人
无线自组织网络安全的书
未来科技网络技术
网络技术volley
云服务器怎么上传源码
中国网络安全白皮书2021
软件开发面试模拟