Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / WorkQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef CEPH_WORKQUEUE_H
16 #define CEPH_WORKQUEUE_H
17
18 #include "Cond.h"
19 #include "include/unordered_map.h"
20 #include "common/HeartbeatMap.h"
21
22 #include <atomic>
23
24 class CephContext;
25
26 /// Pool of threads that share work submitted to multiple work queues.
27 class ThreadPool : public md_config_obs_t {
28   CephContext *cct;
29   string name;
30   string thread_name;
31   string lockname;
32   Mutex _lock;
33   Cond _cond;
34   bool _stop;
35   int _pause;
36   int _draining;
37   Cond _wait_cond;
38   int ioprio_class, ioprio_priority;
39
40 public:
41   class TPHandle {
42     friend class ThreadPool;
43     CephContext *cct;
44     heartbeat_handle_d *hb;
45     time_t grace;
46     time_t suicide_grace;
47   public:
48     TPHandle(
49       CephContext *cct,
50       heartbeat_handle_d *hb,
51       time_t grace,
52       time_t suicide_grace)
53       : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
54     void reset_tp_timeout();
55     void suspend_tp_timeout();
56   };
57 private:
58
59   /// Basic interface to a work queue used by the worker threads.
60   struct WorkQueue_ {
61     string name;
62     time_t timeout_interval, suicide_interval;
63     WorkQueue_(string n, time_t ti, time_t sti)
64       : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
65     { }
66     virtual ~WorkQueue_() {}
67     /// Remove all work items from the queue.
68     virtual void _clear() = 0;
69     /// Check whether there is anything to do.
70     virtual bool _empty() = 0;
71     /// Get the next work item to process.
72     virtual void *_void_dequeue() = 0;
73     /** @brief Process the work item.
74      * This function will be called several times in parallel
75      * and must therefore be thread-safe. */
76     virtual void _void_process(void *item, TPHandle &handle) = 0;
77     /** @brief Synchronously finish processing a work item.
78      * This function is called after _void_process with the global thread pool lock held,
79      * so at most one copy will execute simultaneously for a given thread pool.
80      * It can be used for non-thread-safe finalization. */
81     virtual void _void_process_finish(void *) = 0;
82   };
83
84   // track thread pool size changes
85   unsigned _num_threads;
86   string _thread_num_option;
87   const char **_conf_keys;
88
89   const char **get_tracked_conf_keys() const override {
90     return _conf_keys;
91   }
92   void handle_conf_change(const struct md_config_t *conf,
93                           const std::set <std::string> &changed) override;
94
95 public:
96   /** @brief Work queue that processes several submitted items at once.
97    * The queue will automatically add itself to the thread pool on construction
98    * and remove itself on destruction. */
99   template<class T>
100   class BatchWorkQueue : public WorkQueue_ {
101     ThreadPool *pool;
102
103     virtual bool _enqueue(T *) = 0;
104     virtual void _dequeue(T *) = 0;
105     virtual void _dequeue(list<T*> *) = 0;
106     virtual void _process_finish(const list<T*> &) {}
107
108     // virtual methods from WorkQueue_ below
109     void *_void_dequeue() override {
110       list<T*> *out(new list<T*>);
111       _dequeue(out);
112       if (!out->empty()) {
113         return (void *)out;
114       } else {
115         delete out;
116         return 0;
117       }
118     }
119     void _void_process(void *p, TPHandle &handle) override {
120       _process(*((list<T*>*)p), handle);
121     }
122     void _void_process_finish(void *p) override {
123       _process_finish(*(list<T*>*)p);
124       delete (list<T*> *)p;
125     }
126
127   protected:
128     virtual void _process(const list<T*> &items, TPHandle &handle) = 0;
129
130   public:
131     BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
132       : WorkQueue_(std::move(n), ti, sti), pool(p) {
133       pool->add_work_queue(this);
134     }
135     ~BatchWorkQueue() override {
136       pool->remove_work_queue(this);
137     }
138
139     bool queue(T *item) {
140       pool->_lock.Lock();
141       bool r = _enqueue(item);
142       pool->_cond.SignalOne();
143       pool->_lock.Unlock();
144       return r;
145     }
146     void dequeue(T *item) {
147       pool->_lock.Lock();
148       _dequeue(item);
149       pool->_lock.Unlock();
150     }
151     void clear() {
152       pool->_lock.Lock();
153       _clear();
154       pool->_lock.Unlock();
155     }
156
157     void lock() {
158       pool->lock();
159     }
160     void unlock() {
161       pool->unlock();
162     }
163     void wake() {
164       pool->wake();
165     }
166     void _wake() {
167       pool->_wake();
168     }
169     void drain() {
170       pool->drain(this);
171     }
172
173   };
174
175   /** @brief Templated by-value work queue.
176    * Skeleton implementation of a queue that processes items submitted by value.
177    * This is useful if the items are single primitive values or very small objects
178    * (a few bytes). The queue will automatically add itself to the thread pool on
179    * construction and remove itself on destruction. */
180   template<typename T, typename U = T>
181   class WorkQueueVal : public WorkQueue_ {
182     Mutex _lock;
183     ThreadPool *pool;
184     list<U> to_process;
185     list<U> to_finish;
186     virtual void _enqueue(T) = 0;
187     virtual void _enqueue_front(T) = 0;
188     bool _empty() override = 0;
189     virtual U _dequeue() = 0;
190     virtual void _process_finish(U) {}
191
192     void *_void_dequeue() override {
193       {
194         Mutex::Locker l(_lock);
195         if (_empty())
196           return 0;
197         U u = _dequeue();
198         to_process.push_back(u);
199       }
200       return ((void*)1); // Not used
201     }
202     void _void_process(void *, TPHandle &handle) override {
203       _lock.Lock();
204       assert(!to_process.empty());
205       U u = to_process.front();
206       to_process.pop_front();
207       _lock.Unlock();
208
209       _process(u, handle);
210
211       _lock.Lock();
212       to_finish.push_back(u);
213       _lock.Unlock();
214     }
215
216     void _void_process_finish(void *) override {
217       _lock.Lock();
218       assert(!to_finish.empty());
219       U u = to_finish.front();
220       to_finish.pop_front();
221       _lock.Unlock();
222
223       _process_finish(u);
224     }
225
226     void _clear() override {}
227
228   public:
229     WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
230       : WorkQueue_(std::move(n), ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
231       pool->add_work_queue(this);
232     }
233     ~WorkQueueVal() override {
234       pool->remove_work_queue(this);
235     }
236     void queue(T item) {
237       Mutex::Locker l(pool->_lock);
238       _enqueue(item);
239       pool->_cond.SignalOne();
240     }
241     void queue_front(T item) {
242       Mutex::Locker l(pool->_lock);
243       _enqueue_front(item);
244       pool->_cond.SignalOne();
245     }
246     void drain() {
247       pool->drain(this);
248     }
249   protected:
250     void lock() {
251       pool->lock();
252     }
253     void unlock() {
254       pool->unlock();
255     }
256     virtual void _process(U u, TPHandle &) = 0;
257   };
258
259   /** @brief Template by-pointer work queue.
260    * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
261    * This is useful when the work item are large or include dynamically allocated memory. The queue
262    * will automatically add itself to the thread pool on construction and remove itself on
263    * destruction. */
264   template<class T>
265   class WorkQueue : public WorkQueue_ {
266     ThreadPool *pool;
267     
268     /// Add a work item to the queue.
269     virtual bool _enqueue(T *) = 0;
270     /// Dequeue a previously submitted work item.
271     virtual void _dequeue(T *) = 0;
272     /// Dequeue a work item and return the original submitted pointer.
273     virtual T *_dequeue() = 0;
274     virtual void _process_finish(T *) {}
275
276     // implementation of virtual methods from WorkQueue_
277     void *_void_dequeue() override {
278       return (void *)_dequeue();
279     }
280     void _void_process(void *p, TPHandle &handle) override {
281       _process(static_cast<T *>(p), handle);
282     }
283     void _void_process_finish(void *p) override {
284       _process_finish(static_cast<T *>(p));
285     }
286
287   protected:
288     /// Process a work item. Called from the worker threads.
289     virtual void _process(T *t, TPHandle &) = 0;
290
291   public:
292     WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
293       : WorkQueue_(std::move(n), ti, sti), pool(p) {
294       pool->add_work_queue(this);
295     }
296     ~WorkQueue() override {
297       pool->remove_work_queue(this);
298     }
299     
300     bool queue(T *item) {
301       pool->_lock.Lock();
302       bool r = _enqueue(item);
303       pool->_cond.SignalOne();
304       pool->_lock.Unlock();
305       return r;
306     }
307     void dequeue(T *item) {
308       pool->_lock.Lock();
309       _dequeue(item);
310       pool->_lock.Unlock();
311     }
312     void clear() {
313       pool->_lock.Lock();
314       _clear();
315       pool->_lock.Unlock();
316     }
317
318     Mutex &get_lock() {
319       return pool->_lock;
320     }
321
322     void lock() {
323       pool->lock();
324     }
325     void unlock() {
326       pool->unlock();
327     }
328     /// wake up the thread pool (without lock held)
329     void wake() {
330       pool->wake();
331     }
332     /// wake up the thread pool (with lock already held)
333     void _wake() {
334       pool->_wake();
335     }
336     void _wait() {
337       pool->_wait();
338     }
339     void drain() {
340       pool->drain(this);
341     }
342
343   };
344
345   template<typename T>
346   class PointerWQ : public WorkQueue_ {
347   public:
348     ~PointerWQ() override {
349       m_pool->remove_work_queue(this);
350       assert(m_processing == 0);
351     }
352     void drain() {
353       {
354         // if this queue is empty and not processing, don't wait for other
355         // queues to finish processing
356         Mutex::Locker l(m_pool->_lock);
357         if (m_processing == 0 && m_items.empty()) {
358           return;
359         }
360       }
361       m_pool->drain(this);
362     }
363     void queue(T *item) {
364       Mutex::Locker l(m_pool->_lock);
365       m_items.push_back(item);
366       m_pool->_cond.SignalOne();
367     }
368     bool empty() {
369       Mutex::Locker l(m_pool->_lock);
370       return _empty();
371     }
372   protected:
373     PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
374       : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
375     }
376     void register_work_queue() {
377       m_pool->add_work_queue(this);
378     }
379     void _clear() override {
380       assert(m_pool->_lock.is_locked());
381       m_items.clear();
382     }
383     bool _empty() override {
384       assert(m_pool->_lock.is_locked());
385       return m_items.empty();
386     }
387     void *_void_dequeue() override {
388       assert(m_pool->_lock.is_locked());
389       if (m_items.empty()) {
390         return NULL;
391       }
392
393       ++m_processing;
394       T *item = m_items.front();
395       m_items.pop_front();
396       return item;
397     }
398     void _void_process(void *item, ThreadPool::TPHandle &handle) override {
399       process(reinterpret_cast<T *>(item));
400     }
401     void _void_process_finish(void *item) override {
402       assert(m_pool->_lock.is_locked());
403       assert(m_processing > 0);
404       --m_processing;
405     }
406
407     virtual void process(T *item) = 0;
408     void process_finish() {
409       Mutex::Locker locker(m_pool->_lock);
410       _void_process_finish(nullptr);
411     }
412
413     T *front() {
414       assert(m_pool->_lock.is_locked());
415       if (m_items.empty()) {
416         return NULL;
417       }
418       return m_items.front();
419     }
420     void requeue(T *item) {
421       Mutex::Locker pool_locker(m_pool->_lock);
422       _void_process_finish(nullptr);
423       m_items.push_front(item);
424     }
425     void signal() {
426       Mutex::Locker pool_locker(m_pool->_lock);
427       m_pool->_cond.SignalOne();
428     }
429     Mutex &get_pool_lock() {
430       return m_pool->_lock;
431     }
432   private:
433     ThreadPool *m_pool;
434     std::list<T *> m_items;
435     uint32_t m_processing;
436   };
437 private:
438   vector<WorkQueue_*> work_queues;
439   int next_work_queue = 0;
440  
441
442   // threads
443   struct WorkThread : public Thread {
444     ThreadPool *pool;
445     // cppcheck-suppress noExplicitConstructor
446     WorkThread(ThreadPool *p) : pool(p) {}
447     void *entry() override {
448       pool->worker(this);
449       return 0;
450     }
451   };
452   
453   set<WorkThread*> _threads;
454   list<WorkThread*> _old_threads;  ///< need to be joined
455   int processing;
456
457   void start_threads();
458   void join_old_threads();
459   void worker(WorkThread *wt);
460
461 public:
462   ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option = NULL);
463   ~ThreadPool() override;
464
465   /// return number of threads currently running
466   int get_num_threads() {
467     Mutex::Locker l(_lock);
468     return _num_threads;
469   }
470   
471   /// assign a work queue to this thread pool
472   void add_work_queue(WorkQueue_* wq) {
473     Mutex::Locker l(_lock);
474     work_queues.push_back(wq);
475   }
476   /// remove a work queue from this thread pool
477   void remove_work_queue(WorkQueue_* wq) {
478     Mutex::Locker l(_lock);
479     unsigned i = 0;
480     while (work_queues[i] != wq)
481       i++;
482     for (i++; i < work_queues.size(); i++) 
483       work_queues[i-1] = work_queues[i];
484     assert(i == work_queues.size());
485     work_queues.resize(i-1);
486   }
487
488   /// take thread pool lock
489   void lock() {
490     _lock.Lock();
491   }
492   /// release thread pool lock
493   void unlock() {
494     _lock.Unlock();
495   }
496
497   /// wait for a kick on this thread pool
498   void wait(Cond &c) {
499     c.Wait(_lock);
500   }
501
502   /// wake up a waiter (with lock already held)
503   void _wake() {
504     _cond.Signal();
505   }
506   /// wake up a waiter (without lock held)
507   void wake() {
508     Mutex::Locker l(_lock);
509     _cond.Signal();
510   }
511   void _wait() {
512     _cond.Wait(_lock);
513   }
514
515   /// start thread pool thread
516   void start();
517   /// stop thread pool thread
518   void stop(bool clear_after=true);
519   /// pause thread pool (if it not already paused)
520   void pause();
521   /// pause initiation of new work
522   void pause_new();
523   /// resume work in thread pool.  must match each pause() call 1:1 to resume.
524   void unpause();
525   /** @brief Wait until work completes.
526    * If the parameter is NULL, blocks until all threads are idle.
527    * If it is not NULL, blocks until the given work queue does not have
528    * any items left to process. */
529   void drain(WorkQueue_* wq = 0);
530
531   /// set io priority
532   void set_ioprio(int cls, int priority);
533 };
534
535 class GenContextWQ :
536   public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
537   list<GenContext<ThreadPool::TPHandle&>*> _queue;
538 public:
539   GenContextWQ(const string &name, time_t ti, ThreadPool *tp)
540     : ThreadPool::WorkQueueVal<
541       GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
542   
543   void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
544     _queue.push_back(c);
545   }
546   void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
547     _queue.push_front(c);
548   }
549   bool _empty() override {
550     return _queue.empty();
551   }
552   GenContext<ThreadPool::TPHandle&> *_dequeue() override {
553     assert(!_queue.empty());
554     GenContext<ThreadPool::TPHandle&> *c = _queue.front();
555     _queue.pop_front();
556     return c;
557   }
558   void _process(GenContext<ThreadPool::TPHandle&> *c,
559                 ThreadPool::TPHandle &tp) override {
560     c->complete(tp);
561   }
562 };
563
564 class C_QueueInWQ : public Context {
565   GenContextWQ *wq;
566   GenContext<ThreadPool::TPHandle&> *c;
567 public:
568   C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle &> *c)
569     : wq(wq), c(c) {}
570   void finish(int) override {
571     wq->queue(c);
572   }
573 };
574
575 /// Work queue that asynchronously completes contexts (executes callbacks).
576 /// @see Finisher
577 class ContextWQ : public ThreadPool::PointerWQ<Context> {
578 public:
579   ContextWQ(const string &name, time_t ti, ThreadPool *tp)
580     : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
581       m_lock("ContextWQ::m_lock") {
582     this->register_work_queue();
583   }
584
585   void queue(Context *ctx, int result = 0) {
586     if (result != 0) {
587       Mutex::Locker locker(m_lock);
588       m_context_results[ctx] = result;
589     }
590     ThreadPool::PointerWQ<Context>::queue(ctx);
591   }
592 protected:
593   void _clear() override {
594     ThreadPool::PointerWQ<Context>::_clear();
595
596     Mutex::Locker locker(m_lock);
597     m_context_results.clear();
598   }
599
600   void process(Context *ctx) override {
601     int result = 0;
602     {
603       Mutex::Locker locker(m_lock);
604       ceph::unordered_map<Context *, int>::iterator it =
605         m_context_results.find(ctx);
606       if (it != m_context_results.end()) {
607         result = it->second;
608         m_context_results.erase(it);
609       }
610     }
611     ctx->complete(result);
612   }
613 private:
614   Mutex m_lock;
615   ceph::unordered_map<Context*, int> m_context_results;
616 };
617
618 class ShardedThreadPool {
619
620   CephContext *cct;
621   string name;
622   string thread_name;
623   string lockname;
624   Mutex shardedpool_lock;
625   Cond shardedpool_cond;
626   Cond wait_cond;
627   uint32_t num_threads;
628
629   std::atomic<bool> stop_threads = { false };
630   std::atomic<bool> pause_threads = { false };
631   std::atomic<bool> drain_threads = { false };
632
633   uint32_t num_paused;
634   uint32_t num_drained;
635
636 public:
637
638   class BaseShardedWQ {
639   
640   public:
641     time_t timeout_interval, suicide_interval;
642     BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
643     virtual ~BaseShardedWQ() {}
644
645     virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0;
646     virtual void return_waiting_threads() = 0;
647     virtual bool is_shard_empty(uint32_t thread_index) = 0;
648   };      
649
650   template <typename T>
651   class ShardedWQ: public BaseShardedWQ {
652   
653     ShardedThreadPool* sharded_pool;
654
655   protected:
656     virtual void _enqueue(T) = 0;
657     virtual void _enqueue_front(T) = 0;
658
659
660   public:
661     ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti), 
662                                                                  sharded_pool(tp) {
663       tp->set_wq(this);
664     }
665     ~ShardedWQ() override {}
666
667     void queue(T item) {
668       _enqueue(item);
669     }
670     void queue_front(T item) {
671       _enqueue_front(item);
672     }
673     void drain() {
674       sharded_pool->drain();
675     }
676     
677   };
678
679 private:
680
681   BaseShardedWQ* wq;
682   // threads
683   struct WorkThreadSharded : public Thread {
684     ShardedThreadPool *pool;
685     uint32_t thread_index;
686     WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
687       thread_index(pthread_index) {}
688     void *entry() override {
689       pool->shardedthreadpool_worker(thread_index);
690       return 0;
691     }
692   };
693
694   vector<WorkThreadSharded*> threads_shardedpool;
695   void start_threads();
696   void shardedthreadpool_worker(uint32_t thread_index);
697   void set_wq(BaseShardedWQ* swq) {
698     wq = swq;
699   }
700
701
702
703 public:
704
705   ShardedThreadPool(CephContext *cct_, string nm, string tn, uint32_t pnum_threads);
706
707   ~ShardedThreadPool(){};
708
709   /// start thread pool thread
710   void start();
711   /// stop thread pool thread
712   void stop();
713   /// pause thread pool (if it not already paused)
714   void pause();
715   /// pause initiation of new work
716   void pause_new();
717   /// resume work in thread pool.  must match each pause() call 1:1 to resume.
718   void unpause();
719   /// wait for all work to complete
720   void drain();
721
722 };
723
724
725 #endif