Mongodb代理程序如何实现
这篇文章主要介绍"Mongodb代理程序如何实现"的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇"Mongodb代理程序如何实现"文章能帮助大家解决问题。
根据一贯的风格,我们先来梳理下项目目录结构,结构如下:
|__ bin/ # 用于存放编译后生成的二进制文件
|__ config/ # 用于存放配置文件
|__ connection/ # 存放连接相关的文件
| |__ proxy.go # 代理组件
| |__ pool.go # 连接池组件
| |__ repl_set.go # 复制集组件
| |__ conn.go # 连接对象组件
|__ internal/ # 存放 mongo 内部协议相关文件
| |__ auth.go # 握手鉴权组件
| |__ protocol.go # 协议解析组件
| |__ request.go # 请求重写组件
| |__ response.go # 响应重写组件
|__ statistics/ # 存放指标统计上报组件
|__ test/ # 存放各种语言驱动测试代码的文件夹
|__ utils/ # 工具函数文件夹
|__ glide.yaml # 依赖包配置文件
|__ main.go # 入口文件
proxy 实现
最简单的 proxy 实现套路就像下面这样:
// main.go
func main() {
// 传入配置参数,实例化一个代理对象
p := NewProxy(conf)
// 卡住,循环接受客户端请求
p.LoopAccept()
}
接着来实现 NewProxy、LoopAccept 方法:
// connection/proxy.go
type Proxy struct {
sync.RWMutex
listener net.Listener
writePool, readPool *pool
}
func NewProxy(conf config.UserConf) *Proxy {
// 开始监听本地端口
listener, err := net.Listen("tcp", ":"+conf.GetString("port"))
if err != nil {
log.Fatalln(err)
}
p := &Proxy{
listener: listener,
}
// 实例化连接池
p.readPool, p.writePool, err = newPool(p)
if err != nil {
panic(err)
}
return p
}
func (p *Proxy) LoopAccept() {
for {
client, err := p.listener.Accept()
go func(c net.Conn) {
defer c.Close()
// 一个连接在多次 messageHandler 中共用一个 Reader 对象
cr := bufio.NewReader(c)
// 因为一个连接可能会进行多次读或写操作
for {
// 将客户端请求代理给服务端,服务端响应代理回客户端
// 同时中间对请求或响应进行重写操作
err := p.messageHandler(cr, c)
if err != nil {
// 只要出现错误,就执行到上面的 defer c.Close() 来关闭连接
return
}
}
}(client)
}
}
接着来实现核心逻辑 messageHandler:
// connection/proxy.go
func (p *Proxy) messageHandler(cr *bufio.Reader, c net.Conn) error {
// 对请求报文进行解析操作
req, err := internal.Decode(clientReader)
if err != nil {
return errors.New("decode error")
}
// 将客户端请求发送给数据库服务器
res, err := p.clientToServer(req)
if err != nil {
return errors.New("request error")
}
// 将数据库服务器响应返回给客户端
return res.WriteTo(c)
}
func (p *Proxy) clientToServer(req *internal.Message) (*internal.Message, error) {
var server net.Conn
// 如果是读操作,就从读池中取出连接
if req.IsReadOp() {
host := req.GetHost()
// 某些读操作需要发送到指定的读库上,所以需要传 host,来获取指定读库连接
server = p.readPool.Acquire(host)
// 反之,写操作从写池中取出连接
} else {
// 由于写库只有一个,所以不用传 host 参数了
server = p.writePool.Acquire()
}
// 将客户端请求发送给数据库服务器
err := req.WriteTo(server)
if err != nil {
return nil, err
}
// 获取解析数据库服务器响应
res, err := internal.Decode(bufio.NewReader(server))
return res, err
}
大致逻辑就是,客户端通过代理把请求发给服务端,服务端响应也通过代理响应回客户端。
------------ request ----------- request ------------
| | --------> | | --------> | |
| client | | proxy | | repl_set |
| | <-------- | | <-------- | |
------------ response ----------- response ------------
呐——,当然还有非常多的细节,由于篇幅原因不得不省略...
pool 实现
由 proxy 的代码逻辑来看,我们取读或写库连接是通过读或写池的 Acquire 方法来取的:
// connection/pool.go
type pool struct {
sync.RWMutex
connCh chan net.Conn
newConn func(string) (net.Conn, error)
freeConn func(net.Conn) error
}
func (p *pool) Acquire(opts ...interface{}) (net.Conn, error) {
host := ""
if len(opts) > 0 {
host, _ = (opts[0]).(string)
}
chLen := len(p.connCh)
// 从 channel 中遍历剩余数量的 conn
for i := 0; i < chLen; i++ {
select {
case conn, ok := <- ch:
if ok {
if len(host) > 0 {
if conn.RemoteAddr().String() == host {
return conn, nil
}
// 没有找到对应 host 的 conn,则把 conn 重新放回 channel
// 你可以简单理解为只是执行了 p.connCh <- conn 操作
p.freeConn(conn)
} else {
return conn, nil
}
}
// 避免数量不足而导致 channel 阻塞等待
default:
}
}
// 若还没有从 channel 中取到 conn,则立马 new 一个
conn, err := p.newConn(host)
if err != nil {
return nil, err
}
return conn, nil
}
池的实现大致就是实现了一个循环队列,连接从池中取,取出的连接在使用完后,可以放回池中。
关于"Mongodb代理程序如何实现"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注行业资讯频道,小编每天都会为大家更新不同的知识点。