Go语言 连接池相关总结:HTTP、RPC、Redis 和数据库等( 六 )

客户端:redigo 的客户端需要显式声明并初始化内部的 pool:
func newPool(addr string) *redis.Pool {    return &redis.Pool{        MaxIdle: 3,        IdleTimeout: 240 * time.Second,        // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.        Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },    }}初始化时可以提供 TestOnBorrow 的行为:
pool := &redis.Pool{  // Other pool configuration not shown in this example.  TestOnBorrow: func(c redis.Conn, t time.Time) error {    if time.Since(t) < time.Minute {      return nil    }    _, err := c.Do("PING")    return err  },}使用时也需要用户显式地 defer Close:
func serveHome(w http.ResponseWriter, r *http.Request) {    conn := pool.Get()    defer conn.Close()    ...}pool.Get

Go语言 连接池相关总结:HTTP、RPC、Redis 和数据库等

文章插图
 
7
用户需要设置 pool.Wait 是否等待,如果 Waittrue,则在没有连接可用时,会阻塞等待 。如果 Waitfalse,且连接已到达阈值 pool.MaxActive,则直接返回错误 ErrPoolExhausted 。
activeConn.Closefunc (ac *activeConn) Close() error { pc := ac.pc if pc == nil {  return nil } ac.pc = nil if ac.state&connectionMultiState != 0 {  pc.c.Send("DISCARD")  ac.state &^= (connectionMultiState | connectionWatchState) } else if ac.state&connectionWatchState != 0 {  pc.c.Send("UNWATCH")  ac.state &^= connectionWatchState } if ac.state&connectionSubscribeState != 0 {  pc.c.Send("UNSUBSCRIBE")  pc.c.Send("PUNSUBSCRIBE")  // To detect the end of the message stream, ask the server to echo  // a sentinel value and read until we see that value.  sentinelOnce.Do(initSentinel)  pc.c.Send("ECHO", sentinel)  pc.c.Flush()  for {   p, err := pc.c.Receive()   if err != nil {    break   }   if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {    ac.state &^= connectionSubscribeState    break   }  } } pc.c.Do("") ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil) return nil}close 时会把这个 activeConn 放回连接池 。
go-redis/redishttps://github.com/go-redis/redis
这个 redis 库屏蔽了连接池逻辑,用户侧基本不用关心连接,初始化时,传入连接池相关配置:
 rdb := redis.NewClient(&redis.Options{  Addr:     "localhost:6379", // use default Addr  Password: "",               // no password set  DB:       0,                // use default DB })func NewClient(opt *Options) *Client { opt.init() c := Client{  baseClient: newBaseClient(opt, newConnPool(opt)),  ctx:        context.Background(), } c.cmdable = c.Process return &c}func newConnPool(opt *Options) *pool.ConnPool { return pool.NewConnPool(&pool.Options{  Dialer: func(ctx context.Context) (net.Conn, error) {   return opt.Dialer(ctx, opt.Network, opt.Addr)  },  PoolSize:           opt.PoolSize,  MinIdleConns:       opt.MinIdleConns,  MaxConnAge:         opt.MaxConnAge,  PoolTimeout:        opt.PoolTimeout,  IdleTimeout:        opt.IdleTimeout,  IdleCheckFrequency: opt.IdleCheckFrequency, })}func (c *baseClient) _process(ctx context.Context, cmd Cmder) error { var lastErr error for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {  if attempt > 0 {   if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {    return err   }  }  retryTimeout := true  lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {   err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {    return writeCmd(wr, cmd)   })   if err != nil {    return err   }   err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)   if err != nil {    retryTimeout = cmd.readTimeout() == nil    return err   }   return nil  })  if lastErr == nil || !isRetryableError(lastErr, retryTimeout) {   return lastErr  } } return lastErr}func (c *baseClient) withConn( ctx context.Context, fn func(context.Context, *pool.Conn) error,) error { cn, err := c.getConn(ctx) if err != nil {  return err } defer func() {  c.releaseConn(cn, err) }() err = fn(ctx, cn) return err


推荐阅读