千家信息网

Go语言之并发示例-Pool(二)

发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,针对这个资源池管理的一步步都实现了,而且做了详细的讲解,下面就看下整个示例代码,方便理解。package commonimport ( "errors" "io" "sync"
千家信息网最后更新 2024年09月22日Go语言之并发示例-Pool(二)

针对这个资源池管理的一步步都实现了,而且做了详细的讲解,下面就看下整个示例代码,方便理解。

package commonimport (    "errors"    "io"    "sync"    "log")//一个安全的资源池,被管理的资源必须都实现io.Close接口type Pool struct {    m       sync.Mutex    res     chan io.Closer    factory func() (io.Closer, error)    closed  bool}var ErrPoolClosed = errors.New("资源池已经被关闭。")//创建一个资源池func New(fn func() (io.Closer, error), size uint) (*Pool, error) {    if size <= 0 {            return nil, errors.New("size的值太小了。")    }        return &Pool{        factory: fn,        res:     make(chan io.Closer, size),    }, nil}//从资源池里获取一个资源func (p *Pool) Acquire() (io.Closer,error) {    select {       case r,ok := <-p.res:        log.Println("Acquire:共享资源")               if !ok {                    return nil,ErrPoolClosed        }                return r,nil    default:        log.Println("Acquire:新生成资源")                return p.factory()    }} //关闭资源池,释放资源func (p *Pool) Close() {    p.m.Lock()    defer p.m.Unlock()        if p.closed {            return    }    p.closed = true    //关闭通道,不让写入了    close(p.res)    //关闭通道里的资源    for r:=range p.res {        r.Close()    }}func (p *Pool) Release(r io.Closer){    //保证该操作和Close方法的操作是安全的    p.m.Lock()        defer p.m.Unlock()        //资源池都关闭了,就省这一个没有释放的资源了,释放即可    if p.closed {        r.Close()                return     }        select {    case p.res <- r:        log.Println("资源释放到池子里了")        default:        log.Println("资源池满了,释放这个资源吧")        r.Close()    }}


好了,资源池管理写好了,也知道资源池是如何实现的啦,现在我们看看如何使用这个资源池,模拟一个数据库连接池吧。

package mainimport (    "flysnow.org/hello/common"    "io"    "log"    "math/rand"    "sync"    "sync/atomic"    "time")const (        //模拟的最大goroutine    maxGoroutine = 5    //资源池的大小    poolRes      = 2)func main() {        //等待任务完成    var wg sync.WaitGroup    wg.Add(maxGoroutine)    p, err := common.New(createConnection, poolRes)        if err != nil {        log.Println(err)                return    }        //模拟好几个goroutine同时使用资源池查询数据    for query := 0; query < maxGoroutine; query++ {        go func(q int) {            dbQuery(q, p)            wg.Done()        }(query)    }    wg.Wait()    log.Println("开始关闭资源池")    p.Close()}//模拟数据库查询func dbQuery(query int, pool *common.Pool) {    conn, err := pool.Acquire()    if err != nil {        log.Println(err)                return    }        defer pool.Release(conn)        //模拟查询    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)    log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.(*dbConnection).ID)}//数据库连接type dbConnection struct {    ID int32//连接的标志}//实现io.Closer接口func (db *dbConnection) Close() error {    log.Println("关闭连接", db.ID)        return nil}var idCounter int32//生成数据库连接的方法,以供资源池使用func createConnection() (io.Closer, error) {       //并发安全,给数据库连接生成唯一标志    id := atomic.AddInt32(&idCounter, 1)        return &dbConnection{id}, nil}


这时我们测试使用资源池的例子,首先定义了一个结构体dbConnection,它只有一个字段,用来做唯一标记。然后dbConnection实现了io.Closer接口,这样才可以使用我们的资源池。


createConnection函数对应的是资源池中的factory字段,用来创建数据库连接dbConnection的,同时为其赋予了一个为止的标志


接着我们就同时开了 5 个goroutine,模拟并发的数据库查询dbQuery,查询方法里,先从资源池获取可用的数据库连接,用完后再释放。


这里我们会创建 5 个数据库连接,但是我们设置的资源池大小只有 2 ,所以再释放了 2 个连接后,后面的 3 个连接会因为资源池满了而释放不了,一会我们看下输出的打印信息就可以看到。


最后这个资源连接池使用完之后,我们要关闭资源池,使用资源池的Close方法即可。


2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 Acquire:新生成资源
2017/04/17 22:25:08 2个查询,使用的是ID4的数据库连接
2017/04/17 22:25:08 资源释放到池子里了
2017/04/17 22:25:08 4个查询,使用的是ID1的数据库连接
2017/04/17 22:25:08 资源释放到池子里了
2017/04/17 22:25:08 3个查询,使用的是ID5的数据库连接
2017/04/17 22:25:08 资源池满了,释放这个资源吧
2017/04/17 22:25:08 关闭连接 5
2017/04/17 22:25:09 1个查询,使用的是ID3的数据库连接
2017/04/17 22:25:09 资源池满了,释放这个资源吧
2017/04/17 22:25:09 关闭连接 3
2017/04/17 22:25:09 0个查询,使用的是ID2的数据库连接
2017/04/17 22:25:09 资源池满了,释放这个资源吧
2017/04/17 22:25:09 关闭连接 2
2017/04/17 22:25:09 开始关闭资源池
2017/04/17 22:25:09 关闭连接 4
2017/04/17 22:25:09 关闭连接 1


到这里,我们已经完成了一个资源池的管理,并且进行了使用测试。


资源对象池的使用比较频繁,因为我们想把一些对象缓存起来,以便使用,这样就会比较高效,而且不会经常调用GC,为此Go为我们提供了原生的资源池管理,防止我们重复造轮子,这就是sync.Pool,我们看下刚刚我们的例子,如果用sync.Pool实现。


package mainimport (    "log"    "math/rand"    "sync"    "sync/atomic"    "time")const (        //模拟的最大goroutine    maxGoroutine = 5)func main() {    //等待任务完成    var wg sync.WaitGroup    wg.Add(maxGoroutine)    p:=&sync.Pool{        New:createConnection,    }        //模拟好几个goroutine同时使用资源池查询数据    for query := 0; query < maxGoroutine; query++ {      go func(q int) {            dbQuery(q, p)            wg.Done()        }(query)    }    wg.Wait()}//模拟数据库查询
func dbQuery(query int, pool *sync.Pool) { conn:=pool.Get().(*dbConnection) defer pool.Put(conn) //模拟查询 time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.ID)}//数据库连接
type dbConnection struct { ID int32//连接的标志}//实现io.Closer接口
func (db *dbConnection) Close() error { log.Println("关闭连接", db.ID) return nil}var idCounter int32//生成数据库连接的方法,以供资源池使用
func createConnection() interface{} { //并发安全,给数据库连接生成唯一标志 id := atomic.AddInt32(&idCounter, 1) return &dbConnection{ID:id}
}


进行微小的改变即可,因为系统库没有提供New这类的工厂函数,所以我们使用字面量创建了一个sync.Pool,注意里面的New字段,这是一个返回任意对象的方法,类似我们自己实现的资源池中的factory字段,意思都是一样的,都是当没有可用资源的时候,生成一个。


这里我们留意到系统的资源池是没有大小限制的,也就是说默认情况下是无上限的,受内存大小限制。


资源的获取和释放对应的方法是GetPut,也很简洁,返回任意对象interface{}


2017/04/17 22:42:43 0个查询,使用的是ID2的数据库连接
2017/04/17 22:42:43 2个查询,使用的是ID5的数据库连接
2017/04/17 22:42:43 4个查询,使用的是ID1的数据库连接
2017/04/17 22:42:44 3个查询,使用的是ID4的数据库连接
2017/04/17 22:42:44 1个查询,使用的是ID3的数据库连接


关于系统的资源池,我们需要注意的是它缓存的对象都是临时的,也就说下一次GC的时候,这些存放的对象都会被清除掉。


0