千家信息网

Mongodb代理程序如何实现

发表于:2025-01-20 作者:千家信息网编辑
千家信息网最后更新 2025年01月20日,这篇文章主要介绍"Mongodb代理程序如何实现"的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇"Mongodb代理程序如何实现"文章能帮助大家解决问题。根据一贯
千家信息网最后更新 2025年01月20日Mongodb代理程序如何实现

这篇文章主要介绍"Mongodb代理程序如何实现"的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇"Mongodb代理程序如何实现"文章能帮助大家解决问题。

根据一贯的风格,我们先来梳理下项目目录结构,结构如下:

|__ bin/ # 用于存放编译后生成的二进制文件

|__ config/ # 用于存放配置文件

|__ connection/ # 存放连接相关的文件

| |__ proxy.go # 代理组件

| |__ pool.go # 连接池组件

| |__ repl_set.go # 复制集组件

| |__ conn.go # 连接对象组件

|__ internal/ # 存放 mongo 内部协议相关文件

| |__ auth.go # 握手鉴权组件

| |__ protocol.go # 协议解析组件

| |__ request.go # 请求重写组件

| |__ response.go # 响应重写组件

|__ statistics/ # 存放指标统计上报组件

|__ test/ # 存放各种语言驱动测试代码的文件夹

|__ utils/ # 工具函数文件夹

|__ glide.yaml # 依赖包配置文件

|__ main.go # 入口文件

proxy 实现

最简单的 proxy 实现套路就像下面这样:

// main.go

func main() {

// 传入配置参数,实例化一个代理对象

p := NewProxy(conf)

// 卡住,循环接受客户端请求

p.LoopAccept()

}

接着来实现 NewProxy、LoopAccept 方法:

// connection/proxy.go

type Proxy struct {

sync.RWMutex

listener net.Listener

writePool, readPool *pool

}

func NewProxy(conf config.UserConf) *Proxy {

// 开始监听本地端口

listener, err := net.Listen("tcp", ":"+conf.GetString("port"))

if err != nil {

log.Fatalln(err)

}

p := &Proxy{

listener: listener,

}

// 实例化连接池

p.readPool, p.writePool, err = newPool(p)

if err != nil {

panic(err)

}

return p

}

func (p *Proxy) LoopAccept() {

for {

client, err := p.listener.Accept()

go func(c net.Conn) {

defer c.Close()

// 一个连接在多次 messageHandler 中共用一个 Reader 对象

cr := bufio.NewReader(c)

// 因为一个连接可能会进行多次读或写操作

for {

// 将客户端请求代理给服务端,服务端响应代理回客户端

// 同时中间对请求或响应进行重写操作

err := p.messageHandler(cr, c)

if err != nil {

// 只要出现错误,就执行到上面的 defer c.Close() 来关闭连接

return

}

}

}(client)

}

}

接着来实现核心逻辑 messageHandler:

// connection/proxy.go

func (p *Proxy) messageHandler(cr *bufio.Reader, c net.Conn) error {

// 对请求报文进行解析操作

req, err := internal.Decode(clientReader)

if err != nil {

return errors.New("decode error")

}

// 将客户端请求发送给数据库服务器

res, err := p.clientToServer(req)

if err != nil {

return errors.New("request error")

}

// 将数据库服务器响应返回给客户端

return res.WriteTo(c)

}

func (p *Proxy) clientToServer(req *internal.Message) (*internal.Message, error) {

var server net.Conn

// 如果是读操作,就从读池中取出连接

if req.IsReadOp() {

host := req.GetHost()

// 某些读操作需要发送到指定的读库上,所以需要传 host,来获取指定读库连接

server = p.readPool.Acquire(host)

// 反之,写操作从写池中取出连接

} else {

// 由于写库只有一个,所以不用传 host 参数了

server = p.writePool.Acquire()

}

// 将客户端请求发送给数据库服务器

err := req.WriteTo(server)

if err != nil {

return nil, err

}

// 获取解析数据库服务器响应

res, err := internal.Decode(bufio.NewReader(server))

return res, err

}

大致逻辑就是,客户端通过代理把请求发给服务端,服务端响应也通过代理响应回客户端。

------------ request ----------- request ------------

| | --------> | | --------> | |

| client | | proxy | | repl_set |

| | <-------- | | <-------- | |

------------ response ----------- response ------------

呐——,当然还有非常多的细节,由于篇幅原因不得不省略...

pool 实现

由 proxy 的代码逻辑来看,我们取读或写库连接是通过读或写池的 Acquire 方法来取的:

// connection/pool.go

type pool struct {

sync.RWMutex

connCh chan net.Conn

newConn func(string) (net.Conn, error)

freeConn func(net.Conn) error

}

func (p *pool) Acquire(opts ...interface{}) (net.Conn, error) {

host := ""

if len(opts) > 0 {

host, _ = (opts[0]).(string)

}

chLen := len(p.connCh)

// 从 channel 中遍历剩余数量的 conn

for i := 0; i < chLen; i++ {

select {

case conn, ok := <- ch:

if ok {

if len(host) > 0 {

if conn.RemoteAddr().String() == host {

return conn, nil

}

// 没有找到对应 host 的 conn,则把 conn 重新放回 channel

// 你可以简单理解为只是执行了 p.connCh <- conn 操作

p.freeConn(conn)

} else {

return conn, nil

}

}

// 避免数量不足而导致 channel 阻塞等待

default:

}

}

// 若还没有从 channel 中取到 conn,则立马 new 一个

conn, err := p.newConn(host)

if err != nil {

return nil, err

}

return conn, nil

}

池的实现大致就是实现了一个循环队列,连接从池中取,取出的连接在使用完后,可以放回池中。

关于"Mongodb代理程序如何实现"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注行业资讯频道,小编每天都会为大家更新不同的知识点。

0