1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_WORKQUEUE_H
16 #define CEPH_WORKQUEUE_H
19 #include "include/unordered_map.h"
20 #include "common/HeartbeatMap.h"
26 /// Pool of threads that share work submitted to multiple work queues.
27 class ThreadPool : public md_config_obs_t {
// I/O scheduling class and priority applied to worker threads (see set_ioprio()).
38 int ioprio_class, ioprio_priority;
42 friend class ThreadPool;
// Handle registered with the HeartbeatMap include above; presumably used to
// watchdog a worker thread stuck inside a work item -- confirm in full file.
44 heartbeat_handle_d *hb;
// NOTE(review): constructor fragment -- several parameter lines are elided
// here. grace/suicide_grace look like warn/abort timeouts by name; the
// member declarations they initialize are not visible in this view.
50 heartbeat_handle_d *hb,
53 : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
// Reset the heartbeat timeout for the calling worker (defined out of line).
54 void reset_tp_timeout();
// Suspend the heartbeat timeout (defined out of line); presumably paired
// with a resume call that is elided from this view.
55 void suspend_tp_timeout();
59 /// Basic interface to a work queue used by the worker threads.
// Per-queue timeouts (seconds); by name these feed the heartbeat grace /
// suicide grace for workers processing this queue -- the consuming code
// (worker loop) is elided from this view, so confirm there.
62 time_t timeout_interval, suicide_interval;
63 WorkQueue_(string n, time_t ti, time_t sti)
64 : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
66 virtual ~WorkQueue_() {}
67 /// Remove all work items from the queue.
68 virtual void _clear() = 0;
69 /// Check whether there is anything to do.
70 virtual bool _empty() = 0;
71 /// Get the next work item to process.
72 virtual void *_void_dequeue() = 0;
73 /** @brief Process the work item.
74 * This function will be called several times in parallel
75 * and must therefore be thread-safe. */
76 virtual void _void_process(void *item, TPHandle &handle) = 0;
77 /** @brief Synchronously finish processing a work item.
78 * This function is called after _void_process with the global thread pool lock held,
79 * so at most one copy will execute simultaneously for a given thread pool.
80 * It can be used for non-thread-safe finalization. */
81 virtual void _void_process_finish(void *) = 0;
84 // track thread pool size changes
// md_config_obs_t plumbing: the pool watches the config option named by
// _thread_num_option so the thread count can be adjusted at runtime.
85 unsigned _num_threads;
86 string _thread_num_option;
// Null-terminated key list returned by get_tracked_conf_keys(); contents
// are set up in code elided from this view.
87 const char **_conf_keys;
89 const char **get_tracked_conf_keys() const override {
92 void handle_conf_change(const struct md_config_t *conf,
93 const std::set <std::string> &changed) override;
96 /** @brief Work queue that processes several submitted items at once.
97 * The queue will automatically add itself to the thread pool on construction
98 * and remove itself on destruction. */
100 class BatchWorkQueue : public WorkQueue_ {
103 virtual bool _enqueue(T *) = 0;
104 virtual void _dequeue(T *) = 0;
105 virtual void _dequeue(list<T*> *) = 0;
106 virtual void _process_finish(const list<T*> &) {}
108 // virtual methods from WorkQueue_ below
// Batch dequeue: heap-allocates a list, fills it via _dequeue(list*)
// (the fill/return lines are elided here), and hands ownership through
// the void* -- freed below in _void_process_finish.
109 void *_void_dequeue() override {
110 list<T*> *out(new list<T*>);
119 void _void_process(void *p, TPHandle &handle) override {
120 _process(*((list<T*>*)p), handle);
// Runs under the pool lock (per WorkQueue_ contract); releases the batch
// list allocated in _void_dequeue.
122 void _void_process_finish(void *p) override {
123 _process_finish(*(list<T*>*)p);
124 delete (list<T*> *)p;
128 virtual void _process(const list<T*> &items, TPHandle &handle) = 0;
131 BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
132 : WorkQueue_(std::move(n), ti, sti), pool(p) {
133 pool->add_work_queue(this);
135 ~BatchWorkQueue() override {
136 pool->remove_work_queue(this);
// NOTE(review): the matching pool->_lock.Lock() lines for these methods
// are elided from this view; each signals/releases under the pool lock.
139 bool queue(T *item) {
141 bool r = _enqueue(item);
142 pool->_cond.SignalOne();
143 pool->_lock.Unlock();
146 void dequeue(T *item) {
149 pool->_lock.Unlock();
154 pool->_lock.Unlock();
175 /** @brief Templated by-value work queue.
176 * Skeleton implementation of a queue that processes items submitted by value.
177 * This is useful if the items are single primitive values or very small objects
178 * (a few bytes). The queue will automatically add itself to the thread pool on
179 * construction and remove itself on destruction. */
180 template<typename T, typename U = T>
181 class WorkQueueVal : public WorkQueue_ {
186 virtual void _enqueue(T) = 0;
187 virtual void _enqueue_front(T) = 0;
188 bool _empty() override = 0;
189 virtual U _dequeue() = 0;
190 virtual void _process_finish(U) {}
// Items are shuttled through the internal to_process/to_finish lists
// (guarded by the member _lock), so the void* returned here is only a
// non-null "work available" sentinel -- never dereferenced.
192 void *_void_dequeue() override {
194 Mutex::Locker l(_lock);
198 to_process.push_back(u);
200 return ((void*)1); // Not used
202 void _void_process(void *, TPHandle &handle) override {
204 assert(!to_process.empty());
205 U u = to_process.front();
206 to_process.pop_front();
// After processing, the item is parked on to_finish for the serialized
// finish step below (lock re-acquisition lines elided in this view).
212 to_finish.push_back(u);
216 void _void_process_finish(void *) override {
218 assert(!to_finish.empty());
219 U u = to_finish.front();
220 to_finish.pop_front();
226 void _clear() override {}
229 WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
230 : WorkQueue_(std::move(n), ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
231 pool->add_work_queue(this);
233 ~WorkQueueVal() override {
234 pool->remove_work_queue(this);
// queue() fragment: enqueue under the pool lock, wake one worker.
237 Mutex::Locker l(pool->_lock);
239 pool->_cond.SignalOne();
241 void queue_front(T item) {
242 Mutex::Locker l(pool->_lock);
243 _enqueue_front(item);
244 pool->_cond.SignalOne();
259 /** @brief Template by-pointer work queue.
260 * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
261 * This is useful when the work item are large or include dynamically allocated memory. The queue
262 * will automatically add itself to the thread pool on construction and remove itself on
265 class WorkQueue : public WorkQueue_ {
268 /// Add a work item to the queue.
269 virtual bool _enqueue(T *) = 0;
270 /// Dequeue a previously submitted work item.
271 virtual void _dequeue(T *) = 0;
272 /// Dequeue a work item and return the original submitted pointer.
273 virtual T *_dequeue() = 0;
274 virtual void _process_finish(T *) {}
276 // implementation of virtual methods from WorkQueue_
// Unlike WorkQueueVal, the submitted pointer itself travels through the
// void* channel -- no side lists needed.
277 void *_void_dequeue() override {
278 return (void *)_dequeue();
280 void _void_process(void *p, TPHandle &handle) override {
281 _process(static_cast<T *>(p), handle);
283 void _void_process_finish(void *p) override {
284 _process_finish(static_cast<T *>(p));
288 /// Process a work item. Called from the worker threads.
289 virtual void _process(T *t, TPHandle &) = 0;
292 WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
293 : WorkQueue_(std::move(n), ti, sti), pool(p) {
294 pool->add_work_queue(this);
296 ~WorkQueue() override {
297 pool->remove_work_queue(this);
// NOTE(review): matching pool->_lock.Lock() lines are elided from this
// view; queue() signals one worker and releases the pool lock.
300 bool queue(T *item) {
302 bool r = _enqueue(item);
303 pool->_cond.SignalOne();
304 pool->_lock.Unlock();
307 void dequeue(T *item) {
310 pool->_lock.Unlock();
315 pool->_lock.Unlock();
328 /// wake up the thread pool (without lock held)
332 /// wake up the thread pool (with lock already held)
// By-pointer queue with an internal item list. Unlike WorkQueue /
// BatchWorkQueue, the constructor does NOT self-register with the pool;
// users must call register_work_queue() explicitly (see ContextWQ).
346 class PointerWQ : public WorkQueue_ {
348 ~PointerWQ() override {
349 m_pool->remove_work_queue(this);
// No item may still be in flight when the queue is destroyed.
350 assert(m_processing == 0);
354 // if this queue is empty and not processing, don't wait for other
355 // queues to finish processing
356 Mutex::Locker l(m_pool->_lock);
357 if (m_processing == 0 && m_items.empty()) {
363 void queue(T *item) {
364 Mutex::Locker l(m_pool->_lock);
365 m_items.push_back(item);
366 m_pool->_cond.SignalOne();
369 Mutex::Locker l(m_pool->_lock);
373 PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
374 : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
376 void register_work_queue() {
377 m_pool->add_work_queue(this);
379 void _clear() override {
380 assert(m_pool->_lock.is_locked());
383 bool _empty() override {
384 assert(m_pool->_lock.is_locked());
385 return m_items.empty();
// Pops the front item under the pool lock; the m_processing bookkeeping
// lines are elided from this view.
387 void *_void_dequeue() override {
388 assert(m_pool->_lock.is_locked());
389 if (m_items.empty()) {
394 T *item = m_items.front();
398 void _void_process(void *item, ThreadPool::TPHandle &handle) override {
399 process(reinterpret_cast<T *>(item));
401 void _void_process_finish(void *item) override {
402 assert(m_pool->_lock.is_locked());
403 assert(m_processing > 0);
407 virtual void process(T *item) = 0;
// Manual finish hook for subclasses that complete items asynchronously;
// takes the pool lock itself (contrast _void_process_finish above).
408 void process_finish() {
409 Mutex::Locker locker(m_pool->_lock);
410 _void_process_finish(nullptr);
414 assert(m_pool->_lock.is_locked());
415 if (m_items.empty()) {
418 return m_items.front();
// Put an item back at the head of the queue, un-counting it from the
// in-flight total first.
420 void requeue(T *item) {
421 Mutex::Locker pool_locker(m_pool->_lock);
422 _void_process_finish(nullptr);
423 m_items.push_front(item);
426 Mutex::Locker pool_locker(m_pool->_lock);
427 m_pool->_cond.SignalOne();
429 Mutex &get_pool_lock() {
430 return m_pool->_lock;
434 std::list<T *> m_items;
// Count of items handed to workers but not yet finished.
435 uint32_t m_processing;
// Registered queues; next_work_queue is presumably a rotating index used
// by worker() to pick the next queue -- worker() body not visible here.
438 vector<WorkQueue_*> work_queues;
439 int next_work_queue = 0;
// One OS thread of the pool; entry() runs the shared worker loop.
443 struct WorkThread : public Thread {
445 // cppcheck-suppress noExplicitConstructor
446 WorkThread(ThreadPool *p) : pool(p) {}
447 void *entry() override {
453 set<WorkThread*> _threads;
454 list<WorkThread*> _old_threads; ///< need to be joined
457 void start_threads();
458 void join_old_threads();
459 void worker(WorkThread *wt);
462 ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option = NULL);
463 ~ThreadPool() override;
465 /// return number of threads currently running
466 int get_num_threads() {
467 Mutex::Locker l(_lock);
471 /// assign a work queue to this thread pool
472 void add_work_queue(WorkQueue_* wq) {
473 Mutex::Locker l(_lock);
474 work_queues.push_back(wq);
476 /// remove a work queue from this thread pool
// NOTE(review): the scan below has no bounds check -- passing a wq that
// was never added would walk past the end of the vector (UB). Callers
// must only remove previously-added queues. The index declaration and
// increment lines are elided from this view.
477 void remove_work_queue(WorkQueue_* wq) {
478 Mutex::Locker l(_lock);
480 while (work_queues[i] != wq)
482 for (i++; i < work_queues.size(); i++)
483 work_queues[i-1] = work_queues[i];
484 assert(i == work_queues.size());
485 work_queues.resize(i-1);
488 /// take thread pool lock
492 /// release thread pool lock
497 /// wait for a kick on this thread pool
502 /// wake up a waiter (with lock already held)
506 /// wake up a waiter (without lock held)
508 Mutex::Locker l(_lock);
515 /// start thread pool thread
517 /// stop thread pool thread
518 void stop(bool clear_after=true);
519 /// pause thread pool (if it not already paused)
521 /// pause initiation of new work
523 /// resume work in thread pool. must match each pause() call 1:1 to resume.
525 /** @brief Wait until work completes.
526 * If the parameter is NULL, blocks until all threads are idle.
527 * If it is not NULL, blocks until the given work queue does not have
528 * any items left to process. */
529 void drain(WorkQueue_* wq = 0);
// Set the ioprio class/priority applied to the pool's worker threads.
532 void set_ioprio(int cls, int priority);
// By-value queue of GenContext callbacks; the class header line is elided
// from this view.
536 public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
537 list<GenContext<ThreadPool::TPHandle&>*> _queue;
// Suicide interval is fixed at 10x the warn timeout (ti*10).
539 GenContextWQ(const string &name, time_t ti, ThreadPool *tp)
540 : ThreadPool::WorkQueueVal<
541 GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
543 void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
546 void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
547 _queue.push_front(c);
549 bool _empty() override {
550 return _queue.empty();
552 GenContext<ThreadPool::TPHandle&> *_dequeue() override {
553 assert(!_queue.empty());
554 GenContext<ThreadPool::TPHandle&> *c = _queue.front();
// _process body is elided; presumably completes the context with the
// TPHandle -- confirm in full file.
558 void _process(GenContext<ThreadPool::TPHandle&> *c,
559 ThreadPool::TPHandle &tp) override {
// Adapter Context: when completed, re-queues the wrapped GenContext onto a
// GenContextWQ (finish() body elided from this view).
564 class C_QueueInWQ : public Context {
566 GenContext<ThreadPool::TPHandle&> *c;
568 C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle &> *c)
570 void finish(int) override {
575 /// Work queue that asynchronously completes contexts (executes callbacks).
577 class ContextWQ : public ThreadPool::PointerWQ<Context> {
// sti is passed as 0 -- presumably disables the suicide timeout for this
// queue; confirm against ThreadPool's heartbeat handling.
579 ContextWQ(const string &name, time_t ti, ThreadPool *tp)
580 : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
581 m_lock("ContextWQ::m_lock") {
// PointerWQ does not self-register; do it explicitly.
582 this->register_work_queue();
// Queue a context for async completion. The result code travels in the
// side map m_context_results keyed by the context pointer; it is looked
// up and erased again in process() below.
585 void queue(Context *ctx, int result = 0) {
587 Mutex::Locker locker(m_lock);
588 m_context_results[ctx] = result;
590 ThreadPool::PointerWQ<Context>::queue(ctx);
593 void _clear() override {
594 ThreadPool::PointerWQ<Context>::_clear();
596 Mutex::Locker locker(m_lock);
597 m_context_results.clear();
600 void process(Context *ctx) override {
603 Mutex::Locker locker(m_lock);
604 ceph::unordered_map<Context *, int>::iterator it =
605 m_context_results.find(ctx);
606 if (it != m_context_results.end()) {
608 m_context_results.erase(it);
611 ctx->complete(result);
615 ceph::unordered_map<Context*, int> m_context_results;
// Thread pool variant where each worker thread is bound to a shard index
// of a single sharded work queue, avoiding a shared per-item lock.
618 class ShardedThreadPool {
624 Mutex shardedpool_lock;
625 Cond shardedpool_cond;
627 uint32_t num_threads;
// Control flags polled by the worker threads (atomics, so they can be
// flipped without holding shardedpool_lock).
629 std::atomic<bool> stop_threads = { false };
630 std::atomic<bool> pause_threads = { false };
631 std::atomic<bool> drain_threads = { false };
634 uint32_t num_drained;
// Type-erased base so the pool can drive any ShardedWQ<T>.
638 class BaseShardedWQ {
641 time_t timeout_interval, suicide_interval;
642 BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
643 virtual ~BaseShardedWQ() {}
// Process items belonging to the given shard/thread index.
645 virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0;
646 virtual void return_waiting_threads() = 0;
647 virtual bool is_shard_empty(uint32_t thread_index) = 0;
650 template <typename T>
651 class ShardedWQ: public BaseShardedWQ {
653 ShardedThreadPool* sharded_pool;
656 virtual void _enqueue(T) = 0;
657 virtual void _enqueue_front(T) = 0;
661 ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti),
665 ~ShardedWQ() override {}
670 void queue_front(T item) {
671 _enqueue_front(item);
674 sharded_pool->drain();
// One worker thread pinned to a fixed shard index.
683 struct WorkThreadSharded : public Thread {
684 ShardedThreadPool *pool;
685 uint32_t thread_index;
686 WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
687 thread_index(pthread_index) {}
688 void *entry() override {
689 pool->shardedthreadpool_worker(thread_index);
694 vector<WorkThreadSharded*> threads_shardedpool;
695 void start_threads();
696 void shardedthreadpool_worker(uint32_t thread_index);
697 void set_wq(BaseShardedWQ* swq) {
705 ShardedThreadPool(CephContext *cct_, string nm, string tn, uint32_t pnum_threads);
707 ~ShardedThreadPool(){};
709 /// start thread pool thread
711 /// stop thread pool thread
713 /// pause thread pool (if it not already paused)
715 /// pause initiation of new work
717 /// resume work in thread pool. must match each pause() call 1:1 to resume.
719 /// wait for all work to complete