Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / WorkQueue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "WorkQueue.h"
16 #include "include/compat.h"
17 #include "common/errno.h"
18
19 #define dout_subsys ceph_subsys_tp
20 #undef dout_prefix
21 #define dout_prefix *_dout << name << " "
22
23
24 ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)
25   : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),
26     lockname(name + "::lock"),
27     _lock(lockname.c_str()),  // this should be safe due to declaration order
28     _stop(false),
29     _pause(0),
30     _draining(0),
31     ioprio_class(-1),
32     ioprio_priority(-1),
33     _num_threads(n),
34     processing(0)
35 {
36   if (option) {
37     _thread_num_option = option;
38     // set up conf_keys
39     _conf_keys = new const char*[2];
40     _conf_keys[0] = _thread_num_option.c_str();
41     _conf_keys[1] = NULL;
42   } else {
43     _conf_keys = new const char*[1];
44     _conf_keys[0] = NULL;
45   }
46 }
47
48 void ThreadPool::TPHandle::suspend_tp_timeout()
49 {
50   cct->get_heartbeat_map()->clear_timeout(hb);
51 }
52
53 void ThreadPool::TPHandle::reset_tp_timeout()
54 {
55   cct->get_heartbeat_map()->reset_timeout(
56     hb, grace, suicide_grace);
57 }
58
59 ThreadPool::~ThreadPool()
60 {
61   assert(_threads.empty());
62   delete[] _conf_keys;
63 }
64
65 void ThreadPool::handle_conf_change(const struct md_config_t *conf,
66                                     const std::set <std::string> &changed)
67 {
68   if (changed.count(_thread_num_option)) {
69     char *buf;
70     int r = conf->get_val(_thread_num_option.c_str(), &buf, -1);
71     assert(r >= 0);
72     int v = atoi(buf);
73     free(buf);
74     if (v >= 0) {
75       _lock.Lock();
76       _num_threads = v;
77       start_threads();
78       _cond.SignalAll();
79       _lock.Unlock();
80     }
81   }
82 }
83
84 void ThreadPool::worker(WorkThread *wt)
85 {
86   _lock.Lock();
87   ldout(cct,10) << "worker start" << dendl;
88   
89   std::stringstream ss;
90   ss << name << " thread " << (void *)pthread_self();
91   heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
92
93   while (!_stop) {
94
95     // manage dynamic thread pool
96     join_old_threads();
97     if (_threads.size() > _num_threads) {
98       ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;
99       _threads.erase(wt);
100       _old_threads.push_back(wt);
101       break;
102     }
103
104     if (!_pause && !work_queues.empty()) {
105       WorkQueue_* wq;
106       int tries = work_queues.size();
107       bool did = false;
108       while (tries--) {
109         next_work_queue %= work_queues.size();
110         wq = work_queues[next_work_queue++];
111         
112         void *item = wq->_void_dequeue();
113         if (item) {
114           processing++;
115           ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
116                         << " (" << processing << " active)" << dendl;
117           TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
118           tp_handle.reset_tp_timeout();
119           _lock.Unlock();
120           wq->_void_process(item, tp_handle);
121           _lock.Lock();
122           wq->_void_process_finish(item);
123           processing--;
124           ldout(cct,15) << "worker wq " << wq->name << " done processing " << item
125                         << " (" << processing << " active)" << dendl;
126           if (_pause || _draining)
127             _wait_cond.Signal();
128           did = true;
129           break;
130         }
131       }
132       if (did)
133         continue;
134     }
135
136     ldout(cct,20) << "worker waiting" << dendl;
137     cct->get_heartbeat_map()->reset_timeout(
138       hb,
139       cct->_conf->threadpool_default_timeout,
140       0);
141     _cond.WaitInterval(_lock,
142       utime_t(
143         cct->_conf->threadpool_empty_queue_max_wait, 0));
144   }
145   ldout(cct,1) << "worker finish" << dendl;
146
147   cct->get_heartbeat_map()->remove_worker(hb);
148
149   _lock.Unlock();
150 }
151
152 void ThreadPool::start_threads()
153 {
154   assert(_lock.is_locked());
155   while (_threads.size() < _num_threads) {
156     WorkThread *wt = new WorkThread(this);
157     ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
158     _threads.insert(wt);
159
160     int r = wt->set_ioprio(ioprio_class, ioprio_priority);
161     if (r < 0)
162       lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
163
164     wt->create(thread_name.c_str());
165   }
166 }
167
168 void ThreadPool::join_old_threads()
169 {
170   assert(_lock.is_locked());
171   while (!_old_threads.empty()) {
172     ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;
173     _old_threads.front()->join();
174     delete _old_threads.front();
175     _old_threads.pop_front();
176   }
177 }
178
179 void ThreadPool::start()
180 {
181   ldout(cct,10) << "start" << dendl;
182
183   if (_thread_num_option.length()) {
184     ldout(cct, 10) << " registering config observer on " << _thread_num_option << dendl;
185     cct->_conf->add_observer(this);
186   }
187
188   _lock.Lock();
189   start_threads();
190   _lock.Unlock();
191   ldout(cct,15) << "started" << dendl;
192 }
193
194 void ThreadPool::stop(bool clear_after)
195 {
196   ldout(cct,10) << "stop" << dendl;
197
198   if (_thread_num_option.length()) {
199     ldout(cct, 10) << " unregistering config observer on " << _thread_num_option << dendl;
200     cct->_conf->remove_observer(this);
201   }
202
203   _lock.Lock();
204   _stop = true;
205   _cond.Signal();
206   join_old_threads();
207   _lock.Unlock();
208   for (set<WorkThread*>::iterator p = _threads.begin();
209        p != _threads.end();
210        ++p) {
211     (*p)->join();
212     delete *p;
213   }
214   _threads.clear();
215   _lock.Lock();
216   for (unsigned i=0; i<work_queues.size(); i++)
217     work_queues[i]->_clear();
218   _stop = false;
219   _lock.Unlock();    
220   ldout(cct,15) << "stopped" << dendl;
221 }
222
223 void ThreadPool::pause()
224 {
225   ldout(cct,10) << "pause" << dendl;
226   _lock.Lock();
227   _pause++;
228   while (processing)
229     _wait_cond.Wait(_lock);
230   _lock.Unlock();
231   ldout(cct,15) << "paused" << dendl;
232 }
233
234 void ThreadPool::pause_new()
235 {
236   ldout(cct,10) << "pause_new" << dendl;
237   _lock.Lock();
238   _pause++;
239   _lock.Unlock();
240 }
241
242 void ThreadPool::unpause()
243 {
244   ldout(cct,10) << "unpause" << dendl;
245   _lock.Lock();
246   assert(_pause > 0);
247   _pause--;
248   _cond.Signal();
249   _lock.Unlock();
250 }
251
252 void ThreadPool::drain(WorkQueue_* wq)
253 {
254   ldout(cct,10) << "drain" << dendl;
255   _lock.Lock();
256   _draining++;
257   while (processing || (wq != NULL && !wq->_empty()))
258     _wait_cond.Wait(_lock);
259   _draining--;
260   _lock.Unlock();
261 }
262
263 void ThreadPool::set_ioprio(int cls, int priority)
264 {
265   Mutex::Locker l(_lock);
266   ioprio_class = cls;
267   ioprio_priority = priority;
268   for (set<WorkThread*>::iterator p = _threads.begin();
269        p != _threads.end();
270        ++p) {
271     ldout(cct,10) << __func__ 
272                   << " class " << cls << " priority " << priority
273                   << " pid " << (*p)->get_pid()
274                   << dendl;
275     int r = (*p)->set_ioprio(cls, priority);
276     if (r < 0)
277       lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
278   }
279 }
280
281 ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm, string tn,
282   uint32_t pnum_threads):
283   cct(pcct_),
284   name(std::move(nm)),
285   thread_name(std::move(tn)),
286   lockname(name + "::lock"),
287   shardedpool_lock(lockname.c_str()),
288   num_threads(pnum_threads),
289   num_paused(0),
290   num_drained(0),
291   wq(NULL) {}
292
293 void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
294 {
295   assert(wq != NULL);
296   ldout(cct,10) << "worker start" << dendl;
297
298   std::stringstream ss;
299   ss << name << " thread " << (void *)pthread_self();
300   heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());
301
302   while (!stop_threads) {
303     if (pause_threads) {
304       shardedpool_lock.Lock();
305       ++num_paused;
306       wait_cond.Signal();
307       while (pause_threads) {
308        cct->get_heartbeat_map()->reset_timeout(
309                 hb,
310                 wq->timeout_interval, wq->suicide_interval);
311        shardedpool_cond.WaitInterval(shardedpool_lock,
312            utime_t(
313            cct->_conf->threadpool_empty_queue_max_wait, 0));
314       }
315       --num_paused;
316       shardedpool_lock.Unlock();
317     }
318     if (drain_threads) {
319       shardedpool_lock.Lock();
320       if (wq->is_shard_empty(thread_index)) {
321         ++num_drained;
322         wait_cond.Signal();
323         while (drain_threads) {
324           cct->get_heartbeat_map()->reset_timeout(
325             hb,
326             wq->timeout_interval, wq->suicide_interval);
327           shardedpool_cond.WaitInterval(shardedpool_lock,
328             utime_t(
329               cct->_conf->threadpool_empty_queue_max_wait, 0));
330         }
331         --num_drained;
332       }
333       shardedpool_lock.Unlock();
334     }
335
336     cct->get_heartbeat_map()->reset_timeout(
337       hb,
338       wq->timeout_interval, wq->suicide_interval);
339     wq->_process(thread_index, hb);
340
341   }
342
343   ldout(cct,10) << "sharded worker finish" << dendl;
344
345   cct->get_heartbeat_map()->remove_worker(hb);
346
347 }
348
349 void ShardedThreadPool::start_threads()
350 {
351   assert(shardedpool_lock.is_locked());
352   int32_t thread_index = 0;
353   while (threads_shardedpool.size() < num_threads) {
354
355     WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
356     ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
357     threads_shardedpool.push_back(wt);
358     wt->create(thread_name.c_str());
359     thread_index++;
360   }
361 }
362
363 void ShardedThreadPool::start()
364 {
365   ldout(cct,10) << "start" << dendl;
366
367   shardedpool_lock.Lock();
368   start_threads();
369   shardedpool_lock.Unlock();
370   ldout(cct,15) << "started" << dendl;
371 }
372
373 void ShardedThreadPool::stop()
374 {
375   ldout(cct,10) << "stop" << dendl;
376   stop_threads = true;
377   assert(wq != NULL);
378   wq->return_waiting_threads();
379   for (vector<WorkThreadSharded*>::iterator p = threads_shardedpool.begin();
380        p != threads_shardedpool.end();
381        ++p) {
382     (*p)->join();
383     delete *p;
384   }
385   threads_shardedpool.clear();
386   ldout(cct,15) << "stopped" << dendl;
387 }
388
389 void ShardedThreadPool::pause()
390 {
391   ldout(cct,10) << "pause" << dendl;
392   shardedpool_lock.Lock();
393   pause_threads = true;
394   assert(wq != NULL);
395   wq->return_waiting_threads();
396   while (num_threads != num_paused){
397     wait_cond.Wait(shardedpool_lock);
398   }
399   shardedpool_lock.Unlock();
400   ldout(cct,10) << "paused" << dendl; 
401 }
402
403 void ShardedThreadPool::pause_new()
404 {
405   ldout(cct,10) << "pause_new" << dendl;
406   shardedpool_lock.Lock();
407   pause_threads = true;
408   assert(wq != NULL);
409   wq->return_waiting_threads();
410   shardedpool_lock.Unlock();
411   ldout(cct,10) << "paused_new" << dendl;
412 }
413
414 void ShardedThreadPool::unpause()
415 {
416   ldout(cct,10) << "unpause" << dendl;
417   shardedpool_lock.Lock();
418   pause_threads = false;
419   shardedpool_cond.Signal();
420   shardedpool_lock.Unlock();
421   ldout(cct,10) << "unpaused" << dendl;
422 }
423
424 void ShardedThreadPool::drain()
425 {
426   ldout(cct,10) << "drain" << dendl;
427   shardedpool_lock.Lock();
428   drain_threads = true;
429   assert(wq != NULL);
430   wq->return_waiting_threads();
431   while (num_threads != num_drained) {
432     wait_cond.Wait(shardedpool_lock);
433   }
434   drain_threads = false;
435   shardedpool_cond.Signal();
436   shardedpool_lock.Unlock();
437   ldout(cct,10) << "drained" << dendl;
438 }
439