X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fcommon%2FWorkQueue.h;fp=src%2Fceph%2Fsrc%2Fcommon%2FWorkQueue.h;h=d3eff47bdf844e61139a0de11df33d159c79a447;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git

diff --git a/src/ceph/src/common/WorkQueue.h b/src/ceph/src/common/WorkQueue.h
new file mode 100644
index 0000000..d3eff47
--- /dev/null
+++ b/src/ceph/src/common/WorkQueue.h
@@ -0,0 +1,725 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_WORKQUEUE_H
#define CEPH_WORKQUEUE_H

#include "Cond.h"
#include "include/unordered_map.h"
#include "common/HeartbeatMap.h"

#include <atomic>

class CephContext;

/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
  CephContext *cct;
  string name;
  string thread_name;
  string lockname;
  Mutex _lock;
  Cond _cond;
  bool _stop;
  int _pause;
  int _draining;
  Cond _wait_cond;
  int ioprio_class, ioprio_priority;

public:
  class TPHandle {
    friend class ThreadPool;
    CephContext *cct;
    heartbeat_handle_d *hb;
    time_t grace;
    time_t suicide_grace;
  public:
    TPHandle(
      CephContext *cct,
      heartbeat_handle_d *hb,
      time_t grace,
      time_t suicide_grace)
      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
    void reset_tp_timeout();
    void suspend_tp_timeout();
  };
private:

  /// Basic interface to a work queue used by the worker threads.
  struct WorkQueue_ {
    string name;
    time_t timeout_interval, suicide_interval;
    WorkQueue_(string n, time_t ti, time_t sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
    { }
    virtual ~WorkQueue_() {}
    /// Remove all work items from the queue.
    virtual void _clear() = 0;
    /// Check whether there is anything to do.
    virtual bool _empty() = 0;
    /// Get the next work item to process.
    virtual void *_void_dequeue() = 0;
    /** @brief Process the work item.
     * This function will be called several times in parallel
     * and must therefore be thread-safe. */
    virtual void _void_process(void *item, TPHandle &handle) = 0;
    /** @brief Synchronously finish processing a work item.
     * This function is called after _void_process with the global thread pool lock held,
     * so at most one copy will execute simultaneously for a given thread pool.
     * It can be used for non-thread-safe finalization. */
    virtual void _void_process_finish(void *) = 0;
  };

  // track thread pool size changes
  unsigned _num_threads;
  string _thread_num_option;
  const char **_conf_keys;

  const char **get_tracked_conf_keys() const override {
    return _conf_keys;
  }
  void handle_conf_change(const struct md_config_t *conf,
                          const std::set<std::string> &changed) override;

public:
  /** @brief Work queue that processes several submitted items at once.
   * The queue will automatically add itself to the thread pool on construction
   * and remove itself on destruction. */
  template<class T>
  class BatchWorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    virtual bool _enqueue(T *) = 0;
    virtual void _dequeue(T *) = 0;
    virtual void _dequeue(list<T*> *) = 0;
    virtual void _process_finish(const list<T*> &) {}

    // virtual methods from WorkQueue_ below
    void *_void_dequeue() override {
      list<T*> *out(new list<T*>);
      _dequeue(out);
      if (!out->empty()) {
        return (void *)out;
      } else {
        delete out;
        return 0;
      }
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(*((list<T*>*)p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(*(list<T*>*)p);
      delete (list<T*> *)p;
    }

  protected:
    virtual void _process(const list<T*> &items, TPHandle &handle) = 0;

  public:
    BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~BatchWorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.Lock();
      bool r = _enqueue(item);
      pool->_cond.SignalOne();
      pool->_lock.Unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.Lock();
      _dequeue(item);
      pool->_lock.Unlock();
    }
    void clear() {
      pool->_lock.Lock();
      _clear();
      pool->_lock.Unlock();
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    void wake() {
      pool->wake();
    }
    void _wake() {
      pool->_wake();
    }
    void drain() {
      pool->drain(this);
    }

  };

  /** @brief Templated by-value work queue.
   * Skeleton implementation of a queue that processes items submitted by value.
   * This is useful if the items are single primitive values or very small objects
   * (a few bytes). The queue will automatically add itself to the thread pool on
   * construction and remove itself on destruction. */
  template<typename T, typename U = T>
  class WorkQueueVal : public WorkQueue_ {
    Mutex _lock;
    ThreadPool *pool;
    list<U> to_process;
    list<U> to_finish;
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;
    bool _empty() override = 0;
    virtual U _dequeue() = 0;
    virtual void _process_finish(U) {}

    void *_void_dequeue() override {
      {
        Mutex::Locker l(_lock);
        if (_empty())
          return 0;
        U u = _dequeue();
        to_process.push_back(u);
      }
      return ((void*)1); // Not used
    }
    void _void_process(void *, TPHandle &handle) override {
      _lock.Lock();
      assert(!to_process.empty());
      U u = to_process.front();
      to_process.pop_front();
      _lock.Unlock();

      _process(u, handle);

      _lock.Lock();
      to_finish.push_back(u);
      _lock.Unlock();
    }

    void _void_process_finish(void *) override {
      _lock.Lock();
      assert(!to_finish.empty());
      U u = to_finish.front();
      to_finish.pop_front();
      _lock.Unlock();

      _process_finish(u);
    }

    void _clear() override {}

  public:
    WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
      : WorkQueue_(std::move(n), ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueueVal() override {
      pool->remove_work_queue(this);
    }
    void queue(T item) {
      Mutex::Locker l(pool->_lock);
      _enqueue(item);
      pool->_cond.SignalOne();
    }
    void queue_front(T item) {
      Mutex::Locker l(pool->_lock);
      _enqueue_front(item);
      pool->_cond.SignalOne();
    }
    void drain() {
      pool->drain(this);
    }
  protected:
    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    virtual void _process(U u, TPHandle &) = 0;
  };

  /** @brief Templated by-pointer work queue.
   * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
   * This is useful when the work items are large or include dynamically allocated memory. The queue
   * will automatically add itself to the thread pool on construction and remove itself on
   * destruction. */
  template<class T>
  class WorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    /// Add a work item to the queue.
    virtual bool _enqueue(T *) = 0;
    /// Dequeue a previously submitted work item.
    virtual void _dequeue(T *) = 0;
    /// Dequeue a work item and return the original submitted pointer.
    virtual T *_dequeue() = 0;
    virtual void _process_finish(T *) {}

    // implementation of virtual methods from WorkQueue_
    void *_void_dequeue() override {
      return (void *)_dequeue();
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(static_cast<T *>(p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(static_cast<T *>(p));
    }

  protected:
    /// Process a work item. Called from the worker threads.
    virtual void _process(T *t, TPHandle &) = 0;

  public:
    WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.Lock();
      bool r = _enqueue(item);
      pool->_cond.SignalOne();
      pool->_lock.Unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.Lock();
      _dequeue(item);
      pool->_lock.Unlock();
    }
    void clear() {
      pool->_lock.Lock();
      _clear();
      pool->_lock.Unlock();
    }

    Mutex &get_lock() {
      return pool->_lock;
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    /// wake up the thread pool (without lock held)
    void wake() {
      pool->wake();
    }
    /// wake up the thread pool (with lock already held)
    void _wake() {
      pool->_wake();
    }
    void _wait() {
      pool->_wait();
    }
    void drain() {
      pool->drain(this);
    }

  };

  template <typename T>
  class PointerWQ : public WorkQueue_ {
  public:
    ~PointerWQ() override {
      m_pool->remove_work_queue(this);
      assert(m_processing == 0);
    }
    void drain() {
      {
        // if this queue is empty and not processing, don't wait for other
        // queues to finish processing
        Mutex::Locker l(m_pool->_lock);
        if (m_processing == 0 && m_items.empty()) {
          return;
        }
      }
      m_pool->drain(this);
    }
    void queue(T *item) {
      Mutex::Locker l(m_pool->_lock);
      m_items.push_back(item);
      m_pool->_cond.SignalOne();
    }
    bool empty() {
      Mutex::Locker l(m_pool->_lock);
      return _empty();
    }
  protected:
    PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
    }
    void register_work_queue() {
      m_pool->add_work_queue(this);
    }
    void _clear() override {
      assert(m_pool->_lock.is_locked());
      m_items.clear();
    }
    bool _empty() override {
      assert(m_pool->_lock.is_locked());
      return m_items.empty();
    }
    void *_void_dequeue() override {
      assert(m_pool->_lock.is_locked());
      if (m_items.empty()) {
        return NULL;
      }

      ++m_processing;
      T *item = m_items.front();
      m_items.pop_front();
      return item;
    }
    void _void_process(void *item, ThreadPool::TPHandle &handle) override {
      process(reinterpret_cast<T *>(item));
    }
    void _void_process_finish(void *item) override {
      assert(m_pool->_lock.is_locked());
      assert(m_processing > 0);
      --m_processing;
    }

    virtual void process(T *item) = 0;
    void process_finish() {
      Mutex::Locker locker(m_pool->_lock);
      _void_process_finish(nullptr);
    }

    T *front() {
      assert(m_pool->_lock.is_locked());
      if (m_items.empty()) {
        return NULL;
      }
      return m_items.front();
    }
    void requeue(T *item) {
      Mutex::Locker pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_front(item);
    }
    void signal() {
      Mutex::Locker pool_locker(m_pool->_lock);
      m_pool->_cond.SignalOne();
    }
    Mutex &get_pool_lock() {
      return m_pool->_lock;
    }
  private:
    ThreadPool *m_pool;
    std::list<T *> m_items;
    uint32_t m_processing;
  };
private:
  vector<WorkQueue_*> work_queues;
  int next_work_queue = 0;


  // threads
  struct WorkThread : public Thread {
    ThreadPool *pool;
    // cppcheck-suppress noExplicitConstructor
    WorkThread(ThreadPool *p) : pool(p) {}
    void *entry() override {
      pool->worker(this);
      return 0;
    }
  };

  set<WorkThread*> _threads;
  list<WorkThread*> _old_threads;  ///< need to be joined
  int processing;

  void start_threads();
  void join_old_threads();
  void worker(WorkThread *wt);

public:
  ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option = NULL);
  ~ThreadPool() override;

  /// return number of threads currently running
  int get_num_threads() {
    Mutex::Locker l(_lock);
    return _num_threads;
  }

  /// assign a work queue to this thread pool
  void add_work_queue(WorkQueue_* wq) {
    Mutex::Locker l(_lock);
    work_queues.push_back(wq);
  }
  /// remove a work queue from this thread pool
  void remove_work_queue(WorkQueue_* wq) {
    Mutex::Locker l(_lock);
    unsigned i = 0;
    while (work_queues[i] != wq)
      i++;
    for (i++; i < work_queues.size(); i++)
      work_queues[i-1] = work_queues[i];
    assert(i == work_queues.size());
    work_queues.resize(i-1);
  }

  /// take thread pool lock
  void lock() {
    _lock.Lock();
  }
  /// release thread pool lock
  void unlock() {
    _lock.Unlock();
  }

  /// wait for a kick on this thread pool
  void wait(Cond &c) {
    c.Wait(_lock);
  }

  /// wake up a waiter (with lock already held)
  void _wake() {
    _cond.Signal();
  }
  /// wake up a waiter (without lock held)
  void wake() {
    Mutex::Locker l(_lock);
    _cond.Signal();
  }
  void _wait() {
    _cond.Wait(_lock);
  }

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop(bool clear_after=true);
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. Must match each pause() call 1:1 to resume.
  void unpause();
  /** @brief Wait until work completes.
   * If the parameter is NULL, blocks until all threads are idle.
   * If it is not NULL, blocks until the given work queue does not have
   * any items left to process. */
  void drain(WorkQueue_* wq = 0);

  /// set io priority
  void set_ioprio(int cls, int priority);
};
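
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the original WorkQueue.h: the minimum a
// ThreadPool::WorkQueue<T> subclass has to provide. The item type, class name
// and the backing std::list are hypothetical; real queues in Ceph follow the
// same pattern (implement _enqueue/_dequeue over a container of pointers and
// do the actual work in _process()).
struct ExampleItem {
  int id;
  explicit ExampleItem(int i) : id(i) {}
};

class ExampleWQ : public ThreadPool::WorkQueue<ExampleItem> {
  std::list<ExampleItem*> items;   // always accessed with the pool lock held

  bool _enqueue(ExampleItem *item) override {
    items.push_back(item);         // queue() calls this with pool->_lock held
    return true;
  }
  void _dequeue(ExampleItem *item) override {
    items.remove(item);            // cancel a previously queued item
  }
  ExampleItem *_dequeue() override {
    if (items.empty())
      return nullptr;              // nullptr tells the worker there is no work
    ExampleItem *item = items.front();
    items.pop_front();
    return item;
  }
  bool _empty() override {
    return items.empty();
  }
  void _clear() override {
    items.clear();
  }
  void _process(ExampleItem *item, ThreadPool::TPHandle &handle) override {
    // runs in a worker thread without the pool lock; long-running steps
    // should call handle.reset_tp_timeout() to keep the heartbeat happy
    delete item;
  }

public:
  explicit ExampleWQ(ThreadPool *tp)
    : ThreadPool::WorkQueue<ExampleItem>("ExampleWQ", 60, 0, tp) {}
};
// ---------------------------------------------------------------------------
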
class GenContextWQ :
  public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
  list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
  GenContextWQ(const string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::WorkQueueVal<
        GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}

  void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_back(c);
  }
  void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_front(c);
  }
  bool _empty() override {
    return _queue.empty();
  }
  GenContext<ThreadPool::TPHandle&> *_dequeue() override {
    assert(!_queue.empty());
    GenContext<ThreadPool::TPHandle&> *c = _queue.front();
    _queue.pop_front();
    return c;
  }
  void _process(GenContext<ThreadPool::TPHandle&> *c,
                ThreadPool::TPHandle &tp) override {
    c->complete(tp);
  }
};

class C_QueueInWQ : public Context {
  GenContextWQ *wq;
  GenContext<ThreadPool::TPHandle&> *c;
public:
  C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle&> *c)
    : wq(wq), c(c) {}
  void finish(int) override {
    wq->queue(c);
  }
};

/// Work queue that asynchronously completes contexts (executes callbacks).
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
  ContextWQ(const string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
      m_lock("ContextWQ::m_lock") {
    this->register_work_queue();
  }

  void queue(Context *ctx, int result = 0) {
    if (result != 0) {
      Mutex::Locker locker(m_lock);
      m_context_results[ctx] = result;
    }
    ThreadPool::PointerWQ<Context>::queue(ctx);
  }
protected:
  void _clear() override {
    ThreadPool::PointerWQ<Context>::_clear();

    Mutex::Locker locker(m_lock);
    m_context_results.clear();
  }

  void process(Context *ctx) override {
    int result = 0;
    {
      Mutex::Locker locker(m_lock);
      ceph::unordered_map<Context *, int>::iterator it =
        m_context_results.find(ctx);
      if (it != m_context_results.end()) {
        result = it->second;
        m_context_results.erase(it);
      }
    }
    ctx->complete(result);
  }
private:
  Mutex m_lock;
  ceph::unordered_map<Context*, int> m_context_results;
};
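
// ---------------------------------------------------------------------------
// Illustrative usage sketch, not part of the original WorkQueue.h: wiring a
// ThreadPool to a ContextWQ and completing a callback on a pool thread. The
// names C_ExampleDone and example_contextwq_usage are hypothetical.
class C_ExampleDone : public Context {
  void finish(int) override {
    // the int argument is the result that was passed to ContextWQ::queue()
  }
};

inline void example_contextwq_usage(CephContext *example_cct) {
  ThreadPool tp(example_cct, "example::tp", "tp_example", /*threads=*/4);
  ContextWQ wq("example::wq", /*timeout=*/60, &tp);

  tp.start();
  wq.queue(new C_ExampleDone, -1);  // completed asynchronously by a pool thread
  wq.drain();                       // block until the queue is idle again
  tp.stop();
}
// ---------------------------------------------------------------------------
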
class ShardedThreadPool {

  CephContext *cct;
  string name;
  string thread_name;
  string lockname;
  Mutex shardedpool_lock;
  Cond shardedpool_cond;
  Cond wait_cond;
  uint32_t num_threads;

  std::atomic<bool> stop_threads = { false };
  std::atomic<bool> pause_threads = { false };
  std::atomic<bool> drain_threads = { false };

  uint32_t num_paused;
  uint32_t num_drained;

public:

  class BaseShardedWQ {

  public:
    time_t timeout_interval, suicide_interval;
    BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
    virtual ~BaseShardedWQ() {}

    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0;
    virtual void return_waiting_threads() = 0;
    virtual bool is_shard_empty(uint32_t thread_index) = 0;
  };

  template <typename T>
  class ShardedWQ: public BaseShardedWQ {

    ShardedThreadPool* sharded_pool;

  protected:
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;


  public:
    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti),
                                                             sharded_pool(tp) {
      tp->set_wq(this);
    }
    ~ShardedWQ() override {}

    void queue(T item) {
      _enqueue(item);
    }
    void queue_front(T item) {
      _enqueue_front(item);
    }
    void drain() {
      sharded_pool->drain();
    }

  };

private:

  BaseShardedWQ* wq;
  // threads
  struct WorkThreadSharded : public Thread {
    ShardedThreadPool *pool;
    uint32_t thread_index;
    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
      thread_index(pthread_index) {}
    void *entry() override {
      pool->shardedthreadpool_worker(thread_index);
      return 0;
    }
  };

  vector<WorkThreadSharded*> threads_shardedpool;
  void start_threads();
  void shardedthreadpool_worker(uint32_t thread_index);
  void set_wq(BaseShardedWQ* swq) {
    wq = swq;
  }



public:

  ShardedThreadPool(CephContext *cct_, string nm, string tn, uint32_t pnum_threads);

  ~ShardedThreadPool(){};

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop();
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. Must match each pause() call 1:1 to resume.
  void unpause();
  /// wait for all work to complete
  void drain();

};


#endif
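
// ---------------------------------------------------------------------------
// Illustrative sketch appended after the header, not part of the original
// WorkQueue.h: the pure virtuals a ShardedThreadPool::ShardedWQ<T> subclass
// must implement. Shard layout and names are hypothetical; the real OSD
// ShardedOpWQ follows the same shape, with each worker thread mapped to a
// shard. A real _process() blocks on a per-shard condition variable instead
// of returning immediately, and return_waiting_threads() wakes those blocked
// shards when the pool pauses or drains.
class ExampleShardedWQ : public ShardedThreadPool::ShardedWQ<uint32_t> {
  struct Shard {
    Mutex lock;
    std::list<uint32_t> items;
    Shard() : lock("ExampleShardedWQ::Shard::lock") {}
  };
  std::vector<Shard*> shards;   // one shard per worker (cleanup omitted)

  void _enqueue(uint32_t item) override {
    Shard *s = shards[item % shards.size()];
    Mutex::Locker l(s->lock);
    s->items.push_back(item);
  }
  void _enqueue_front(uint32_t item) override {
    Shard *s = shards[item % shards.size()];
    Mutex::Locker l(s->lock);
    s->items.push_front(item);
  }
  void _process(uint32_t thread_index, heartbeat_handle_d *hb) override {
    Shard *s = shards[thread_index % shards.size()];
    Mutex::Locker l(s->lock);
    if (!s->items.empty()) {
      uint32_t item = s->items.front();
      s->items.pop_front();
      (void)item;  // real code would drop the shard lock and do the work here
    }
  }
  void return_waiting_threads() override {
    // nothing blocks in this simplified sketch
  }
  bool is_shard_empty(uint32_t thread_index) override {
    Shard *s = shards[thread_index % shards.size()];
    Mutex::Locker l(s->lock);
    return s->items.empty();
  }

public:
  ExampleShardedWQ(uint32_t num_shards, time_t ti, time_t sti,
                   ShardedThreadPool *tp)
    : ShardedThreadPool::ShardedWQ<uint32_t>(ti, sti, tp) {
    for (uint32_t i = 0; i < num_shards; ++i)
      shards.push_back(new Shard());
  }
};
// Typical wiring: construct a ShardedThreadPool, construct the queue with the
// pool pointer (which registers it via set_wq), then call pool->start().
// ---------------------------------------------------------------------------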