2 #include "common/ceph_json.h"
3 #include "rgw_coroutine.h"
5 // re-include our assert to clobber the system one; fix dout:
6 #include "include/assert.h"
8 #include <boost/asio/yield.hpp>
10 #define dout_subsys ceph_subsys_rgw
// Timer callback created by RGWCompletionManager::wait_interval(): when the
// timer event fires, finish() forwards the stored opaque key to _wakeup().
13 class RGWCompletionManager::WaitContext : public Context {
14 RGWCompletionManager *manager;
17 WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {}
18 void finish(int r) override {
// _wakeup() does not take the manager's `lock` itself (the public wakeup()
// does). NOTE(review): presumably the timer runs finish() with `lock`
// already held; confirm against how `timer` is constructed.
19 manager->_wakeup(opaque);
// Constructor. `lock` guards the manager's shared state: the completion
// queue (complete_reqs) and the interval-wait map (waiters).
23 RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock("RGWCompletionManager::lock"),
// Destructor: cancels any pending timer events, such as interval waits
// registered by wait_interval(), while holding `lock`.
29 RGWCompletionManager::~RGWCompletionManager()
31 Mutex::Locker l(lock);
32 timer.cancel_all_events();
// Public entry point for reporting a finished operation: takes `lock` and
// defers to _complete().
36 void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, void *user_info)
38 Mutex::Locker l(lock);
39 _complete(cn, user_info);
// Registers an AIO completion notifier with this manager, under `lock`.
42 void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
44 Mutex::Locker l(lock);
// Unregisters an AIO completion notifier, under `lock`.
50 void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
52 Mutex::Locker l(lock);
// Internal variant of complete() that does not acquire `lock`. Both
// complete() and _wakeup() call it, and complete() holds the lock when it
// does. Appends user_info to complete_reqs, which get_next() and
// try_get_next() consume from the front.
58 void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, void *user_info)
63 complete_reqs.push_back(user_info);
// Blocking dequeue. Under `lock`, loops while complete_reqs is empty
// (presumably waiting on a condition; confirm). It then pops the oldest
// queued entry into *user_info.
// Returns an int status. run() treats a non-zero result as an error or a
// stop; presumably it is non-zero once go_down() has been called (confirm).
67 int RGWCompletionManager::get_next(void **user_info)
69 Mutex::Locker l(lock);
70 while (complete_reqs.empty()) {
76 *user_info = complete_reqs.front();
77 complete_reqs.pop_front();
// Non-blocking dequeue. Under `lock`, pops the oldest queued entry into
// *user_info when one is available. The empty-queue branch presumably
// returns false without touching *user_info (confirm).
81 bool RGWCompletionManager::try_get_next(void **user_info)
83 Mutex::Locker l(lock);
84 if (complete_reqs.empty()) {
87 *user_info = complete_reqs.front();
88 complete_reqs.pop_front();
// Shuts the manager down under `lock`. It presumably sets a going-down flag
// that wakes blocked get_next() callers; confirm.
92 void RGWCompletionManager::go_down()
94 Mutex::Locker l(lock);
// Arms a timer so that, after `interval`, `user_info` is delivered through
// the completion queue. The wait is keyed by `opaque`, and at most one
// outstanding wait per key is allowed (asserted).
102 void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
104 Mutex::Locker l(lock);
105 assert(waiters.find(opaque) == waiters.end());
106 waiters[opaque] = user_info;
107 timer.add_event_after(interval, new WaitContext(this, opaque));
// Locked entry point for waking the wait keyed by `opaque` before its timer
// fires. It presumably forwards to _wakeup(); confirm.
110 void RGWCompletionManager::wakeup(void *opaque)
112 Mutex::Locker l(lock);
// Does not acquire `lock`. Acts only when `opaque` has a registered waiter,
// in which case that waiter's user_info is queued via _complete(NULL, ...).
// NOTE(review): the removal of the map entry is not shown here. Confirm it
// happens before _complete(); otherwise the late timer firing would complete
// twice, and the next wait_interval() for the same key would hit its assert.
116 void RGWCompletionManager::_wakeup(void *opaque)
118 map<void *, void *>::iterator iter = waiters.find(opaque);
119 if (iter != waiters.end()) {
120 void *user_id = iter->second;
122 _complete(NULL, user_id);
// Destructor: iterates the stacks this coroutine spawned (spawned.entries).
// It presumably drops the references taken in spawn(); confirm.
126 RGWCoroutine::~RGWCoroutine() {
127 for (auto stack : spawned.entries) {
// Forwards the io-blocked flag to the stack this coroutine runs on.
132 void RGWCoroutine::set_io_blocked(bool flag) {
133 stack->set_io_blocked(flag);
// Forwards the sleeping flag to the stack this coroutine runs on.
136 void RGWCoroutine::set_sleeping(bool flag) {
137 stack->set_sleeping(flag);
// Marks the coroutine's stack io-blocked. The return value is presumably
// `ret` passed through, so callers can write `return io_block(r);` (confirm).
140 int RGWCoroutine::io_block(int ret) {
141 set_io_blocked(true);
// Serializes one status-history entry as {"timestamp": ..., "status": ...}.
145 void RGWCoroutine::StatusItem::dump(Formatter *f) const {
146 ::encode_json("timestamp", timestamp, f);
147 ::encode_json("status", status, f);
// Starts a new status message. Under the status write lock it does the
// following:
//  - archives the current status text into `history`, if a timestamp was
//    ever recorded;
//  - clears the status stream;
//  - checks history against max_history (presumably dropping the oldest
//    entry; confirm);
//  - resets the timestamp to now.
// Returns a stream, presumably `status`, for the caller to write the new
// message into.
// NOTE(review): the write lock is released on return, so the caller's
// writes into the returned stream are not protected by `lock`.
150 stringstream& RGWCoroutine::Status::set_status()
152 RWLock::WLocker l(lock);
153 string s = status.str();
154 status.str(string());
155 if (!timestamp.is_zero()) {
156 history.push_back(StatusItem(timestamp, s));
158 if (history.size() > (size_t)max_history) {
161 timestamp = ceph_clock_now();
// Creates a stack with every state flag cleared, return code 0, run_count 0,
// and no environment or parent. `start` is appended to `ops`. This is
// presumably guarded by a NULL check (confirm), since allocate_stack()
// constructs stacks without a start op.
166 RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
167 done_flag(false), error_flag(false), blocked_flag(false),
168 sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
169 retcode(0), run_count(0),
170 env(NULL), parent(NULL)
173 ops.push_back(start);
// Destructor: iterates the op list and then the spawned child stacks. It
// presumably releases references on both; confirm.
178 RGWCoroutinesStack::~RGWCoroutinesStack()
180 for (auto op : ops) {
184 for (auto stack : spawned.entries) {
// Runs one step of the coroutine at `pos`, the currently active op, and logs
// its result.
// After the step:
//  - error_flag mirrors the op's error state;
//  - when the op has finished (presumably checked via is_done(); confirm),
//    the stack unwinds to the calling op with op_retcode;
//  - done_flag is set once `pos` reaches ops.end(), and the stack's retcode
//    takes the last op's return code.
189 int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env)
192 RGWCoroutine *op = *pos;
194 ldout(cct, 20) << *op << ": operate()" << dendl;
195 int r = op->operate();
197 ldout(cct, 20) << *op << ": operate() returned r=" << r << dendl;
200 error_flag = op->is_error();
204 r = unwind(op_retcode);
206 done_flag = (pos == ops.end());
208 retcode = op_retcode;
213 /* should r ever be negative at this point? */
// Returns the active op's error string when an op is active. The result
// after all ops have unwound is presumably empty (confirm).
219 string RGWCoroutinesStack::error_str()
221 if (pos != ops.end()) {
222 return (*pos)->error_str();
// Appends next_op to this stack's op list so that it runs as a nested call.
// `pos` is presumably advanced to the new op; confirm.
227 void RGWCoroutinesStack::call(RGWCoroutine *next_op) {
231 ops.push_back(next_op);
232 if (pos != ops.end()) {
// Runs `op` on a newly allocated child stack and returns that stack. It:
//  - records the child as pending in source_op's spawned list, or in this
//    stack's own list when source_op is NULL;
//  - makes this stack the child's parent;
//  - takes an extra reference that collect() later releases;
//  - schedules the child on the manager.
// When `wait` is requested (presumably; confirm), this stack is also marked
// as blocked by the child.
// RGWCoroutinesManager::schedule() asserts that the manager's write lock is
// held, so callers must be running under it.
239 RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait)
245 rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);
247 RGWCoroutinesStack *stack = env->manager->allocate_stack();
248 s->add_pending(stack);
249 stack->parent = this;
251 stack->get(); /* we'll need to collect the stack */
254 env->manager->schedule(env, stack);
257 set_blocked_by(stack);
// Convenience overload: spawn with no source op, so the child is tracked in
// this stack's own spawned list.
263 RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait)
265 return spawn(NULL, op, wait);
// Sleeps this stack for `interval`. It registers a timed wait with the
// completion manager, using this stack as both the key and the completion
// token. It then marks the stack io-blocked and interval-waiting; run()
// subtracts interval waits from the ops_window accounting.
268 int RGWCoroutinesStack::wait(const utime_t& interval)
270 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
271 completion_mgr->wait_interval((void *)this, interval, (void *)this);
272 set_io_blocked(true);
273 set_interval_wait(true);
// Ends a pending wait() early by waking this stack's key in the completion
// manager.
277 void RGWCoroutinesStack::wakeup()
279 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
280 completion_mgr->wakeup((void *)this);
// Unwinds past the finished op at `pos`. The op is presumably removed and
// `pos` moved back to its caller; confirm.
// The finished op's spawned children are handed upward:
//  - to the stack itself when the finished op was the bottom op;
//  - otherwise to the calling op, which also receives `retcode`.
283 int RGWCoroutinesStack::unwind(int retcode)
285 rgw_spawned_stacks *src_spawned = &(*pos)->spawned;
287 if (pos == ops.begin()) {
288 spawned.inherit(src_spawned);
296 RGWCoroutine *op = *pos;
297 op->set_retcode(retcode);
298 op->spawned.inherit(src_spawned);
// Reaps finished child stacks from op's spawned list, or from this stack's
// own list when op is NULL.
//  - Stacks that are still running, and skip_stack, are kept.
//  - For a completed stack, its return status is read.
//  - On error (presumably r < 0; confirm), *ret receives the status and all
//    later entries are kept without being examined.
// Finally the list is replaced by the survivors. The trailing comment on the
// signature says the function returns true if the caller needs to call it
// again.
303 bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
306 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
308 vector<RGWCoroutinesStack *> new_list;
310 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
311 RGWCoroutinesStack *stack = *iter;
312 if (stack == skip_stack || !stack->is_done()) {
313 new_list.push_back(stack);
314 if (!stack->is_done()) {
315 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
316 } else if (stack == skip_stack) {
317 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl;
321 int r = stack->get_ret_status();
325 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl;
// NOTE(review): `iter` is pre-incremented here, and the for-loop header
// increments it again. If the failed stack was the last entry, iter is
// already end(), and continuing the loop would step past end(), which is
// undefined behaviour. Confirm that this branch ends with `break`.
326 new_list.insert(new_list.end(), ++iter, s->entries.end());
// `done` is cleared only when no entries followed the failed stack.
327 done &= (iter != s->entries.end());
331 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl;
334 s->entries.swap(new_list);
// Collects at most one finished child from op's spawned list, or from this
// stack's own list when op is NULL.
//  - *collected_stack, if requested, starts as NULL.
//  - The first stack that is done has its status read (presumably stored in
//    *ret; confirm) and is reported via *collected_stack.
//  - That stack is then erased from the list.
// Returns true if a stack was collected.
// NOTE(review): erase(iter) invalidates `iter`. The function must return
// immediately after the erase rather than let the loop's ++iter run; confirm.
338 bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
340 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
343 if (collected_stack) {
344 *collected_stack = NULL;
347 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
348 RGWCoroutinesStack *stack = *iter;
349 if (!stack->is_done()) {
352 int r = stack->get_ret_status();
357 if (collected_stack) {
358 *collected_stack = stack;
362 s->entries.erase(iter);
// Convenience overload: collect from this stack's own spawned list.
369 bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
371 return collect(NULL, ret, skip_stack);
// librados completion trampoline: `arg` is the RGWAioCompletionNotifier that
// created the completion, and the call is forwarded to its cb().
374 static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
376 ((RGWAioCompletionNotifier *)arg)->cb();
// Creates a librados AIO completion whose callback argument is this
// notifier. The notifier starts in the registered state.
// The trampoline is passed in the third callback slot and NULL in the
// second. In the three-argument librados API these are the "safe" and
// "complete" callbacks respectively; confirm against the librados version
// in use.
379 RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr),
380 user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
381 c = librados::Rados::aio_create_completion((void *)this, NULL,
382 _aio_completion_notifier_cb);
// Creates a completion notifier for this stack through the owning manager.
385 RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
387 return ops_mgr->create_completion_notifier(this);
// Returns the owning manager's completion manager.
390 RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
392 return ops_mgr->get_completion_mgr();
// Releases one stack that is blocked on this stack.
//  - With nothing in blocking_stacks, it presumably returns false; confirm.
//  - Otherwise it takes the first entry (presumably stored into *s before
//    the erase; confirm) and removes it from blocking_stacks.
//  - It then removes this stack from that stack's blocked_by_stack set.
// run() calls it in a loop until it returns false.
395 bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
397 if (blocking_stacks.empty()) {
401 set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
403 blocking_stacks.erase(iter);
404 (*s)->blocked_by_stack.erase(this);
// Logs the error string of a failed stack.
// NOTE(review): error_str() is already computed into `err`, yet the log line
// calls op->error_str() again. Confirm whether `err` is used elsewhere or the
// second call can reuse it.
409 void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
414 string err = op->error_str();
418 lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
// Dumps this stack as JSON: a "stack" identifier string (presumably its
// address; confirm), its run_count, and an "ops" array with each coroutine
// on the stack.
421 void RGWCoroutinesStack::dump(Formatter *f) const {
424 ::encode_json("stack", ss.str(), f);
425 ::encode_json("run_count", run_count, f);
426 f->open_array_section("ops");
427 for (auto& i : ops) {
428 encode_json("op", *i, f);
// Handles a stack handed back by the completion manager after its IO or
// timed wait finished. It presumably decrements *blocked_count first;
// confirm. It then clears the stack's io-blocked and interval-wait flags.
//  - If the stack is not done, it is re-queued on scheduled_stacks.
//  - Otherwise it is removed from context_stacks.
// NOTE(review): `lock` is write-locked at entry, and then write-locked AGAIN
// by a second RWLock::WLocker in the done branch while the first is still
// held. On a pthread-rwlock-based RWLock, re-acquiring a lock the same thread
// already holds is undefined per POSIX: it deadlocks, or asserts under
// lockdep. The inner WLocker is redundant and should be removed.
433 void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *blocked_count)
435 RWLock::WLocker wl(lock);
437 stack->set_io_blocked(false);
438 stack->set_interval_wait(false);
439 if (!stack->is_done()) {
440 scheduled_stacks.push_back(stack);
442 RWLock::WLocker wl(lock);
443 context_stacks.erase(stack);
// Queues `stack` on the environment's scheduled list and records it in the
// stack set of the environment's run context. The caller must hold the
// manager's write lock; this is asserted.
448 void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
450 assert(lock.is_wlocked());
451 env->scheduled_stacks->push_back(stack);
452 set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
453 context_stacks.insert(stack);
// Scheduler loop for a batch of stacks. It registers the stacks under a new
// run context, then repeatedly operates the stack at the head of
// scheduled_stacks. Depending on the stack's state afterwards:
//  - io-blocked: counted as blocked, with interval waits also counted in
//    interval_wait_count;
//  - blocked on other stacks: left alone until re-added;
//  - done: releases the stacks blocked on it, reschedules a parent waiting
//    for a child, and leaves the run context.
// Finished IO is drained from the completion manager without blocking. The
// loop blocks in get_next() while the number of real (non-interval-wait)
// blocked stacks is at least ops_window, and while nothing is runnable.
// It stops when work runs out or the manager is going down. Stacks still in
// the context on a normal exit mean a deadlock: they are dumped and the
// function asserts.
456 int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
459 int blocked_count = 0;
460 int interval_wait_count = 0;
461 bool canceled = false; // set on going_down
462 RGWCoroutinesEnv env;
464 uint64_t run_context = ++run_context_count;
467 set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
468 list<RGWCoroutinesStack *> scheduled_stacks;
469 for (auto& st : stacks) {
470 context_stacks.insert(st);
471 scheduled_stacks.push_back(st);
475 env.run_context = run_context;
477 env.scheduled_stacks = &scheduled_stacks;
// Iterate until the schedule is exhausted or the manager starts going down.
479 for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
482 RGWCoroutinesStack *stack = *iter;
485 ret = stack->operate(&env);
486 stack->set_is_scheduled(false);
488 ldout(cct, 20) << "stack->operate() returned ret=" << ret << dendl;
491 if (stack->is_error()) {
495 bool op_not_blocked = false;
497 if (stack->is_io_blocked()) {
498 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl;
499 if (stack->is_interval_waiting()) {
500 interval_wait_count++;
503 } else if (stack->is_blocked()) {
504 /* do nothing, we'll re-add the stack when the blocking stack is done,
505 * or when we're awaken
507 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack()
508 << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl;
509 } else if (stack->is_done()) {
510 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl;
511 RGWCoroutinesStack *s;
// Release every stack that was waiting on this finished one.
512 while (stack->unblock_stack(&s)) {
513 if (!s->is_blocked_by_stack() && !s->is_done()) {
514 if (s->is_io_blocked()) {
// NOTE(review): this branch accounts for the released stack `s`, but the
// test reads `stack`, which is done at this point. It looks like it should
// be `s->is_interval_waiting()`. As written, an interval-waiting `s` is
// counted as real IO against ops_window. Confirm.
515 if (stack->is_interval_waiting()) {
516 interval_wait_count++;
// A parent parked in wait_for_child() is rescheduled now that a child finished.
524 if (stack->parent && stack->parent->waiting_for_child()) {
525 stack->parent->set_wait_for_child(false);
526 stack->parent->schedule();
528 context_stacks.erase(stack);
532 op_not_blocked = true;
537 if (!op_not_blocked && stack) {
538 stack->run_count = 0;
// Drain completions that are already available, without blocking.
543 RGWCoroutinesStack *blocked_stack;
544 while (completion_mgr->try_get_next((void **)&blocked_stack)) {
545 handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
549 * only account blocked operations that are not in interval_wait, these are stacks that
550 * were put on a wait without any real IO operations. While we mark these as io_blocked,
551 * these aren't really waiting for IOs
553 while (blocked_count - interval_wait_count >= ops_window) {
554 ret = completion_mgr->get_next((void **)&blocked_stack);
// NOTE(review): "failed to clone shard" here and in the loop below reads as
// copied from one specific caller; this is the generic scheduler loop.
556 ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
558 handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
562 scheduled_stacks.pop_front();
// Nothing runnable but IO still outstanding: block until something completes.
565 while (scheduled_stacks.empty() && blocked_count > 0) {
566 ret = completion_mgr->get_next((void **)&blocked_stack);
568 ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
571 ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
576 handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
577 iter = scheduled_stacks.begin();
583 if (iter == scheduled_stacks.end()) {
584 iter = scheduled_stacks.begin();
// On a normal (not going-down) exit, any stack left in this context can no
// longer make progress: dump them all, then assert.
589 if (!context_stacks.empty() && !going_down) {
590 JSONFormatter formatter(true);
591 formatter.open_array_section("context_stacks");
592 for (auto& s : context_stacks) {
593 ::encode_json("entry", *s, &formatter);
595 formatter.close_section();
596 lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
597 formatter.flush(*_dout);
599 assert(context_stacks.empty() || going_down); // assert on deadlock
// Release whatever is still registered in this run context (presumably by
// dropping references; confirm), then forget the context.
602 for (auto stack : context_stacks) {
603 ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl;
606 run_contexts.erase(run_context);
// Runs a single coroutine to completion. It allocates a stack (presumably
// pushing `op` onto it; confirm), schedules it into a local list, and drives
// it with run(stacks). When that succeeds it presumably returns the op's own
// status (confirm).
612 int RGWCoroutinesManager::run(RGWCoroutine *op)
617 list<RGWCoroutinesStack *> stacks;
618 RGWCoroutinesStack *stack = allocate_stack();
622 stack->schedule(&stacks);
626 ldout(cct, 20) << "run(stacks) returned r=" << r << dendl;
628 r = op->get_ret_status();
// Creates an AIO completion notifier whose user data is `stack`, and
// registers it with the completion manager.
635 RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
637 RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, (void *)stack);
638 completion_mgr->register_completion_notifier(cn);
// Dumps every run context, each with its id and the stacks it contains,
// under the manager's read lock.
642 void RGWCoroutinesManager::dump(Formatter *f) const {
643 RWLock::RLocker rl(lock);
645 f->open_array_section("run_contexts");
646 for (auto& i : run_contexts) {
647 f->open_object_section("context");
648 ::encode_json("id", i.first, f);
649 f->open_array_section("entries");
650 for (auto& s : i.second) {
651 ::encode_json("entry", *s, f);
// Allocates a new stack bound to this manager, with no start op.
659 RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
660 return new RGWCoroutinesStack(cct, this);
// Returns a string identifying this manager.
663 string RGWCoroutinesManager::get_id()
// Adds `mgr` to the registry if it is not already present, under the write
// lock.
673 void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
675 RWLock::WLocker wl(lock);
676 if (managers.find(mgr) == managers.end()) {
677 managers.insert(mgr);
// Removes `mgr` from the registry if present, under the write lock.
682 void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
684 RWLock::WLocker wl(lock);
685 if (managers.find(mgr) != managers.end()) {
// Destructor: unregisters the admin-socket command, if one was hooked.
691 RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
693 AdminSocket *admin_socket = cct->get_admin_socket();
694 if (!admin_command.empty()) {
695 admin_socket->unregister_command(admin_command);
// (Re)binds the registry to an admin-socket command. Any previously hooked
// command is unregistered first, then `command` is registered with this
// registry as its handler. A registration failure is logged.
// NOTE(review): admin_command is updated before registration is known to
// succeed. Confirm that the failure path resets it; otherwise the destructor
// will unregister a command that was never registered.
699 int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
701 AdminSocket *admin_socket = cct->get_admin_socket();
702 if (!admin_command.empty()) {
703 admin_socket->unregister_command(admin_command);
705 admin_command = command;
706 int r = admin_socket->register_command(admin_command, admin_command, this,
707 "dump current coroutines stack state");
709 lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
// Admin-socket handler: under the read lock, encodes the whole registry as
// "cr_managers" JSON.
715 bool RGWCoroutinesManagerRegistry::call(std::string command, cmdmap_t& cmdmap, std::string format,
717 RWLock::RLocker rl(lock);
720 ::encode_json("cr_managers", *this, &f);
// Dumps every registered manager into a "coroutine_managers" array.
726 void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {
727 f->open_array_section("coroutine_managers");
728 for (auto m : managers) {
729 ::encode_json("entry", *m, f);
// Runs `op` as a nested call of this coroutine, presumably via its stack's
// call() (confirm).
734 void RGWCoroutine::call(RGWCoroutine *op)
// Spawns `op` on a new stack, tracked in this coroutine's spawned list.
739 RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
741 return stack->spawn(this, op, wait);
// Reaps finished children from this coroutine's spawned list. Returns true
// if it needs to be called again.
744 bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
746 return stack->collect(this, ret, skip_stack);
// Collects at most one finished child. Returns true if one was found.
749 bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
751 return stack->collect_next(this, ret, collected_stack);
// Sleeps this coroutine's stack for `interval`.
754 int RGWCoroutine::wait(const utime_t& interval)
756 return stack->wait(interval);
// Parks the stack until a spawned child finishes. It presumably returns
// without setting the flag when there are no children, or when a child is
// already done (confirm).
759 void RGWCoroutine::wait_for_child()
761 /* should only wait for child if there is a child that is not done yet, and no complete children */
762 if (spawned.entries.empty()) {
765 for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) {
766 if ((*iter)->is_done()) {
770 stack->set_wait_for_child(true);
// Returns the dynamic type name of this coroutine, as given by typeid. The
// format is implementation-defined (mangled on GCC/Clang).
773 string RGWCoroutine::to_str() const
775 return typeid(*this).name();
// Streams a short identity: stack address, coroutine address and dynamic
// type name.
778 ostream& operator<<(ostream& out, const RGWCoroutine& cr)
780 out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name();
// Re-entrant coroutine helper: it uses `yield`, so it resumes from its saved
// position on each call. It waits for children until at most num_cr_left
// remain spawned, collects finished ones, and logs collection errors.
// Requires num_cr_left >= 0 (asserted).
// With num_cr_left == 0 and a skip_stack, the target is presumably bumped to
// 1, since skip_stack itself is never collected (confirm).
784 bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
787 assert(num_cr_left >= 0);
788 if (num_cr_left == 0 && skip_stack) {
792 while (num_spawned() > (size_t)num_cr_left) {
793 yield wait_for_child();
795 while (collect(&ret, skip_stack)) {
797 ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
798 /* we should have reported this error */
799 log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
// Wakes this coroutine's stack, presumably via its stack's wakeup()
// (confirm).
808 void RGWCoroutine::wakeup()
// Dumps this coroutine as JSON, with these fields:
//  - "description", if one was set;
//  - "type", its dynamic type name;
//  - "spawned", the addresses of spawned stacks;
//  - "history", its status history;
//  - "status", the current status text with its timestamp, if non-empty.
// NOTE(review): Status::set_status() mutates `status`, `history` and
// `timestamp` under status.lock, but this const dump reads them without
// visibly taking that lock. Confirm whether a read lock is needed.
813 void RGWCoroutine::dump(Formatter *f) const {
814 if (!description.str().empty()) {
815 encode_json("description", description.str(), f);
817 encode_json("type", to_str(), f);
818 if (!spawned.entries.empty()) {
819 f->open_array_section("spawned");
820 for (auto& i : spawned.entries) {
822 snprintf(buf, sizeof(buf), "%p", (void *)i);
823 encode_json("stack", string(buf), f);
827 if (!status.history.empty()) {
828 encode_json("history", status.history, f);
831 if (!status.status.str().empty()) {
832 f->open_object_section("status");
833 encode_json("status", status.status.str(), f);
834 encode_json("timestamp", status.timestamp, f);
// Destructor: runs cleanup if it has not already happened.
// NOTE(review): if cleanup dispatches to a virtual hook, calling it from
// this base-class destructor will not reach derived overrides, because the
// derived part is already destroyed by then.
839 RGWSimpleCoroutine::~RGWSimpleCoroutine()
841 if (!called_cleanup) {
// Records that cleanup has run, presumably after invoking the cleanup hook
// (confirm), so the destructor does not repeat it.
846 void RGWSimpleCoroutine::call_cleanup()
848 called_cleanup = true;
// Drives the fixed request lifecycle, yielding between steps:
// init, then send_request, then request_complete, then all_complete.
// It finishes in RGWCoroutine_Done with `ret`.
852 int RGWSimpleCoroutine::operate()
856 yield return state_init();
857 yield return state_send_request();
858 yield return state_request_complete();
859 yield return state_all_complete();
862 return set_state(RGWCoroutine_Done, ret);
// Initialization step. On failure (presumably ret < 0; confirm) the
// coroutine moves to RGWCoroutine_Error with that code.
867 int RGWSimpleCoroutine::state_init()
872 return set_state(RGWCoroutine_Error, ret);
// Issues the request via send_request(). On failure the coroutine moves to
// RGWCoroutine_Error with that code.
877 int RGWSimpleCoroutine::state_send_request()
879 int ret = send_request();
882 return set_state(RGWCoroutine_Error, ret);
// Waits for and checks the request result via request_complete(). On
// failure the coroutine moves to RGWCoroutine_Error with that code.
887 int RGWSimpleCoroutine::state_request_complete()
889 int ret = request_complete();
892 return set_state(RGWCoroutine_Error, ret);
// Final step. On failure the coroutine moves to RGWCoroutine_Error with
// that code.
897 int RGWSimpleCoroutine::state_all_complete()
902 return set_state(RGWCoroutine_Error, ret);