Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_coroutine.h
1 #ifndef CEPH_RGW_COROUTINE_H
2 #define CEPH_RGW_COROUTINE_H
3
4 #ifdef _ASSERT_H
5 #define NEED_ASSERT_H
6 #pragma push_macro("_ASSERT_H")
7 #endif
8
9 #include <boost/asio.hpp>
10 #include <boost/intrusive_ptr.hpp>
11
12 #ifdef NEED_ASSERT_H
13 #pragma pop_macro("_ASSERT_H")
14 #endif
15
16 #include "include/utime.h"
17 #include "common/RefCountedObj.h"
18 #include "common/debug.h"
19 #include "common/Timer.h"
20 #include "common/admin_socket.h"
21
22 #include "rgw_common.h"
23 #include <boost/asio/coroutine.hpp>
24
25 #include <atomic>
26
27 #define RGW_ASYNC_OPS_MGR_WINDOW 100
28
29 class RGWCoroutinesStack;
30 class RGWCoroutinesManager;
31 class RGWAioCompletionNotifier;
32
33 class RGWCompletionManager : public RefCountedObject {
34   CephContext *cct;
35   list<void *> complete_reqs;
36   using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
37   set<NotifierRef> cns;
38
39   Mutex lock;
40   Cond cond;
41
42   SafeTimer timer;
43
44   std::atomic<bool> going_down = { false };
45
46   map<void *, void *> waiters;
47
48   class WaitContext;
49
50 protected:
51   void _wakeup(void *opaque);
52   void _complete(RGWAioCompletionNotifier *cn, void *user_info);
53 public:
54   RGWCompletionManager(CephContext *_cct);
55   ~RGWCompletionManager() override;
56
57   void complete(RGWAioCompletionNotifier *cn, void *user_info);
58   int get_next(void **user_info);
59   bool try_get_next(void **user_info);
60
61   void go_down();
62
63   /*
64    * wait for interval length to complete user_info
65    */
66   void wait_interval(void *opaque, const utime_t& interval, void *user_info);
67   void wakeup(void *opaque);
68
69   void register_completion_notifier(RGWAioCompletionNotifier *cn);
70   void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
71 };
72
73 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
74 class RGWAioCompletionNotifier : public RefCountedObject {
75   librados::AioCompletion *c;
76   RGWCompletionManager *completion_mgr;
77   void *user_data;
78   Mutex lock;
79   bool registered;
80
81 public:
82   RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data);
83   ~RGWAioCompletionNotifier() override {
84     c->release();
85     lock.Lock();
86     bool need_unregister = registered;
87     if (registered) {
88       completion_mgr->get();
89     }
90     registered = false;
91     lock.Unlock();
92     if (need_unregister) {
93       completion_mgr->unregister_completion_notifier(this);
94       completion_mgr->put();
95     }
96   }
97
98   librados::AioCompletion *completion() {
99     return c;
100   }
101
102   void unregister() {
103     Mutex::Locker l(lock);
104     if (!registered) {
105       return;
106     }
107     registered = false;
108   }
109
110   void cb() {
111     lock.Lock();
112     if (!registered) {
113       lock.Unlock();
114       put();
115       return;
116     }
117     completion_mgr->get();
118     registered = false;
119     lock.Unlock();
120     completion_mgr->complete(this, user_data);
121     completion_mgr->put();
122     put();
123   }
124 };
125
126 struct RGWCoroutinesEnv {
127   uint64_t run_context;
128   RGWCoroutinesManager *manager;
129   list<RGWCoroutinesStack *> *scheduled_stacks;
130   RGWCoroutinesStack *stack;
131
132   RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {}
133 };
134
135 enum RGWCoroutineState {
136   RGWCoroutine_Error = -2,
137   RGWCoroutine_Done  = -1,
138   RGWCoroutine_Run   =  0,
139 };
140
141 struct rgw_spawned_stacks {
142   vector<RGWCoroutinesStack *> entries;
143
144   rgw_spawned_stacks() {}
145
146   void add_pending(RGWCoroutinesStack *s) {
147     entries.push_back(s);
148   }
149
150   void inherit(rgw_spawned_stacks *source) {
151     for (vector<RGWCoroutinesStack *>::iterator iter = source->entries.begin();
152          iter != source->entries.end(); ++iter) {
153       add_pending(*iter);
154     }
155     source->entries.clear();
156   }
157 };
158
159
160
161 class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
162   friend class RGWCoroutinesStack;
163
164   struct StatusItem {
165     utime_t timestamp;
166     string status;
167
168     StatusItem(utime_t& t, const string& s) : timestamp(t), status(s) {}
169
170     void dump(Formatter *f) const;
171   };
172
173 #define MAX_COROUTINE_HISTORY 10
174
175   struct Status {
176     CephContext *cct;
177     RWLock lock;
178     int max_history;
179
180     utime_t timestamp;
181     stringstream status;
182
183     Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {}
184
185     deque<StatusItem> history;
186
187     stringstream& set_status();
188   } status;
189
190   stringstream description;
191
192 protected:
193   bool _yield_ret;
194   boost::asio::coroutine drain_cr;
195
196   CephContext *cct;
197
198   RGWCoroutinesStack *stack;
199   int retcode;
200   int state;
201
202   rgw_spawned_stacks spawned;
203
204   stringstream error_stream;
205
206   int set_state(int s, int ret = 0) {
207     state = s;
208     return ret;
209   }
210   int set_cr_error(int ret) {
211     state = RGWCoroutine_Error;
212     return ret;
213   }
214   int set_cr_done() {
215     state = RGWCoroutine_Done;
216     return 0;
217   }
218   void set_io_blocked(bool flag);
219   int io_block(int ret = 0);
220
221   void reset_description() {
222     description.str(string());
223   }
224
225   stringstream& set_description() {
226     return description;
227   }
228   stringstream& set_status() {
229     return status.set_status();
230   }
231
232   stringstream& set_status(const string& s) {
233     stringstream& status = set_status();
234     status << s;
235     return status;
236   }
237
238 public:
239   RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
240   ~RGWCoroutine() override;
241
242   virtual int operate() = 0;
243
244   bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); }
245   bool is_error() { return (state == RGWCoroutine_Error); }
246
247   stringstream& log_error() { return error_stream; }
248   string error_str() {
249     return error_stream.str();
250   }
251
252   void set_retcode(int r) {
253     retcode = r;
254   }
255
256   int get_ret_status() {
257     return retcode;
258   }
259
260   void call(RGWCoroutine *op); /* call at the same stack we're in */
261   RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
262   bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
263   bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
264
265   int wait(const utime_t& interval);
266   bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
267   void wakeup();
268   void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
269
270   size_t num_spawned() {
271     return spawned.entries.size();
272   }
273
274   void wait_for_child();
275
276   virtual string to_str() const;
277
278   RGWCoroutinesStack *get_stack() const {
279     return stack;
280   }
281
282   void dump(Formatter *f) const;
283 };
284
285 ostream& operator<<(ostream& out, const RGWCoroutine& cr);
286
287 #define yield_until_true(x)     \
288 do {                            \
289   do {                          \
290     yield _yield_ret = x;       \
291   } while (!_yield_ret);        \
292   _yield_ret = false;           \
293 } while (0)
294
295 #define drain_all() \
296   drain_cr = boost::asio::coroutine(); \
297   yield_until_true(drain_children(0))
298
299 #define drain_all_but(n) \
300   drain_cr = boost::asio::coroutine(); \
301   yield_until_true(drain_children(n))
302
303 #define drain_all_but_stack(stack) \
304   drain_cr = boost::asio::coroutine(); \
305   yield_until_true(drain_children(1, stack))
306
307 template <class T>
308 class RGWConsumerCR : public RGWCoroutine {
309   list<T> product;
310
311 public:
312   RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
313
314   bool has_product() {
315     return !product.empty();
316   }
317
318   void wait_for_product() {
319     if (!has_product()) {
320       set_sleeping(true);
321     }
322   }
323
324   bool consume(T *p) {
325     if (product.empty()) {
326       return false;
327     }
328     *p = product.front();
329     product.pop_front();
330     return true;
331   }
332
333   void receive(const T& p, bool wakeup = true);
334   void receive(list<T>& l, bool wakeup = true);
335 };
336
337 class RGWCoroutinesStack : public RefCountedObject {
338   friend class RGWCoroutine;
339   friend class RGWCoroutinesManager;
340
341   CephContext *cct;
342
343   RGWCoroutinesManager *ops_mgr;
344
345   list<RGWCoroutine *> ops;
346   list<RGWCoroutine *>::iterator pos;
347
348   rgw_spawned_stacks spawned;
349
350   set<RGWCoroutinesStack *> blocked_by_stack;
351   set<RGWCoroutinesStack *> blocking_stacks;
352
353   bool done_flag;
354   bool error_flag;
355   bool blocked_flag;
356   bool sleep_flag;
357   bool interval_wait_flag;
358
359   bool is_scheduled;
360
361   bool is_waiting_for_child;
362
363   int retcode;
364
365   uint64_t run_count;
366
367 protected:
368   RGWCoroutinesEnv *env;
369   RGWCoroutinesStack *parent;
370
371   RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
372   bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
373   bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
374 public:
375   RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
376   ~RGWCoroutinesStack() override;
377
378   int operate(RGWCoroutinesEnv *env);
379
380   bool is_done() {
381     return done_flag;
382   }
383   bool is_error() {
384     return error_flag;
385   }
386   bool is_blocked_by_stack() {
387     return !blocked_by_stack.empty();
388   }
389   void set_io_blocked(bool flag) {
390     blocked_flag = flag;
391   }
392   bool is_io_blocked() {
393     return blocked_flag;
394   }
395   void set_interval_wait(bool flag) {
396     interval_wait_flag = flag;
397   }
398   bool is_interval_waiting() {
399     return interval_wait_flag;
400   }
401   void set_sleeping(bool flag) {
402     bool wakeup = sleep_flag & !flag;
403     sleep_flag = flag;
404     if (wakeup) {
405       schedule();
406     }
407   }
408   bool is_sleeping() {
409     return sleep_flag;
410   }
411   void set_is_scheduled(bool flag) {
412     is_scheduled = flag;
413   }
414
415   bool is_blocked() {
416     return is_blocked_by_stack() || is_sleeping() ||
417           is_io_blocked() || waiting_for_child() ;
418   }
419
420   void schedule(list<RGWCoroutinesStack *> *stacks = NULL) {
421     if (!stacks) {
422       stacks = env->scheduled_stacks;
423     }
424     if (!is_scheduled) {
425       stacks->push_back(this);
426       is_scheduled = true;
427     }
428   }
429
430   int get_ret_status() {
431     return retcode;
432   }
433
434   string error_str();
435
436   void call(RGWCoroutine *next_op);
437   RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait);
438   int unwind(int retcode);
439
440   int wait(const utime_t& interval);
441   void wakeup();
442
443   bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
444
445   RGWAioCompletionNotifier *create_completion_notifier();
446   RGWCompletionManager *get_completion_mgr();
447
448   void set_blocked_by(RGWCoroutinesStack *s) {
449     blocked_by_stack.insert(s);
450     s->blocking_stacks.insert(this);
451   }
452
453   void set_wait_for_child(bool flag) {
454     is_waiting_for_child = flag;
455   }
456
457   bool waiting_for_child() {
458     return is_waiting_for_child;
459   }
460
461   bool unblock_stack(RGWCoroutinesStack **s);
462
463   RGWCoroutinesEnv *get_env() { return env; }
464
465   void dump(Formatter *f) const;
466 };
467
468 template <class T>
469 void RGWConsumerCR<T>::receive(list<T>& l, bool wakeup)
470 {
471   product.splice(product.end(), l);
472   if (wakeup) {
473     set_sleeping(false);
474   }
475 }
476
477
478 template <class T>
479 void RGWConsumerCR<T>::receive(const T& p, bool wakeup)
480 {
481   product.push_back(p);
482   if (wakeup) {
483     set_sleeping(false);
484   }
485 }
486
487 class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
488   CephContext *cct;
489
490   set<RGWCoroutinesManager *> managers;
491   RWLock lock;
492
493   string admin_command;
494
495 public:
496   RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {}
497   ~RGWCoroutinesManagerRegistry() override;
498
499   void add(RGWCoroutinesManager *mgr);
500   void remove(RGWCoroutinesManager *mgr);
501
502   int hook_to_admin_command(const string& command);
503   bool call(std::string command, cmdmap_t& cmdmap, std::string format,
504             bufferlist& out) override;
505     
506   void dump(Formatter *f) const;
507 };
508
509 class RGWCoroutinesManager {
510   CephContext *cct;
511   std::atomic<bool> going_down = { false };
512
513   std::atomic<int64_t> run_context_count = { 0 };
514   map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
515
516   RWLock lock;
517
518   void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count);
519 protected:
520   RGWCompletionManager *completion_mgr;
521   RGWCoroutinesManagerRegistry *cr_registry;
522
523   int ops_window;
524
525   string id;
526
527   void put_completion_notifier(RGWAioCompletionNotifier *cn);
528 public:
529   RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"),
530                                                                                         cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
531     completion_mgr = new RGWCompletionManager(cct);
532     if (cr_registry) {
533       cr_registry->add(this);
534     }
535   }
536   virtual ~RGWCoroutinesManager() {
537     stop();
538     completion_mgr->put();
539     if (cr_registry) {
540       cr_registry->remove(this);
541     }
542   }
543
544   int run(list<RGWCoroutinesStack *>& ops);
545   int run(RGWCoroutine *op);
546   void stop() {
547     bool expected = false;
548     if (going_down.compare_exchange_strong(expected, true)) {
549       completion_mgr->go_down();
550     }
551   }
552
553   virtual void report_error(RGWCoroutinesStack *op);
554
555   RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
556   RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
557
558   void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
559   RGWCoroutinesStack *allocate_stack();
560
561   virtual string get_id();
562   void dump(Formatter *f) const;
563 };
564
565 class RGWSimpleCoroutine : public RGWCoroutine {
566   bool called_cleanup;
567
568   int operate() override;
569
570   int state_init();
571   int state_send_request();
572   int state_request_complete();
573   int state_all_complete();
574
575   void call_cleanup();
576
577 public:
578   RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {}
579   ~RGWSimpleCoroutine() override;
580
581   virtual int init() { return 0; }
582   virtual int send_request() = 0;
583   virtual int request_complete() = 0;
584   virtual int finish() { return 0; }
585   virtual void request_cleanup() {}
586 };
587
588 #endif