10 "github.com/go-redis/redis/internal"
13 var ErrClosed = errors.New("redis: client is closed")
14 var ErrPoolTimeout = errors.New("redis: connection pool timeout")
16 var timers = sync.Pool{
17 New: func() interface{} {
18 t := time.NewTimer(time.Hour)
24 // Stats contains pool state information and accumulated stats.
26 Hits uint32 // number of times free connection was found in the pool
27 Misses uint32 // number of times free connection was NOT found in the pool
28 Timeouts uint32 // number of times a wait timeout occurred
30 TotalConns uint32 // number of total connections in the pool
31 IdleConns uint32 // number of idle connections in the pool
32 StaleConns uint32 // number of stale connections removed from the pool
35 type Pooler interface {
36 NewConn() (*Conn, error)
37 CloseConn(*Conn) error
51 Dialer func() (net.Conn, error)
52 OnClose func(*Conn) error
56 MaxConnAge time.Duration
57 PoolTimeout time.Duration
58 IdleTimeout time.Duration
59 IdleCheckFrequency time.Duration
62 type ConnPool struct {
65 dialErrorsNum uint32 // atomic
67 lastDialErrorMu sync.RWMutex
80 _closed uint32 // atomic
83 var _ Pooler = (*ConnPool)(nil)
85 func NewConnPool(opt *Options) *ConnPool {
89 queue: make(chan struct{}, opt.PoolSize),
90 conns: make([]*Conn, 0, opt.PoolSize),
91 idleConns: make([]*Conn, 0, opt.PoolSize),
94 for i := 0; i < opt.MinIdleConns; i++ {
98 if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
99 go p.reaper(opt.IdleCheckFrequency)
105 func (p *ConnPool) checkMinIdleConns() {
106 if p.opt.MinIdleConns == 0 {
109 if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
116 func (p *ConnPool) addIdleConn() {
117 cn, err := p.newConn(true)
123 p.conns = append(p.conns, cn)
124 p.idleConns = append(p.idleConns, cn)
128 func (p *ConnPool) NewConn() (*Conn, error) {
129 return p._NewConn(false)
132 func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
133 cn, err := p.newConn(pooled)
139 p.conns = append(p.conns, cn)
141 if p.poolSize < p.opt.PoolSize {
151 func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
153 return nil, ErrClosed
156 if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
157 return nil, p.getLastDialError()
160 netConn, err := p.opt.Dialer()
162 p.setLastDialError(err)
163 if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
169 cn := NewConn(netConn)
174 func (p *ConnPool) tryDial() {
180 conn, err := p.opt.Dialer()
182 p.setLastDialError(err)
183 time.Sleep(time.Second)
187 atomic.StoreUint32(&p.dialErrorsNum, 0)
193 func (p *ConnPool) setLastDialError(err error) {
194 p.lastDialErrorMu.Lock()
195 p.lastDialError = err
196 p.lastDialErrorMu.Unlock()
199 func (p *ConnPool) getLastDialError() error {
200 p.lastDialErrorMu.RLock()
201 err := p.lastDialError
202 p.lastDialErrorMu.RUnlock()
206 // Get returns existed connection from the pool or creates a new one.
207 func (p *ConnPool) Get() (*Conn, error) {
209 return nil, ErrClosed
226 if p.isStaleConn(cn) {
231 atomic.AddUint32(&p.stats.Hits, 1)
235 atomic.AddUint32(&p.stats.Misses, 1)
237 newcn, err := p._NewConn(true)
246 func (p *ConnPool) getTurn() {
247 p.queue <- struct{}{}
250 func (p *ConnPool) waitTurn() error {
252 case p.queue <- struct{}{}:
255 timer := timers.Get().(*time.Timer)
256 timer.Reset(p.opt.PoolTimeout)
259 case p.queue <- struct{}{}:
267 atomic.AddUint32(&p.stats.Timeouts, 1)
268 return ErrPoolTimeout
273 func (p *ConnPool) freeTurn() {
277 func (p *ConnPool) popIdle() *Conn {
278 if len(p.idleConns) == 0 {
282 idx := len(p.idleConns) - 1
283 cn := p.idleConns[idx]
284 p.idleConns = p.idleConns[:idx]
286 p.checkMinIdleConns()
290 func (p *ConnPool) Put(cn *Conn) {
297 p.idleConns = append(p.idleConns, cn)
303 func (p *ConnPool) Remove(cn *Conn) {
309 func (p *ConnPool) CloseConn(cn *Conn) error {
311 return p.closeConn(cn)
314 func (p *ConnPool) removeConn(cn *Conn) {
316 for i, c := range p.conns {
318 p.conns = append(p.conns[:i], p.conns[i+1:]...)
321 p.checkMinIdleConns()
329 func (p *ConnPool) closeConn(cn *Conn) error {
330 if p.opt.OnClose != nil {
331 _ = p.opt.OnClose(cn)
336 // Len returns total number of connections.
337 func (p *ConnPool) Len() int {
344 // IdleLen returns number of idle connections.
345 func (p *ConnPool) IdleLen() int {
352 func (p *ConnPool) Stats() *Stats {
353 idleLen := p.IdleLen()
355 Hits: atomic.LoadUint32(&p.stats.Hits),
356 Misses: atomic.LoadUint32(&p.stats.Misses),
357 Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
359 TotalConns: uint32(p.Len()),
360 IdleConns: uint32(idleLen),
361 StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
365 func (p *ConnPool) closed() bool {
366 return atomic.LoadUint32(&p._closed) == 1
369 func (p *ConnPool) Filter(fn func(*Conn) bool) error {
372 for _, cn := range p.conns {
374 if err := p.closeConn(cn); err != nil && firstErr == nil {
383 func (p *ConnPool) Close() error {
384 if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
390 for _, cn := range p.conns {
391 if err := p.closeConn(cn); err != nil && firstErr == nil {
404 func (p *ConnPool) reapStaleConn() *Conn {
405 if len(p.idleConns) == 0 {
410 if !p.isStaleConn(cn) {
414 p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
420 func (p *ConnPool) ReapStaleConns() (int, error) {
426 cn := p.reapStaleConn()
445 func (p *ConnPool) reaper(frequency time.Duration) {
446 ticker := time.NewTicker(frequency)
453 n, err := p.ReapStaleConns()
455 internal.Logf("ReapStaleConns failed: %s", err)
458 atomic.AddUint32(&p.stats.StaleConns, uint32(n))
462 func (p *ConnPool) isStaleConn(cn *Conn) bool {
463 if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
468 if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
471 if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {