千家信息网

micro微服务基础组件的组织方式是什么

发表于:2025-02-08 作者:千家信息网编辑
千家信息网最后更新 2025年02月08日,这篇文章主要讲解了"micro微服务基础组件的组织方式是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"micro微服务基础组件的组织方式是什么"吧
千家信息网最后更新 2025年02月08日micro微服务基础组件的组织方式是什么

这篇文章主要讲解了"micro微服务基础组件的组织方式是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"micro微服务基础组件的组织方式是什么"吧!

简介

micro是go语言实现的一个微服务框架,该框架自身实现了为服务常见的几大要素,网关,代理,注册中心,消息传递,也支持可插拔扩展。本本通过micro中的一个核心对象展开去探讨这个项目是如何实现这些组件并将其组织在一起工作的。

Notice: go代码有时候比较繁琐,截取源码的时候会删除部分不影响思想的代码会标记为...

核心服务

micro通过micro.NewService创建一个服务实例,所有的微服务实例(包括网关,代理等等)需要通过这个实例来与其他服务打交道。

    type service struct {        opts Options        once sync.Once    }

代码很少,但是micro中所有微服务要素最后都会汇聚在这个类型上了,因为option包罗万象(^^).

    type Options struct {        Broker    broker.Broker        Cmd       cmd.Cmd        Client    client.Client        Server    server.Server        Registry  registry.Registry        Transport transport.Transport        // Before and After funcs        BeforeStart []func() error        BeforeStop  []func() error        AfterStart  []func() error        AfterStop   []func() error        // Other options for implementations of the interface        // can be stored in a context        Context context.Context    }

这里可以看到微服务的常见组件了,当micro.Server初始化的时候会给这个对象设置上对应的组件,组件的设置方式包括默认值,cli指定,env读取。

func (s *service) Init(opts ...Option) {        // process options        for _, o := range opts {                o(&s.opts)        }        s.once.Do(func() {                // Initialise the command flags, overriding new service                _ = s.opts.Cmd.Init(                        cmd.Broker(&s.opts.Broker),                        cmd.Registry(&s.opts.Registry),                        cmd.Transport(&s.opts.Transport),                        cmd.Client(&s.opts.Client),                        cmd.Server(&s.opts.Server),                )        })}

Server是最终的行为实体,监听端口并提供业务服务,注册本地服务到注册中心。 Broker异步消息,mq等方法可以替换这个类型 Client服务调用客户端 Registry 注册中心 Cmd cli客户端 Transport 类似与socket,消息同步通信,服务监听等

服务类型

目前支持的服务类型有rpc,grpc。服务中如果存在两种不同的rpc协议,消息投递的时候会进行协议转换。这里详细说下默认的rpc服务。 rpc服务基于HTTP POST协议,服务启动的时候会尝试连接Broker,然后注册本服务到注册中心,最后监听服务端口.简单提一句这里是如何做到协议转换,如果http过来的消息要投递到一个grpc协议服务上,需要在Content-Type设置对应目的服务协议类型application/grpc,SerConn这里读取在Content-Type在获取对应的Codec进行协议转换,最后投递到对应服务,服务的返回内容也同样要处理下再返回。

    type Service interface {        Init(...Option)        Options() Options        Client() client.Client        Server() server.Server        Run() error        String() string    }

rpc server

func (s *rpcServer) Start() error {...        ts, err := config.Transport.Listen(config.Address)        // swap address...        // connect to the broker        if err := config.Broker.Connect(); err != nil {                return err        }...        // use RegisterCheck func before register        if err = s.opts.RegisterCheck(s.opts.Context); err != nil {                log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err)        } else {                // announce self to the world                if err = s.Register(); err != nil {                        log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)                }        }...        go func() {                for {                        // listen for connections                        err := ts.Accept(s.ServeConn)...                }        }()..        return nil}

协议转换

    func (s *rpcServer) ServeConn(sock transport.Socket) {...        for {            var msg transport.Message            if err := sock.Recv(&msg); err != nil {                return            }...            // we use this Content-Type header to identify the codec needed            ct := msg.Header["Content-Type"]            // strip our headers            hdr := make(map[string]string)            for k, v := range msg.Header {                hdr[k] = v            }            // set local/remote ips            hdr["Local"] = sock.Local()            hdr["Remote"] = sock.Remote()...            // TODO: needs better error handling            var err error            if cf, err = s.newCodec(ct); err != nil {  //请求协议转换器...返回错误                return            }...            rcodec := newRpcCodec(&msg, sock, cf)   //返回转换器            // internal request            request := &rpcRequest{                service:     getHeader("Micro-Service", msg.Header),                method:      getHeader("Micro-Method", msg.Header),                endpoint:    getHeader("Micro-Endpoint", msg.Header),                contentType: ct,                codec:       rcodec,                header:      msg.Header,                body:        msg.Body,                socket:      sock,                stream:      true,            }            // internal response            response := &rpcResponse{                header: make(map[string]string),                socket: sock,                codec:  rcodec,            }            // set router            r := Router(s.router)...            // serve the actual request using the request router            if err := r.ServeRequest(ctx, request, response); err != nil {                // write an error response                err = rcodec.Write(&codec.Message{                    Header: msg.Header,                    Error:  err.Error(),                    Type:   codec.Error,                }, nil)                // could not write the error response                if err != nil {                    log.Logf("rpc: unable to write error response: %v", err)                }                if s.wg != nil {                    s.wg.Done()                }                return            }...        }    }

注册中心

注册中心包括服务发现和服务注册。micro中每个注册中心类型都要实现注册中心接口

    type Registry interface {        Init(...Option) error        Options() Options        Register(*Service, ...RegisterOption) error        Deregister(*Service) error        GetService(string) ([]*Service, error)        ListServices() ([]*Service, error)        Watch(...WatchOption) (Watcher, error)        String() string    }

默认的注册中心是mdns,该程序会在本地监听一个组播地址接收网络中所有及其广播的信息,同时发送的信息也能被所有其他机器发现。每当程序启动时都会广播自己的服务信息,其他节点收到该信息后添加到自己的服务列表里面,服务关闭时会发出关闭信息。mdns自身不具备健康检查,熔断等功能,其出发点也仅是便于测试使用,因而不推荐在生产环境中使用。

Resolve

查找服务需要根据url或者content信息来获取服务名称,在通过服务名称到注册中心查找,获取到服务后,随机一个节点投递

    type Resolver interface {        Resolve(r *http.Request) (*Endpoint, error)        String() string    }    //默认的api resolve实例,除此之外还有host,path,grpc 三种resolve,可以根据需求在启动程序的时候指定或者设置在环境变量里面    //    func (r *Resolver) Resolve(req *http.Request) (*resolver.Endpoint, error) {        var name, method string        switch r.Options.Handler {        // internal handlers        case "meta", "api", "rpc", "micro":        ///foo/bar/zool => go.micro.api.foo  方法:Bar.Zool            ///foo/bar => go.micro.api.foo 方法:Foo.Bar            name, method = apiRoute(req.URL.Path)        default:            //如果handler是web会走到这里               ///foo/bar/zool => go.micro.api.foo  method bar/zool             // 1/foo/bar/ => go.micro.api.1.foo  method bar             method = req.Method            name = proxyRoute(req.URL.Path)        }        return &resolver.Endpoint{            Name:   name,            Method: method,        }, nil    }

插件

代码中定义了一个plugin,然而这个plugin并非是引入组件的作用,其作用在于在请求的管道中加一层,这样比较容易添加新的逻辑进去。然而真正需要着重提到的是micro引入新组件的方式,micro service的几个重要成员都有其对应的接口规约,只要正确实现了接口就可以轻松的接入新的组件。这里用一个引入kubernetes作为注册中心的例子。

kubernetes组件位于github.com\micro\go-plugins中

    go get -u github.com\micro\go-plugins\kubernetes

由于在go中无法实现java/c#中包动态加载功能,语言本身不提供全局类型,函数扫描。往往只能通过一些曲折的办法来实现。而micro是通过在入口文件中导入包,利用init函数在启动时将需要的功能组件写入到一个map里面。 在import中加入插件

    _ "github.com/micro/go-plugins/registry/kubernetes"
   micro api --registry=kubernetes  --registry_address=yourAddress

工作原理: 在github.com\micro\go-micro\config\cmd\cmd.go中有一个map用于保存所有的注册中心创建函数

    DefaultRegistries = map[string]func(...registry.Option) registry.Registry{                "consul": consul.NewRegistry,                "gossip": gossip.NewRegistry,                "mdns":   mdns.NewRegistry,                "memory": rmem.NewRegistry,        }

kubernetes会在包的init中写入对应的创建函数

    func init() {        cmd.DefaultRegistries["kubernetes"] = NewRegistry    }

生效的位置

    if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name {                r, ok := c.opts.Registries[name]                if !ok {                        return fmt.Errorf("Registry %s not found", name)                }                *c.opts.Registry = r()                serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))                clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))                if err := (*c.opts.Selector).Init(selector.Registry(*c.opts.Registry)); err != nil {                        log.Fatalf("Error configuring registry: %v", err)                }                clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))                if err := (*c.opts.Broker).Init(broker.Registry(*c.opts.Registry)); err != nil {                        log.Fatalf("Error configuring broker: %v", err)                }        }

最后附上一张图说明下这个核心的对象的引用关系,组件引用那一块只是画了注册中心的,Broker,Server也都是类似的原理。

感谢各位的阅读,以上就是"micro微服务基础组件的组织方式是什么"的内容了,经过本文的学习后,相信大家对micro微服务基础组件的组织方式是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0