Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_coroutine.cc
1
2 #include "common/ceph_json.h"
3 #include "rgw_coroutine.h"
4
5 // re-include our assert to clobber the system one; fix dout:
6 #include "include/assert.h"
7
8 #include <boost/asio/yield.hpp>
9
10 #define dout_subsys ceph_subsys_rgw
11
12
13 class RGWCompletionManager::WaitContext : public Context {
14   RGWCompletionManager *manager;
15   void *opaque;
16 public:
17   WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {}
18   void finish(int r) override {
19     manager->_wakeup(opaque);
20   }
21 };
22
23 RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock("RGWCompletionManager::lock"),
24                                             timer(cct, lock)
25 {
26   timer.init();
27 }
28
29 RGWCompletionManager::~RGWCompletionManager()
30 {
31   Mutex::Locker l(lock);
32   timer.cancel_all_events();
33   timer.shutdown();
34 }
35
36 void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, void *user_info)
37 {
38   Mutex::Locker l(lock);
39   _complete(cn, user_info);
40 }
41
42 void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
43 {
44   Mutex::Locker l(lock);
45   if (cn) {
46     cns.insert(cn);
47   }
48 }
49
50 void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
51 {
52   Mutex::Locker l(lock);
53   if (cn) {
54     cns.erase(cn);
55   }
56 }
57
58 void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, void *user_info)
59 {
60   if (cn) {
61     cns.erase(cn);
62   }
63   complete_reqs.push_back(user_info);
64   cond.Signal();
65 }
66
67 int RGWCompletionManager::get_next(void **user_info)
68 {
69   Mutex::Locker l(lock);
70   while (complete_reqs.empty()) {
71     cond.Wait(lock);
72     if (going_down) {
73       return -ECANCELED;
74     }
75   }
76   *user_info = complete_reqs.front();
77   complete_reqs.pop_front();
78   return 0;
79 }
80
81 bool RGWCompletionManager::try_get_next(void **user_info)
82 {
83   Mutex::Locker l(lock);
84   if (complete_reqs.empty()) {
85     return false;
86   }
87   *user_info = complete_reqs.front();
88   complete_reqs.pop_front();
89   return true;
90 }
91
92 void RGWCompletionManager::go_down()
93 {
94   Mutex::Locker l(lock);
95   for (auto cn : cns) {
96     cn->unregister();
97   }
98   going_down = true;
99   cond.Signal();
100 }
101
102 void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
103 {
104   Mutex::Locker l(lock);
105   assert(waiters.find(opaque) == waiters.end());
106   waiters[opaque] = user_info;
107   timer.add_event_after(interval, new WaitContext(this, opaque));
108 }
109
110 void RGWCompletionManager::wakeup(void *opaque)
111 {
112   Mutex::Locker l(lock);
113   _wakeup(opaque);
114 }
115
116 void RGWCompletionManager::_wakeup(void *opaque)
117 {
118   map<void *, void *>::iterator iter = waiters.find(opaque);
119   if (iter != waiters.end()) {
120     void *user_id = iter->second;
121     waiters.erase(iter);
122     _complete(NULL, user_id);
123   }
124 }
125
126 RGWCoroutine::~RGWCoroutine() {
127   for (auto stack : spawned.entries) {
128     stack->put();
129   }
130 }
131
132 void RGWCoroutine::set_io_blocked(bool flag) {
133   stack->set_io_blocked(flag);
134 }
135
136 void RGWCoroutine::set_sleeping(bool flag) {
137   stack->set_sleeping(flag);
138 }
139
140 int RGWCoroutine::io_block(int ret) {
141   set_io_blocked(true);
142   return ret;
143 }
144
145 void RGWCoroutine::StatusItem::dump(Formatter *f) const {
146   ::encode_json("timestamp", timestamp, f);
147   ::encode_json("status", status, f);
148 }
149
150 stringstream& RGWCoroutine::Status::set_status()
151 {
152   RWLock::WLocker l(lock);
153   string s = status.str();
154   status.str(string());
155   if (!timestamp.is_zero()) {
156     history.push_back(StatusItem(timestamp, s));
157   }
158   if (history.size() > (size_t)max_history) {
159     history.pop_front();
160   }
161   timestamp = ceph_clock_now();
162
163   return status;
164 }
165
166 RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
167                                                                                                          done_flag(false), error_flag(false), blocked_flag(false),
168                                                                                                          sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
169                                                                                                          retcode(0), run_count(0),
170                                                                                                          env(NULL), parent(NULL)
171 {
172   if (start) {
173     ops.push_back(start);
174   }
175   pos = ops.begin();
176 }
177
178 RGWCoroutinesStack::~RGWCoroutinesStack()
179 {
180   for (auto op : ops) {
181     op->put();
182   }
183
184   for (auto stack : spawned.entries) {
185     stack->put();
186   }
187 }
188
189 int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env)
190 {
191   env = _env;
192   RGWCoroutine *op = *pos;
193   op->stack = this;
194   ldout(cct, 20) << *op << ": operate()" << dendl;
195   int r = op->operate();
196   if (r < 0) {
197     ldout(cct, 20) << *op << ": operate() returned r=" << r << dendl;
198   }
199
200   error_flag = op->is_error();
201
202   if (op->is_done()) {
203     int op_retcode = r;
204     r = unwind(op_retcode);
205     op->put();
206     done_flag = (pos == ops.end());
207     if (done_flag) {
208       retcode = op_retcode;
209     }
210     return r;
211   }
212
213   /* should r ever be negative at this point? */
214   assert(r >= 0);
215
216   return 0;
217 }
218
219 string RGWCoroutinesStack::error_str()
220 {
221   if (pos != ops.end()) {
222     return (*pos)->error_str();
223   }
224   return string();
225 }
226
227 void RGWCoroutinesStack::call(RGWCoroutine *next_op) {
228   if (!next_op) {
229     return;
230   }
231   ops.push_back(next_op);
232   if (pos != ops.end()) {
233     ++pos;
234   } else {
235     pos = ops.begin();
236   }
237 }
238
239 RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait)
240 {
241   if (!op) {
242     return NULL;
243   }
244
245   rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);
246
247   RGWCoroutinesStack *stack = env->manager->allocate_stack();
248   s->add_pending(stack);
249   stack->parent = this;
250
251   stack->get(); /* we'll need to collect the stack */
252   stack->call(op);
253
254   env->manager->schedule(env, stack);
255
256   if (wait) {
257     set_blocked_by(stack);
258   }
259
260   return stack;
261 }
262
263 RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait)
264 {
265   return spawn(NULL, op, wait);
266 }
267
268 int RGWCoroutinesStack::wait(const utime_t& interval)
269 {
270   RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
271   completion_mgr->wait_interval((void *)this, interval, (void *)this);
272   set_io_blocked(true);
273   set_interval_wait(true);
274   return 0;
275 }
276
277 void RGWCoroutinesStack::wakeup()
278 {
279   RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
280   completion_mgr->wakeup((void *)this);
281 }
282
283 int RGWCoroutinesStack::unwind(int retcode)
284 {
285   rgw_spawned_stacks *src_spawned = &(*pos)->spawned;
286
287   if (pos == ops.begin()) {
288     spawned.inherit(src_spawned);
289     ops.clear();
290     pos = ops.end();
291     return retcode;
292   }
293
294   --pos;
295   ops.pop_back();
296   RGWCoroutine *op = *pos;
297   op->set_retcode(retcode);
298   op->spawned.inherit(src_spawned);
299   return 0;
300 }
301
302
303 bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
304 {
305   bool done = true;
306   rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
307   *ret = 0;
308   vector<RGWCoroutinesStack *> new_list;
309
310   for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
311     RGWCoroutinesStack *stack = *iter;
312     if (stack == skip_stack || !stack->is_done()) {
313       new_list.push_back(stack);
314       if (!stack->is_done()) {
315         ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
316       } else if (stack == skip_stack) {
317         ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl;
318       }
319       continue;
320     }
321     int r = stack->get_ret_status();
322     stack->put();
323     if (r < 0) {
324       *ret = r;
325       ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl;
326       new_list.insert(new_list.end(), ++iter, s->entries.end());
327       done &= (iter != s->entries.end());
328       break;
329     }
330
331     ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl;
332   }
333
334   s->entries.swap(new_list);
335   return (!done);
336 }
337
338 bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
339 {
340   rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
341   *ret = 0;
342
343   if (collected_stack) {
344     *collected_stack = NULL;
345   }
346
347   for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
348     RGWCoroutinesStack *stack = *iter;
349     if (!stack->is_done()) {
350       continue;
351     }
352     int r = stack->get_ret_status();
353     if (r < 0) {
354       *ret = r;
355     }
356
357     if (collected_stack) {
358       *collected_stack = stack;
359     }
360     stack->put();
361
362     s->entries.erase(iter);
363     return true;
364   }
365
366   return false;
367 }
368
369 bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
370 {
371   return collect(NULL, ret, skip_stack);
372 }
373
374 static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
375 {
376   ((RGWAioCompletionNotifier *)arg)->cb();
377 }
378
379 RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr),
380                                                                          user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
381   c = librados::Rados::aio_create_completion((void *)this, NULL,
382                                              _aio_completion_notifier_cb);
383 }
384
385 RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
386 {
387   return ops_mgr->create_completion_notifier(this);
388 }
389
390 RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
391 {
392   return ops_mgr->get_completion_mgr();
393 }
394
395 bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
396 {
397   if (blocking_stacks.empty()) {
398     return false;
399   }
400
401   set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
402   *s = *iter;
403   blocking_stacks.erase(iter);
404   (*s)->blocked_by_stack.erase(this);
405
406   return true;
407 }
408
409 void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
410 {
411   if (!op) {
412     return;
413   }
414   string err = op->error_str();
415   if (err.empty()) {
416     return;
417   }
418   lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
419 }
420
421 void RGWCoroutinesStack::dump(Formatter *f) const {
422   stringstream ss;
423   ss << (void *)this;
424   ::encode_json("stack", ss.str(), f);
425   ::encode_json("run_count", run_count, f);
426   f->open_array_section("ops");
427   for (auto& i : ops) {
428     encode_json("op", *i, f);
429   }
430   f->close_section();
431 }
432
433 void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *blocked_count)
434 {
435   RWLock::WLocker wl(lock);
436   --(*blocked_count);
437   stack->set_io_blocked(false);
438   stack->set_interval_wait(false);
439   if (!stack->is_done()) {
440     scheduled_stacks.push_back(stack);
441   } else {
442     RWLock::WLocker wl(lock);
443     context_stacks.erase(stack);
444     stack->put();
445   }
446 }
447
448 void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
449 {
450   assert(lock.is_wlocked());
451   env->scheduled_stacks->push_back(stack);
452   set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
453   context_stacks.insert(stack);
454 }
455
456 int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
457 {
458   int ret = 0;
459   int blocked_count = 0;
460   int interval_wait_count = 0;
461   bool canceled = false; // set on going_down
462   RGWCoroutinesEnv env;
463
464   uint64_t run_context = ++run_context_count;
465
466   lock.get_write();
467   set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
468   list<RGWCoroutinesStack *> scheduled_stacks;
469   for (auto& st : stacks) {
470     context_stacks.insert(st);
471     scheduled_stacks.push_back(st);
472   }
473   lock.unlock();
474
475   env.run_context = run_context;
476   env.manager = this;
477   env.scheduled_stacks = &scheduled_stacks;
478
479   for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
480     lock.get_write();
481
482     RGWCoroutinesStack *stack = *iter;
483     env.stack = stack;
484
485     ret = stack->operate(&env);
486     stack->set_is_scheduled(false);
487     if (ret < 0) {
488       ldout(cct, 20) << "stack->operate() returned ret=" << ret << dendl;
489     }
490
491     if (stack->is_error()) {
492       report_error(stack);
493     }
494
495     bool op_not_blocked = false;
496
497     if (stack->is_io_blocked()) {
498       ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl;
499       if (stack->is_interval_waiting()) {
500         interval_wait_count++;
501       }
502       blocked_count++;
503     } else if (stack->is_blocked()) {
504       /* do nothing, we'll re-add the stack when the blocking stack is done,
505        * or when we're awaken
506        */
507       ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack()
508                      << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl;
509     } else if (stack->is_done()) {
510       ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl;
511       RGWCoroutinesStack *s;
512       while (stack->unblock_stack(&s)) {
513         if (!s->is_blocked_by_stack() && !s->is_done()) {
514           if (s->is_io_blocked()) {
515             if (stack->is_interval_waiting()) {
516               interval_wait_count++;
517             }
518             blocked_count++;
519           } else {
520             s->schedule();
521           }
522         }
523       }
524       if (stack->parent && stack->parent->waiting_for_child()) {
525         stack->parent->set_wait_for_child(false);
526         stack->parent->schedule();
527       }
528       context_stacks.erase(stack);
529       stack->put();
530       stack = NULL;
531     } else {
532       op_not_blocked = true;
533       stack->run_count++;
534       stack->schedule();
535     }
536
537     if (!op_not_blocked && stack) {
538       stack->run_count = 0;
539     }
540
541     lock.unlock();
542
543     RGWCoroutinesStack *blocked_stack;
544     while (completion_mgr->try_get_next((void **)&blocked_stack)) {
545       handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
546     }
547
548     /*
549      * only account blocked operations that are not in interval_wait, these are stacks that
550      * were put on a wait without any real IO operations. While we mark these as io_blocked,
551      * these aren't really waiting for IOs
552      */
553     while (blocked_count - interval_wait_count >= ops_window) {
554       ret = completion_mgr->get_next((void **)&blocked_stack);
555       if (ret < 0) {
556        ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
557       }
558       handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
559     }
560
561     ++iter;
562     scheduled_stacks.pop_front();
563
564
565     while (scheduled_stacks.empty() && blocked_count > 0) {
566       ret = completion_mgr->get_next((void **)&blocked_stack);
567       if (ret < 0) {
568         ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
569       }
570       if (going_down) {
571         ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
572         ret = -ECANCELED;
573         canceled = true;
574         break;
575       }
576       handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
577       iter = scheduled_stacks.begin();
578     }
579     if (canceled) {
580       break;
581     }
582
583     if (iter == scheduled_stacks.end()) {
584       iter = scheduled_stacks.begin();
585     }
586   }
587
588   lock.get_write();
589   if (!context_stacks.empty() && !going_down) {
590     JSONFormatter formatter(true);
591     formatter.open_array_section("context_stacks");
592     for (auto& s : context_stacks) {
593       ::encode_json("entry", *s, &formatter);
594     }
595     formatter.close_section();
596     lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
597     formatter.flush(*_dout);
598     *_dout << dendl;
599     assert(context_stacks.empty() || going_down); // assert on deadlock
600   }
601
602   for (auto stack : context_stacks) {
603     ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl;
604     stack->put();
605   }
606   run_contexts.erase(run_context);
607   lock.unlock();
608
609   return ret;
610 }
611
612 int RGWCoroutinesManager::run(RGWCoroutine *op)
613 {
614   if (!op) {
615     return 0;
616   }
617   list<RGWCoroutinesStack *> stacks;
618   RGWCoroutinesStack *stack = allocate_stack();
619   op->get();
620   stack->call(op);
621
622   stack->schedule(&stacks);
623
624   int r = run(stacks);
625   if (r < 0) {
626     ldout(cct, 20) << "run(stacks) returned r=" << r << dendl;
627   } else {
628     r = op->get_ret_status();
629   }
630   op->put();
631
632   return r;
633 }
634
635 RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
636 {
637   RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, (void *)stack);
638   completion_mgr->register_completion_notifier(cn);
639   return cn;
640 }
641
642 void RGWCoroutinesManager::dump(Formatter *f) const {
643   RWLock::RLocker rl(lock);
644
645   f->open_array_section("run_contexts");
646   for (auto& i : run_contexts) {
647     f->open_object_section("context");
648     ::encode_json("id", i.first, f);
649     f->open_array_section("entries");
650     for (auto& s : i.second) {
651       ::encode_json("entry", *s, f);
652     }
653     f->close_section();
654     f->close_section();
655   }
656   f->close_section();
657 }
658
659 RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
660   return new RGWCoroutinesStack(cct, this);
661 }
662
663 string RGWCoroutinesManager::get_id()
664 {
665   if (!id.empty()) {
666     return id;
667   }
668   stringstream ss;
669   ss << (void *)this;
670   return ss.str();
671 }
672
673 void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
674 {
675   RWLock::WLocker wl(lock);
676   if (managers.find(mgr) == managers.end()) {
677     managers.insert(mgr);
678     get();
679   }
680 }
681
682 void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
683 {
684   RWLock::WLocker wl(lock);
685   if (managers.find(mgr) != managers.end()) {
686     managers.erase(mgr);
687     put();
688   }
689 }
690
691 RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
692 {
693   AdminSocket *admin_socket = cct->get_admin_socket();
694   if (!admin_command.empty()) {
695     admin_socket->unregister_command(admin_command);
696   }
697 }
698
699 int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
700 {
701   AdminSocket *admin_socket = cct->get_admin_socket();
702   if (!admin_command.empty()) {
703     admin_socket->unregister_command(admin_command);
704   }
705   admin_command = command;
706   int r = admin_socket->register_command(admin_command, admin_command, this,
707                                      "dump current coroutines stack state");
708   if (r < 0) {
709     lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
710     return r;
711   }
712   return 0;
713 }
714
715 bool RGWCoroutinesManagerRegistry::call(std::string command, cmdmap_t& cmdmap, std::string format,
716             bufferlist& out) {
717   RWLock::RLocker rl(lock);
718   stringstream ss;
719   JSONFormatter f;
720   ::encode_json("cr_managers", *this, &f);
721   f.flush(ss);
722   out.append(ss);
723   return true;
724 }
725
726 void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {
727   f->open_array_section("coroutine_managers");
728   for (auto m : managers) {
729     ::encode_json("entry", *m, f);
730   }
731   f->close_section();
732 }
733
734 void RGWCoroutine::call(RGWCoroutine *op)
735 {
736   stack->call(op);
737 }
738
739 RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
740 {
741   return stack->spawn(this, op, wait);
742 }
743
744 bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
745 {
746   return stack->collect(this, ret, skip_stack);
747 }
748
749 bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
750 {
751   return stack->collect_next(this, ret, collected_stack);
752 }
753
754 int RGWCoroutine::wait(const utime_t& interval)
755 {
756   return stack->wait(interval);
757 }
758
759 void RGWCoroutine::wait_for_child()
760 {
761   /* should only wait for child if there is a child that is not done yet, and no complete children */
762   if (spawned.entries.empty()) {
763     return;
764   }
765   for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) {
766     if ((*iter)->is_done()) {
767       return;
768     }
769   }
770   stack->set_wait_for_child(true);
771 }
772
773 string RGWCoroutine::to_str() const
774 {
775   return typeid(*this).name();
776 }
777
778 ostream& operator<<(ostream& out, const RGWCoroutine& cr)
779 {
780   out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name();
781   return out;
782 }
783
784 bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
785 {
786   bool done = false;
787   assert(num_cr_left >= 0);
788   if (num_cr_left == 0 && skip_stack) {
789     num_cr_left = 1;
790   }
791   reenter(&drain_cr) {
792     while (num_spawned() > (size_t)num_cr_left) {
793       yield wait_for_child();
794       int ret;
795       while (collect(&ret, skip_stack)) {
796         if (ret < 0) {
797           ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
798           /* we should have reported this error */
799           log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
800         }
801       }
802     }
803     done = true;
804   }
805   return done;
806 }
807
808 void RGWCoroutine::wakeup()
809 {
810   stack->wakeup();
811 }
812
813 void RGWCoroutine::dump(Formatter *f) const {
814   if (!description.str().empty()) {
815     encode_json("description", description.str(), f);
816   }
817   encode_json("type", to_str(), f);
818   if (!spawned.entries.empty()) {
819     f->open_array_section("spawned");
820     for (auto& i : spawned.entries) {
821       char buf[32];
822       snprintf(buf, sizeof(buf), "%p", (void *)i);
823       encode_json("stack", string(buf), f);
824     }
825     f->close_section();
826   }
827   if (!status.history.empty()) {
828     encode_json("history", status.history, f);
829   }
830
831   if (!status.status.str().empty()) {
832     f->open_object_section("status");
833     encode_json("status", status.status.str(), f);
834     encode_json("timestamp", status.timestamp, f);
835     f->close_section();
836   }
837 }
838
839 RGWSimpleCoroutine::~RGWSimpleCoroutine()
840 {
841   if (!called_cleanup) {
842     request_cleanup();
843   }
844 }
845
846 void RGWSimpleCoroutine::call_cleanup()
847 {
848   called_cleanup = true;
849   request_cleanup();
850 }
851
852 int RGWSimpleCoroutine::operate()
853 {
854   int ret = 0;
855   reenter(this) {
856     yield return state_init();
857     yield return state_send_request();
858     yield return state_request_complete();
859     yield return state_all_complete();
860     drain_all();
861     call_cleanup();
862     return set_state(RGWCoroutine_Done, ret);
863   }
864   return 0;
865 }
866
867 int RGWSimpleCoroutine::state_init()
868 {
869   int ret = init();
870   if (ret < 0) {
871     call_cleanup();
872     return set_state(RGWCoroutine_Error, ret);
873   }
874   return 0;
875 }
876
877 int RGWSimpleCoroutine::state_send_request()
878 {
879   int ret = send_request();
880   if (ret < 0) {
881     call_cleanup();
882     return set_state(RGWCoroutine_Error, ret);
883   }
884   return io_block(0);
885 }
886
887 int RGWSimpleCoroutine::state_request_complete()
888 {
889   int ret = request_complete();
890   if (ret < 0) {
891     call_cleanup();
892     return set_state(RGWCoroutine_Error, ret);
893   }
894   return 0;
895 }
896
897 int RGWSimpleCoroutine::state_all_complete()
898 {
899   int ret = finish();
900   if (ret < 0) {
901     call_cleanup();
902     return set_state(RGWCoroutine_Error, ret);
903   }
904   return 0;
905 }
906
907