Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / Finisher.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef CEPH_FINISHER_H
16 #define CEPH_FINISHER_H
17
18 #include "common/Mutex.h"
19 #include "common/Cond.h"
20 #include "common/perf_counters.h"
21
22 class CephContext;
23
24 /// Finisher queue length performance counter ID.
25 enum {
26   l_finisher_first = 997082,
27   l_finisher_queue_len,
28   l_finisher_complete_lat,
29   l_finisher_last
30 };
31
32 /** @brief Asynchronous cleanup class.
33  * Finisher asynchronously completes Contexts, which are simple classes
34  * representing callbacks, in a dedicated worker thread. Enqueuing
35  * contexts to complete is thread-safe.
36  */
37 class Finisher {
38   CephContext *cct;
39   Mutex        finisher_lock; ///< Protects access to queues and finisher_running.
40   Cond         finisher_cond; ///< Signaled when there is something to process.
41   Cond         finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
42   bool         finisher_stop; ///< Set when the finisher should stop.
43   bool         finisher_running; ///< True when the finisher is currently executing contexts.
44   bool         finisher_empty_wait; ///< True mean someone wait finisher empty.
45   /// Queue for contexts for which complete(0) will be called.
46   /// NULLs in this queue indicate that an item from finisher_queue_rval
47   /// should be completed in that place instead.
48   vector<Context*> finisher_queue;
49
50   string thread_name;
51
52   /// Queue for contexts for which the complete function will be called
53   /// with a parameter other than 0.
54   list<pair<Context*,int> > finisher_queue_rval;
55
56   /// Performance counter for the finisher's queue length.
57   /// Only active for named finishers.
58   PerfCounters *logger;
59   
60   void *finisher_thread_entry();
61
62   struct FinisherThread : public Thread {
63     Finisher *fin;    
64     explicit FinisherThread(Finisher *f) : fin(f) {}
65     void* entry() override { return (void*)fin->finisher_thread_entry(); }
66   } finisher_thread;
67
68  public:
69   /// Add a context to complete, optionally specifying a parameter for the complete function.
70   void queue(Context *c, int r = 0) {
71     finisher_lock.Lock();
72     if (finisher_queue.empty()) {
73       finisher_cond.Signal();
74     }
75     if (r) {
76       finisher_queue_rval.push_back(pair<Context*, int>(c, r));
77       finisher_queue.push_back(NULL);
78     } else
79       finisher_queue.push_back(c);
80     if (logger)
81       logger->inc(l_finisher_queue_len);
82     finisher_lock.Unlock();
83   }
84   void queue(vector<Context*>& ls) {
85     finisher_lock.Lock();
86     if (finisher_queue.empty()) {
87       finisher_cond.Signal();
88     }
89     finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
90     if (logger)
91       logger->inc(l_finisher_queue_len, ls.size());
92     finisher_lock.Unlock();
93     ls.clear();
94   }
95   void queue(deque<Context*>& ls) {
96     finisher_lock.Lock();
97     if (finisher_queue.empty()) {
98       finisher_cond.Signal();
99     }
100     finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
101     if (logger)
102       logger->inc(l_finisher_queue_len, ls.size());
103     finisher_lock.Unlock();
104     ls.clear();
105   }
106   void queue(list<Context*>& ls) {
107     finisher_lock.Lock();
108     if (finisher_queue.empty()) {
109       finisher_cond.Signal();
110     }
111     finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
112     if (logger)
113       logger->inc(l_finisher_queue_len, ls.size());
114     finisher_lock.Unlock();
115     ls.clear();
116   }
117
118   /// Start the worker thread.
119   void start();
120
121   /** @brief Stop the worker thread.
122    *
123    * Does not wait until all outstanding contexts are completed.
124    * To ensure that everything finishes, you should first shut down
125    * all sources that can add contexts to this finisher and call
126    * wait_for_empty() before calling stop(). */
127   void stop();
128
129   /** @brief Blocks until the finisher has nothing left to process.
130    * This function will also return when a concurrent call to stop()
131    * finishes, but this class should never be used in this way. */
132   void wait_for_empty();
133
134   /// Construct an anonymous Finisher.
135   /// Anonymous finishers do not log their queue length.
136   explicit Finisher(CephContext *cct_) :
137     cct(cct_), finisher_lock("Finisher::finisher_lock"),
138     finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
139     thread_name("fn_anonymous"), logger(0),
140     finisher_thread(this) {}
141
142   /// Construct a named Finisher that logs its queue length.
143   Finisher(CephContext *cct_, string name, string tn) :
144     cct(cct_), finisher_lock("Finisher::" + name),
145     finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
146     thread_name(tn), logger(0),
147     finisher_thread(this) {
148     PerfCountersBuilder b(cct, string("finisher-") + name,
149                           l_finisher_first, l_finisher_last);
150     b.add_u64(l_finisher_queue_len, "queue_len");
151     b.add_time_avg(l_finisher_complete_lat, "complete_latency");
152     logger = b.create_perf_counters();
153     cct->get_perfcounters_collection()->add(logger);
154     logger->set(l_finisher_queue_len, 0);
155     logger->set(l_finisher_complete_lat, 0);
156   }
157
158   ~Finisher() {
159     if (logger && cct) {
160       cct->get_perfcounters_collection()->remove(logger);
161       delete logger;
162     }
163   }
164 };
165
166 /// Context that is completed asynchronously on the supplied finisher.
167 class C_OnFinisher : public Context {
168   Context *con;
169   Finisher *fin;
170 public:
171   C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) {
172     assert(fin != NULL);
173     assert(con != NULL);
174   }
175
176   ~C_OnFinisher() override {
177     if (con != nullptr) {
178       delete con;
179       con = nullptr;
180     }
181   }
182
183   void finish(int r) override {
184     fin->queue(con, r);
185     con = nullptr;
186   }
187 };
188
189 #endif