1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "WorkQueue.h"
16 #include "include/compat.h"
17 #include "common/errno.h"
19 #define dout_subsys ceph_subsys_tp
21 #define dout_prefix *_dout << name << " "
24 ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
25 : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
26 lockname(name + "::lock"),
27 _lock(lockname.c_str()), // this should be safe due to declaration order
37 _thread_num_option = option;
39 _conf_keys = new const char*[2];
40 _conf_keys[0] = _thread_num_option.c_str();
43 _conf_keys = new const char*[1];
48 void ThreadPool::TPHandle::suspend_tp_timeout()
50 cct->get_heartbeat_map()->clear_timeout(hb);
53 void ThreadPool::TPHandle::reset_tp_timeout()
55 cct->get_heartbeat_map()->reset_timeout(
56 hb, grace, suicide_grace);
59 ThreadPool::~ThreadPool()
61 assert(_threads.empty());
65 void ThreadPool::handle_conf_change(const struct md_config_t *conf,
66 const std::set <std::string> &changed)
68 if (changed.count(_thread_num_option)) {
70 int r = conf->get_val(_thread_num_option.c_str(), &buf, -1);
84 void ThreadPool::worker(WorkThread *wt)
87 ldout(cct,10) << "worker start" << dendl;
90 ss << name << " thread " << (void *)pthread_self();
91 heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
95 // manage dynamic thread pool
97 if (_threads.size() > _num_threads) {
98 ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
100 _old_threads.push_back(wt);
104 if (!_pause && !work_queues.empty()) {
106 int tries = work_queues.size();
109 next_work_queue %= work_queues.size();
110 wq = work_queues[next_work_queue++];
112 void *item = wq->_void_dequeue();
115 ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
116 << " (" << processing << " active)" << dendl;
117 TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
118 tp_handle.reset_tp_timeout();
120 wq->_void_process(item, tp_handle);
122 wq->_void_process_finish(item);
124 ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
125 << " (" << processing << " active)" << dendl;
126 if (_pause || _draining)
136 ldout(cct,20) << "worker waiting" << dendl;
137 cct->get_heartbeat_map()->reset_timeout(
139 cct->_conf->threadpool_default_timeout,
141 _cond.WaitInterval(_lock,
143 cct->_conf->threadpool_empty_queue_max_wait, 0));
145 ldout(cct,1) << "worker finish" << dendl;
147 cct->get_heartbeat_map()->remove_worker(hb);
152 void ThreadPool::start_threads()
154 assert(_lock.is_locked());
155 while (_threads.size() < _num_threads) {
156 WorkThread *wt = new WorkThread(this);
157 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
160 int r = wt->set_ioprio(ioprio_class, ioprio_priority);
162 lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
164 wt->create(thread_name.c_str());
168 void ThreadPool::join_old_threads()
170 assert(_lock.is_locked());
171 while (!_old_threads.empty()) {
172 ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
173 _old_threads.front()->join();
174 delete _old_threads.front();
175 _old_threads.pop_front();
179 void ThreadPool::start()
181 ldout(cct,10) << "start" << dendl;
183 if (_thread_num_option.length()) {
184 ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl;
185 cct->_conf->add_observer(this);
191 ldout(cct,15) << "started" << dendl;
194 void ThreadPool::stop(bool clear_after)
196 ldout(cct,10) << "stop" << dendl;
198 if (_thread_num_option.length()) {
199 ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl;
200 cct->_conf->remove_observer(this);
208 for (set<WorkThread*>::iterator p = _threads.begin();
216 for (unsigned i=0; i<work_queues.size(); i++)
217 work_queues[i]->_clear();
220 ldout(cct,15) << "stopped" << dendl;
223 void ThreadPool::pause()
225 ldout(cct,10) << "pause" << dendl;
229 _wait_cond.Wait(_lock);
231 ldout(cct,15) << "paused" << dendl;
234 void ThreadPool::pause_new()
236 ldout(cct,10) << "pause_new" << dendl;
242 void ThreadPool::unpause()
244 ldout(cct,10) << "unpause" << dendl;
252 void ThreadPool::drain(WorkQueue_* wq)
254 ldout(cct,10) << "drain" << dendl;
257 while (processing || (wq != NULL && !wq->_empty()))
258 _wait_cond.Wait(_lock);
263 void ThreadPool::set_ioprio(int cls, int priority)
265 Mutex::Locker l(_lock);
267 ioprio_priority = priority;
268 for (set<WorkThread*>::iterator p = _threads.begin();
271 ldout(cct,10) << __func__
272 << " class " << cls << " priority " << priority
273 << " pid " << (*p)->get_pid()
275 int r = (*p)->set_ioprio(cls, priority);
277 lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
281 ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
282 uint32_t pnum_threads):
285 thread_name(std::move(tn)),
286 lockname(name + "::lock"),
287 shardedpool_lock(lockname.c_str()),
288 num_threads(pnum_threads),
293 void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
296 ldout(cct,10) << "worker start" << dendl;
298 std::stringstream ss;
299 ss << name << " thread " << (void *)pthread_self();
300 heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
302 while (!stop_threads) {
304 shardedpool_lock.Lock();
307 while (pause_threads) {
308 cct->get_heartbeat_map()->reset_timeout(
310 wq->timeout_interval, wq->suicide_interval);
311 shardedpool_cond.WaitInterval(shardedpool_lock,
313 cct->_conf->threadpool_empty_queue_max_wait, 0));
316 shardedpool_lock.Unlock();
319 shardedpool_lock.Lock();
320 if (wq->is_shard_empty(thread_index)) {
323 while (drain_threads) {
324 cct->get_heartbeat_map()->reset_timeout(
326 wq->timeout_interval, wq->suicide_interval);
327 shardedpool_cond.WaitInterval(shardedpool_lock,
329 cct->_conf->threadpool_empty_queue_max_wait, 0));
333 shardedpool_lock.Unlock();
336 cct->get_heartbeat_map()->reset_timeout(
338 wq->timeout_interval, wq->suicide_interval);
339 wq->_process(thread_index, hb);
343 ldout(cct,10) << "sharded worker finish" << dendl;
345 cct->get_heartbeat_map()->remove_worker(hb);
349 void ShardedThreadPool::start_threads()
351 assert(shardedpool_lock.is_locked());
352 int32_t thread_index = 0;
353 while (threads_shardedpool.size() < num_threads) {
355 WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
356 ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
357 threads_shardedpool.push_back(wt);
358 wt->create(thread_name.c_str());
363 void ShardedThreadPool::start()
365 ldout(cct,10) << "start" << dendl;
367 shardedpool_lock.Lock();
369 shardedpool_lock.Unlock();
370 ldout(cct,15) << "started" << dendl;
373 void ShardedThreadPool::stop()
375 ldout(cct,10) << "stop" << dendl;
378 wq->return_waiting_threads();
379 for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
380 p != threads_shardedpool.end();
385 threads_shardedpool.clear();
386 ldout(cct,15) << "stopped" << dendl;
389 void ShardedThreadPool::pause()
391 ldout(cct,10) << "pause" << dendl;
392 shardedpool_lock.Lock();
393 pause_threads = true;
395 wq->return_waiting_threads();
396 while (num_threads != num_paused){
397 wait_cond.Wait(shardedpool_lock);
399 shardedpool_lock.Unlock();
400 ldout(cct,10) << "paused" << dendl;
403 void ShardedThreadPool::pause_new()
405 ldout(cct,10) << "pause_new" << dendl;
406 shardedpool_lock.Lock();
407 pause_threads = true;
409 wq->return_waiting_threads();
410 shardedpool_lock.Unlock();
411 ldout(cct,10) << "paused_new" << dendl;
414 void ShardedThreadPool::unpause()
416 ldout(cct,10) << "unpause" << dendl;
417 shardedpool_lock.Lock();
418 pause_threads = false;
419 shardedpool_cond.Signal();
420 shardedpool_lock.Unlock();
421 ldout(cct,10) << "unpaused" << dendl;
424 void ShardedThreadPool::drain()
426 ldout(cct,10) << "drain" << dendl;
427 shardedpool_lock.Lock();
428 drain_threads = true;
430 wq->return_waiting_threads();
431 while (num_threads != num_drained) {
432 wait_cond.Wait(shardedpool_lock);
434 drain_threads = false;
435 shardedpool_cond.Signal();
436 shardedpool_lock.Unlock();
437 ldout(cct,10) << "drained" << dendl;