barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / internal / pool / pool.go
index cab6690..9cecee8 100644 (file)
@@ -28,7 +28,6 @@ type Stats struct {
        Timeouts uint32 // number of times a wait timeout occurred
 
        TotalConns uint32 // number of total connections in the pool
-       FreeConns  uint32 // deprecated - use IdleConns
        IdleConns  uint32 // number of idle connections in the pool
        StaleConns uint32 // number of stale connections removed from the pool
 }
@@ -53,6 +52,8 @@ type Options struct {
        OnClose func(*Conn) error
 
        PoolSize           int
+       MinIdleConns       int
+       MaxConnAge         time.Duration
        PoolTimeout        time.Duration
        IdleTimeout        time.Duration
        IdleCheckFrequency time.Duration
@@ -63,16 +64,16 @@ type ConnPool struct {
 
        dialErrorsNum uint32 // atomic
 
-       lastDialError   error
        lastDialErrorMu sync.RWMutex
+       lastDialError   error
 
        queue chan struct{}
 
-       connsMu sync.Mutex
-       conns   []*Conn
-
-       idleConnsMu sync.RWMutex
-       idleConns   []*Conn
+       connsMu      sync.Mutex
+       conns        []*Conn
+       idleConns    []*Conn
+       poolSize     int
+       idleConnsLen int
 
        stats Stats
 
@@ -90,6 +91,10 @@ func NewConnPool(opt *Options) *ConnPool {
                idleConns: make([]*Conn, 0, opt.PoolSize),
        }
 
+       for i := 0; i < opt.MinIdleConns; i++ {
+               p.checkMinIdleConns()
+       }
+
        if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
                go p.reaper(opt.IdleCheckFrequency)
        }
@@ -97,19 +102,53 @@ func NewConnPool(opt *Options) *ConnPool {
        return p
 }
 
+func (p *ConnPool) checkMinIdleConns() {
+       if p.opt.MinIdleConns == 0 {
+               return
+       }
+       if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
+               p.poolSize++
+               p.idleConnsLen++
+               go p.addIdleConn()
+       }
+}
+
+func (p *ConnPool) addIdleConn() {
+       cn, err := p.newConn(true)
+       if err != nil {
+               return
+       }
+
+       p.connsMu.Lock()
+       p.conns = append(p.conns, cn)
+       p.idleConns = append(p.idleConns, cn)
+       p.connsMu.Unlock()
+}
+
 func (p *ConnPool) NewConn() (*Conn, error) {
-       cn, err := p.newConn()
+       return p._NewConn(false)
+}
+
+func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
+       cn, err := p.newConn(pooled)
        if err != nil {
                return nil, err
        }
 
        p.connsMu.Lock()
        p.conns = append(p.conns, cn)
+       if pooled {
+               if p.poolSize < p.opt.PoolSize {
+                       p.poolSize++
+               } else {
+                       cn.pooled = false
+               }
+       }
        p.connsMu.Unlock()
        return cn, nil
 }
 
