9cecee8ad4c87a3f52ba3739328c5f80bfda6530
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / internal / pool / pool.go
1 package pool
2
3 import (
4         "errors"
5         "net"
6         "sync"
7         "sync/atomic"
8         "time"
9
10         "github.com/go-redis/redis/internal"
11 )
12
13 var ErrClosed = errors.New("redis: client is closed")
14 var ErrPoolTimeout = errors.New("redis: connection pool timeout")
15
16 var timers = sync.Pool{
17         New: func() interface{} {
18                 t := time.NewTimer(time.Hour)
19                 t.Stop()
20                 return t
21         },
22 }
23
24 // Stats contains pool state information and accumulated stats.
25 type Stats struct {
26         Hits     uint32 // number of times free connection was found in the pool
27         Misses   uint32 // number of times free connection was NOT found in the pool
28         Timeouts uint32 // number of times a wait timeout occurred
29
30         TotalConns uint32 // number of total connections in the pool
31         IdleConns  uint32 // number of idle connections in the pool
32         StaleConns uint32 // number of stale connections removed from the pool
33 }
34
35 type Pooler interface {
36         NewConn() (*Conn, error)
37         CloseConn(*Conn) error
38
39         Get() (*Conn, error)
40         Put(*Conn)
41         Remove(*Conn)
42
43         Len() int
44         IdleLen() int
45         Stats() *Stats
46
47         Close() error
48 }
49
50 type Options struct {
51         Dialer  func() (net.Conn, error)
52         OnClose func(*Conn) error
53
54         PoolSize           int
55         MinIdleConns       int
56         MaxConnAge         time.Duration
57         PoolTimeout        time.Duration
58         IdleTimeout        time.Duration
59         IdleCheckFrequency time.Duration
60 }
61
62 type ConnPool struct {
63         opt *Options
64
65         dialErrorsNum uint32 // atomic
66
67         lastDialErrorMu sync.RWMutex
68         lastDialError   error
69
70         queue chan struct{}
71
72         connsMu      sync.Mutex
73         conns        []*Conn
74         idleConns    []*Conn
75         poolSize     int
76         idleConnsLen int
77
78         stats Stats
79
80         _closed uint32 // atomic
81 }
82
83 var _ Pooler = (*ConnPool)(nil)
84
85 func NewConnPool(opt *Options) *ConnPool {
86         p := &ConnPool{
87                 opt: opt,
88
89                 queue:     make(chan struct{}, opt.PoolSize),
90                 conns:     make([]*Conn, 0, opt.PoolSize),
91                 idleConns: make([]*Conn, 0, opt.PoolSize),
92         }
93
94         for i := 0; i < opt.MinIdleConns; i++ {
95                 p.checkMinIdleConns()
96         }
97
98         if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
99                 go p.reaper(opt.IdleCheckFrequency)
100         }
101
102         return p
103 }
104
105 func (p *ConnPool) checkMinIdleConns() {
106         if p.opt.MinIdleConns == 0 {
107                 return
108         }
109         if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
110                 p.poolSize++
111                 p.idleConnsLen++
112                 go p.addIdleConn()
113         }
114 }
115
116 func (p *ConnPool) addIdleConn() {
117         cn, err := p.newConn(true)
118         if err != nil {
119                 return
120         }
121
122         p.connsMu.Lock()
123         p.conns = append(p.conns, cn)
124         p.idleConns = append(p.idleConns, cn)
125         p.connsMu.Unlock()
126 }
127
128 func (p *ConnPool) NewConn() (*Conn, error) {
129         return p._NewConn(false)
130 }
131
132 func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
133         cn, err := p.newConn(pooled)
134         if err != nil {
135                 return nil, err
136         }
137
138         p.connsMu.Lock()
139         p.conns = append(p.conns, cn)
140         if pooled {
141                 if p.poolSize < p.opt.PoolSize {
142                         p.poolSize++
143                 } else {
144                         cn.pooled = false
145                 }
146         }
147         p.connsMu.Unlock()
148         return cn, nil
149 }
150
151 func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
152         if p.closed() {
153                 return nil, ErrClosed
154         }
155
156         if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
157                 return nil, p.getLastDialError()
158         }
159
160         netConn, err := p.opt.Dialer()
161         if err != nil {
162                 p.setLastDialError(err)
163                 if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
164                         go p.tryDial()
165                 }
166                 return nil, err
167         }
168
169         cn := NewConn(netConn)
170         cn.pooled = pooled
171         return cn, nil
172 }
173
174 func (p *ConnPool) tryDial() {
175         for {
176                 if p.closed() {
177                         return
178                 }
179
180                 conn, err := p.opt.Dialer()
181                 if err != nil {
182                         p.setLastDialError(err)
183                         time.Sleep(time.Second)
184                         continue
185                 }
186
187                 atomic.StoreUint32(&p.dialErrorsNum, 0)
188                 _ = conn.Close()
189                 return
190         }
191 }
192
193 func (p *ConnPool) setLastDialError(err error) {
194         p.lastDialErrorMu.Lock()
195         p.lastDialError = err
196         p.lastDialErrorMu.Unlock()
197 }
198
199 func (p *ConnPool) getLastDialError() error {
200         p.lastDialErrorMu.RLock()
201         err := p.lastDialError
202         p.lastDialErrorMu.RUnlock()
203         return err
204 }
205
206 // Get returns existed connection from the pool or creates a new one.
207 func (p *ConnPool) Get() (*Conn, error) {
208         if p.closed() {
209                 return nil, ErrClosed
210         }
211
212         err := p.waitTurn()
213         if err != nil {
214                 return nil, err
215         }
216
217         for {
218                 p.connsMu.Lock()
219                 cn := p.popIdle()
220                 p.connsMu.Unlock()
221
222                 if cn == nil {
223                         break
224                 }
225
226                 if p.isStaleConn(cn) {
227                         _ = p.CloseConn(cn)
228                         continue
229                 }
230
231                 atomic.AddUint32(&p.stats.Hits, 1)
232                 return cn, nil
233         }
234
235         atomic.AddUint32(&p.stats.Misses, 1)
236
237         newcn, err := p._NewConn(true)
238         if err != nil {
239                 p.freeTurn()
240                 return nil, err
241         }
242
243         return newcn, nil
244 }
245
246 func (p *ConnPool) getTurn() {
247         p.queue <- struct{}{}
248 }
249
250 func (p *ConnPool) waitTurn() error {
251         select {
252         case p.queue <- struct{}{}:
253                 return nil
254         default:
255                 timer := timers.Get().(*time.Timer)
256                 timer.Reset(p.opt.PoolTimeout)
257
258                 select {
259                 case p.queue <- struct{}{}:
260                         if !timer.Stop() {
261                                 <-timer.C
262                         }
263                         timers.Put(timer)
264                         return nil
265                 case <-timer.C:
266                         timers.Put(timer)
267                         atomic.AddUint32(&p.stats.Timeouts, 1)
268                         return ErrPoolTimeout
269                 }
270         }
271 }
272
273 func (p *ConnPool) freeTurn() {
274         <-p.queue
275 }
276
277 func (p *ConnPool) popIdle() *Conn {
278         if len(p.idleConns) == 0 {
279                 return nil
280         }
281
282         idx := len(p.idleConns) - 1
283         cn := p.idleConns[idx]
284         p.idleConns = p.idleConns[:idx]
285         p.idleConnsLen--
286         p.checkMinIdleConns()
287         return cn
288 }
289
290 func (p *ConnPool) Put(cn *Conn) {
291         if !cn.pooled {
292                 p.Remove(cn)
293                 return
294         }
295
296         p.connsMu.Lock()
297         p.idleConns = append(p.idleConns, cn)
298         p.idleConnsLen++
299         p.connsMu.Unlock()
300         p.freeTurn()
301 }
302
303 func (p *ConnPool) Remove(cn *Conn) {
304         p.removeConn(cn)
305         p.freeTurn()
306         _ = p.closeConn(cn)
307 }
308
309 func (p *ConnPool) CloseConn(cn *Conn) error {
310         p.removeConn(cn)
311         return p.closeConn(cn)
312 }
313
314 func (p *ConnPool) removeConn(cn *Conn) {
315         p.connsMu.Lock()
316         for i, c := range p.conns {
317                 if c == cn {
318                         p.conns = append(p.conns[:i], p.conns[i+1:]...)
319                         if cn.pooled {
320                                 p.poolSize--
321                                 p.checkMinIdleConns()
322                         }
323                         break
324                 }
325         }
326         p.connsMu.Unlock()
327 }
328
329 func (p *ConnPool) closeConn(cn *Conn) error {
330         if p.opt.OnClose != nil {
331                 _ = p.opt.OnClose(cn)
332         }
333         return cn.Close()
334 }
335
336 // Len returns total number of connections.
337 func (p *ConnPool) Len() int {
338         p.connsMu.Lock()
339         n := len(p.conns)
340         p.connsMu.Unlock()
341         return n
342 }
343
344 // IdleLen returns number of idle connections.
345 func (p *ConnPool) IdleLen() int {
346         p.connsMu.Lock()
347         n := p.idleConnsLen
348         p.connsMu.Unlock()
349         return n
350 }
351
352 func (p *ConnPool) Stats() *Stats {
353         idleLen := p.IdleLen()
354         return &Stats{
355                 Hits:     atomic.LoadUint32(&p.stats.Hits),
356                 Misses:   atomic.LoadUint32(&p.stats.Misses),
357                 Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
358
359                 TotalConns: uint32(p.Len()),
360                 IdleConns:  uint32(idleLen),
361                 StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
362         }
363 }
364
365 func (p *ConnPool) closed() bool {
366         return atomic.LoadUint32(&p._closed) == 1
367 }
368
369 func (p *ConnPool) Filter(fn func(*Conn) bool) error {
370         var firstErr error
371         p.connsMu.Lock()
372         for _, cn := range p.conns {
373                 if fn(cn) {
374                         if err := p.closeConn(cn); err != nil && firstErr == nil {
375                                 firstErr = err
376                         }
377                 }
378         }
379         p.connsMu.Unlock()
380         return firstErr
381 }
382
383 func (p *ConnPool) Close() error {
384         if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
385                 return ErrClosed
386         }
387
388         var firstErr error
389         p.connsMu.Lock()
390         for _, cn := range p.conns {
391                 if err := p.closeConn(cn); err != nil && firstErr == nil {
392                         firstErr = err
393                 }
394         }
395         p.conns = nil
396         p.poolSize = 0
397         p.idleConns = nil
398         p.idleConnsLen = 0
399         p.connsMu.Unlock()
400
401         return firstErr
402 }
403
404 func (p *ConnPool) reapStaleConn() *Conn {
405         if len(p.idleConns) == 0 {
406                 return nil
407         }
408
409         cn := p.idleConns[0]
410         if !p.isStaleConn(cn) {
411                 return nil
412         }
413
414         p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
415         p.idleConnsLen--
416
417         return cn
418 }
419
420 func (p *ConnPool) ReapStaleConns() (int, error) {
421         var n int
422         for {
423                 p.getTurn()
424
425                 p.connsMu.Lock()
426                 cn := p.reapStaleConn()
427                 p.connsMu.Unlock()
428
429                 if cn != nil {
430                         p.removeConn(cn)
431                 }
432
433                 p.freeTurn()
434
435                 if cn != nil {
436                         p.closeConn(cn)
437                         n++
438                 } else {
439                         break
440                 }
441         }
442         return n, nil
443 }
444
445 func (p *ConnPool) reaper(frequency time.Duration) {
446         ticker := time.NewTicker(frequency)
447         defer ticker.Stop()
448
449         for range ticker.C {
450                 if p.closed() {
451                         break
452                 }
453                 n, err := p.ReapStaleConns()
454                 if err != nil {
455                         internal.Logf("ReapStaleConns failed: %s", err)
456                         continue
457                 }
458                 atomic.AddUint32(&p.stats.StaleConns, uint32(n))
459         }
460 }
461
462 func (p *ConnPool) isStaleConn(cn *Conn) bool {
463         if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
464                 return false
465         }
466
467         now := time.Now()
468         if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
469                 return true
470         }
471         if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
472                 return true
473         }
474
475         return false
476 }