客户端:redigo 的客户端需要显式声明并初始化内部的 pool:
func newPool(addr string) *redis.Pool { return &redis.Pool{ MaxIdle: 3, IdleTimeout: 240 * time.Second, // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial. Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) }, }}
初始化时可以提供 TestOnBorrow 的行为:
pool := &redis.Pool{ // Other pool configuration not shown in this example. TestOnBorrow: func(c redis.Conn, t time.Time) error { if time.Since(t) < time.Minute { return nil } _, err := c.Do("PING") return err },}
使用时也需要用户显式地 defer Close:
func serveHome(w http.ResponseWriter, r *http.Request) { conn := pool.Get() defer conn.Close() ...}
pool.Get
文章插图
7
用户需要设置 pool.Wait 是否等待,如果 Waittrue,则在没有连接可用时,会阻塞等待 。如果 Waitfalse,且连接已到达阈值 pool.MaxActive,则直接返回错误 ErrPoolExhausted 。
activeConn.Close
func (ac *activeConn) Close() error { pc := ac.pc if pc == nil { return nil } ac.pc = nil if ac.state&connectionMultiState != 0 { pc.c.Send("DISCARD") ac.state &^= (connectionMultiState | connectionWatchState) } else if ac.state&connectionWatchState != 0 { pc.c.Send("UNWATCH") ac.state &^= connectionWatchState } if ac.state&connectionSubscribeState != 0 { pc.c.Send("UNSUBSCRIBE") pc.c.Send("PUNSUBSCRIBE") // To detect the end of the message stream, ask the server to echo // a sentinel value and read until we see that value. sentinelOnce.Do(initSentinel) pc.c.Send("ECHO", sentinel) pc.c.Flush() for { p, err := pc.c.Receive() if err != nil { break } if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) { ac.state &^= connectionSubscribeState break } } } pc.c.Do("") ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil) return nil}
close 时会把这个 activeConn 放回连接池 。go-redis/redishttps://github.com/go-redis/redis
这个 redis 库屏蔽了连接池逻辑,用户侧基本不用关心连接,初始化时,传入连接池相关配置:
rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", // use default Addr Password: "", // no password set DB: 0, // use default DB })func NewClient(opt *Options) *Client { opt.init() c := Client{ baseClient: newBaseClient(opt, newConnPool(opt)), ctx: context.Background(), } c.cmdable = c.Process return &c}func newConnPool(opt *Options) *pool.ConnPool { return pool.NewConnPool(&pool.Options{ Dialer: func(ctx context.Context) (net.Conn, error) { return opt.Dialer(ctx, opt.Network, opt.Addr) }, PoolSize: opt.PoolSize, MinIdleConns: opt.MinIdleConns, MaxConnAge: opt.MaxConnAge, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, })}func (c *baseClient) _process(ctx context.Context, cmd Cmder) error { var lastErr error for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { if attempt > 0 { if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { return err } } retryTimeout := true lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmd(wr, cmd) }) if err != nil { return err } err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply) if err != nil { retryTimeout = cmd.readTimeout() == nil return err } return nil }) if lastErr == nil || !isRetryableError(lastErr, retryTimeout) { return lastErr } } return lastErr}func (c *baseClient) withConn( ctx context.Context, fn func(context.Context, *pool.Conn) error,) error { cn, err := c.getConn(ctx) if err != nil { return err } defer func() { c.releaseConn(cn, err) }() err = fn(ctx, cn) return err
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 快充|等不及发布会!realme Q5i抢先开售:天玑810+5000mAh电池
- C语言的main函数的三个要点
- C语言的编译机制:分制原则与三种文件
- 微软承认Windows 10新BUG:错误显示没有网络连接
- MySQL 5.6--------SSL连接最佳实战
- GO 切片实力踩坑
- 无线投屏器怎么连接电视和手机?用手机怎么无线投屏到投影仪上?
- 我用 Go语言 生成的随机数为什么不随机?随机数是怎样产生的
- 什么是RPC?RPC有什么用?与语言有关吗?
- 华为b2手环与华为手机的连接方法