-func (p *ConnPool) newConn() (*Conn, error) {
+func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
        if p.closed() {
                return nil, ErrClosed
        }
@@ -127,7 +166,9 @@ func (p *ConnPool) newConn() (*Conn, error) {
                return nil, err
        }
 
-       return NewConn(netConn), nil
+       cn := NewConn(netConn)
+       cn.pooled = pooled
+       return cn, nil
 }
 
 func (p *ConnPool) tryDial() {
@@ -174,16 +215,16 @@ func (p *ConnPool) Get() (*Conn, error) {
        }
 
        for {
-               p.idleConnsMu.Lock()
+               p.connsMu.Lock()
                cn := p.popIdle()
-               p.idleConnsMu.Unlock()
+               p.connsMu.Unlock()
 
                if cn == nil {
                        break
                }
 
-               if cn.IsStale(p.opt.IdleTimeout) {
-                       p.CloseConn(cn)
+               if p.isStaleConn(cn) {
+                       _ = p.CloseConn(cn)
                        continue
                }
 
@@ -193,7 +234,7 @@ func (p *ConnPool) Get() (*Conn, error) {
 
        atomic.AddUint32(&p.stats.Misses, 1)
 
-       newcn, err := p.NewConn()
+       newcn, err := p._NewConn(true)
        if err != nil {
                p.freeTurn()
                return nil, err
@@ -241,21 +282,21 @@ func (p *ConnPool) popIdle() *Conn {
        idx := len(p.idleConns) - 1
        cn := p.idleConns[idx]
        p.idleConns = p.idleConns[:idx]
-
+       p.idleConnsLen--
+       p.checkMinIdleConns()
        return cn
 }
 
 func (p *ConnPool) Put(cn *Conn) {
-       buf := cn.Rd.PeekBuffered()
-       if buf != nil {
-               internal.Logf("connection has unread data: %.100q", buf)
+       if !cn.pooled {
                p.Remove(cn)
                return
        }
 
-       p.idleConnsMu.Lock()
+       p.connsMu.Lock()
        p.idleConns = append(p.idleConns, cn)
-       p.idleConnsMu.Unlock()
+       p.idleConnsLen++
+       p.connsMu.Unlock()
        p.freeTurn()
 }
 
@@ -275,6 +316,10 @@ func (p *ConnPool) removeConn(cn *Conn) {
        for i, c := range p.conns {
                if c == cn {
                        p.conns = append(p.conns[:i], p.conns[i+1:]...)
+                       if cn.pooled {
+                               p.poolSize--
+                               p.checkMinIdleConns()
+                       }
                        break
                }
        }
@@ -291,17 +336,17 @@ func (p *ConnPool) closeConn(cn *Conn) error {
 // Len returns total number of connections.
 func (p *ConnPool) Len() int {
        p.connsMu.Lock()
-       l := len(p.conns)
+       n := len(p.conns)
        p.connsMu.Unlock()
-       return l
+       return n
 }
 
-// FreeLen returns number of idle connections.
+// IdleLen returns number of idle connections.
 func (p *ConnPool) IdleLen() int {
-       p.idleConnsMu.RLock()
-       l := len(p.idleConns)
-       p.idleConnsMu.RUnlock()
-       return l
+       p.connsMu.Lock()
+       n := p.idleConnsLen
+       p.connsMu.Unlock()
+       return n
 }
 
 func (p *ConnPool) Stats() *Stats {
@@ -312,7 +357,6 @@ func (p *ConnPool) Stats() *Stats {
                Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
 
                TotalConns: uint32(p.Len()),
-               FreeConns:  uint32(idleLen),
                IdleConns:  uint32(idleLen),
                StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
        }
@@ -349,11 +393,10 @@ func (p *ConnPool) Close() error {
                }
        }
        p.conns = nil
-       p.connsMu.Unlock()
-
-       p.idleConnsMu.Lock()
+       p.poolSize = 0
        p.idleConns = nil
-       p.idleConnsMu.Unlock()
+       p.idleConnsLen = 0
+       p.connsMu.Unlock()
 
        return firstErr
 }
@@ -364,11 +407,12 @@ func (p *ConnPool) reapStaleConn() *Conn {
        }
 
        cn := p.idleConns[0]
-       if !cn.IsStale(p.opt.IdleTimeout) {
+       if !p.isStaleConn(cn) {
                return nil
        }
 
        p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
+       p.idleConnsLen--
 
        return cn
 }
@@ -378,9 +422,9 @@ func (p *ConnPool) ReapStaleConns() (int, error) {
        for {
                p.getTurn()
 
-               p.idleConnsMu.Lock()
+               p.connsMu.Lock()
                cn := p.reapStaleConn()
-               p.idleConnsMu.Unlock()
+               p.connsMu.Unlock()
 
                if cn != nil {
                        p.removeConn(cn)
@@ -414,3 +458,19 @@ func (p *ConnPool) reaper(frequency time.Duration) {
                atomic.AddUint32(&p.stats.StaleConns, uint32(n))
        }
 }
+
+func (p *ConnPool) isStaleConn(cn *Conn) bool {
+       if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
+               return false
+       }
+
+       now := time.Now()
+       if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
+               return true
+       }
+       if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
+               return true
+       }
+
+       return false
+}