Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / Finisher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "Finisher.h"
5
6 #define dout_subsys ceph_subsys_finisher
7 #undef dout_prefix
8 #define dout_prefix *_dout << "finisher(" << this << ") "
9
10 void Finisher::start()
11 {
12   ldout(cct, 10) << __func__ << dendl;
13   finisher_thread.create(thread_name.c_str());
14 }
15
16 void Finisher::stop()
17 {
18   ldout(cct, 10) << __func__ << dendl;
19   finisher_lock.Lock();
20   finisher_stop = true;
21   // we don't have any new work to do, but we want the worker to wake up anyway
22   // to process the stop condition.
23   finisher_cond.Signal();
24   finisher_lock.Unlock();
25   finisher_thread.join(); // wait until the worker exits completely
26   ldout(cct, 10) << __func__ << " finish" << dendl;
27 }
28
29 void Finisher::wait_for_empty()
30 {
31   finisher_lock.Lock();
32   while (!finisher_queue.empty() || finisher_running) {
33     ldout(cct, 10) << "wait_for_empty waiting" << dendl;
34     finisher_empty_wait = true;
35     finisher_empty_cond.Wait(finisher_lock);
36   }
37   ldout(cct, 10) << "wait_for_empty empty" << dendl;
38   finisher_empty_wait = false;
39   finisher_lock.Unlock();
40 }
41
42 void *Finisher::finisher_thread_entry()
43 {
44   finisher_lock.Lock();
45   ldout(cct, 10) << "finisher_thread start" << dendl;
46
47   utime_t start;
48   uint64_t count = 0;
49   while (!finisher_stop) {
50     /// Every time we are woken up, we process the queue until it is empty.
51     while (!finisher_queue.empty()) {
52       // To reduce lock contention, we swap out the queue to process.
53       // This way other threads can submit new contexts to complete while we are working.
54       vector<Context*> ls;
55       list<pair<Context*,int> > ls_rval;
56       ls.swap(finisher_queue);
57       ls_rval.swap(finisher_queue_rval);
58       finisher_running = true;
59       finisher_lock.Unlock();
60       ldout(cct, 10) << "finisher_thread doing " << ls << dendl;
61
62       if (logger) {
63         start = ceph_clock_now();
64         count = ls.size();
65       }
66
67       // Now actually process the contexts.
68       for (vector<Context*>::iterator p = ls.begin();
69            p != ls.end();
70            ++p) {
71         if (*p) {
72           (*p)->complete(0);
73         } else {
74           // When an item is NULL in the finisher_queue, it means
75           // we should instead process an item from finisher_queue_rval,
76           // which has a parameter for complete() other than zero.
77           // This preserves the order while saving some storage.
78           assert(!ls_rval.empty());
79           Context *c = ls_rval.front().first;
80           c->complete(ls_rval.front().second);
81           ls_rval.pop_front();
82         }
83       }
84       ldout(cct, 10) << "finisher_thread done with " << ls << dendl;
85       ls.clear();
86       if (logger) {
87         logger->dec(l_finisher_queue_len, count);
88         logger->tinc(l_finisher_complete_lat, ceph_clock_now() - start);
89       }
90
91       finisher_lock.Lock();
92       finisher_running = false;
93     }
94     ldout(cct, 10) << "finisher_thread empty" << dendl;
95     if (unlikely(finisher_empty_wait))
96       finisher_empty_cond.Signal();
97     if (finisher_stop)
98       break;
99     
100     ldout(cct, 10) << "finisher_thread sleeping" << dendl;
101     finisher_cond.Wait(finisher_lock);
102   }
103   // If we are exiting, we signal the thread waiting in stop(),
104   // otherwise it would never unblock
105   finisher_empty_cond.Signal();
106
107   ldout(cct, 10) << "finisher_thread stop" << dendl;
108   finisher_stop = false;
109   finisher_lock.Unlock();
110   return 0;
111 }
112