1 #ifndef CEPH_RGW_COROUTINE_H
2 #define CEPH_RGW_COROUTINE_H
6 #pragma push_macro("_ASSERT_H")
9 #include <boost/asio.hpp>
10 #include <boost/intrusive_ptr.hpp>
13 #pragma pop_macro("_ASSERT_H")
16 #include "include/utime.h"
17 #include "common/RefCountedObj.h"
18 #include "common/debug.h"
19 #include "common/Timer.h"
20 #include "common/admin_socket.h"
22 #include "rgw_common.h"
23 #include <boost/asio/coroutine.hpp>
// Window size constant. The RGWCoroutinesManager constructor uses it as the
// initial value of its ops_window member.
27 #define RGW_ASYNC_OPS_MGR_WINDOW 100
29 class RGWCoroutinesStack;
30 class RGWCoroutinesManager;
31 class RGWAioCompletionNotifier;
33 class RGWCompletionManager : public RefCountedObject {
35 list<void *> complete_reqs;
36 using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
44 std::atomic<bool> going_down = { false };
46 map<void *, void *> waiters;
51 void _wakeup(void *opaque);
52 void _complete(RGWAioCompletionNotifier *cn, void *user_info);
54 RGWCompletionManager(CephContext *_cct);
55 ~RGWCompletionManager() override;
57 void complete(RGWAioCompletionNotifier *cn, void *user_info);
58 int get_next(void **user_info);
59 bool try_get_next(void **user_info);
64 * wait for interval length to complete user_info
66 void wait_interval(void *opaque, const utime_t& interval, void *user_info);
67 void wakeup(void *opaque);
69 void register_completion_notifier(RGWAioCompletionNotifier *cn);
70 void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
73 /* a single-use librados aio completion notifier that hooks into the RGWCompletionManager */
74 class RGWAioCompletionNotifier : public RefCountedObject {
75 librados::AioCompletion *c;
76 RGWCompletionManager *completion_mgr;
82 RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data);
83 ~RGWAioCompletionNotifier() override {
86 bool need_unregister = registered;
88 completion_mgr->get();
92 if (need_unregister) {
93 completion_mgr->unregister_completion_notifier(this);
94 completion_mgr->put();
98 librados::AioCompletion *completion() {
103 Mutex::Locker l(lock);
117 completion_mgr->get();
120 completion_mgr->complete(this, user_data);
121 completion_mgr->put();
126 struct RGWCoroutinesEnv {
127 uint64_t run_context;
128 RGWCoroutinesManager *manager;
129 list<RGWCoroutinesStack *> *scheduled_stacks;
130 RGWCoroutinesStack *stack;
// Default environment: run context 0, with no manager, no scheduled-stack
// list and no stack attached yet.
132 RGWCoroutinesEnv() : run_context(0), manager(nullptr), scheduled_stacks(nullptr), stack(nullptr) {}
135 enum RGWCoroutineState {
136 RGWCoroutine_Error = -2,
137 RGWCoroutine_Done = -1,
138 RGWCoroutine_Run = 0,
141 struct rgw_spawned_stacks {
142 vector<RGWCoroutinesStack *> entries;
// Nothing to set up explicitly: the entries vector is default-constructed empty.
144 rgw_spawned_stacks() = default;
146 void add_pending(RGWCoroutinesStack *s) {
147 entries.push_back(s);
150 void inherit(rgw_spawned_stacks *source) {
151 for (vector<RGWCoroutinesStack *>::iterator iter = source->entries.begin();
152 iter != source->entries.end(); ++iter) {
155 source->entries.clear();
161 class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
162 friend class RGWCoroutinesStack;
// Builds a history item, initialising the timestamp and status members from
// the arguments. The timestamp is only read here, so it is taken by const
// reference; that also lets callers pass const values or temporaries.
168 StatusItem(const utime_t& t, const string& s) : timestamp(t), status(s) {}
170 void dump(Formatter *f) const;
// History-length constant; the Status constructor uses it to initialise max_history.
173 #define MAX_COROUTINE_HISTORY 10
// Stores the Ceph context, constructs the lock with the label
// "RGWCoroutine::Status::lock" and sets max_history to MAX_COROUTINE_HISTORY.
183 Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {}
185 deque<StatusItem> history;
187 stringstream& set_status();
190 stringstream description;
194 boost::asio::coroutine drain_cr;
198 RGWCoroutinesStack *stack;
202 rgw_spawned_stacks spawned;
204 stringstream error_stream;
206 int set_state(int s, int ret = 0) {
210 int set_cr_error(int ret) {
211 state = RGWCoroutine_Error;
215 state = RGWCoroutine_Done;
218 void set_io_blocked(bool flag);
219 int io_block(int ret = 0);
221 void reset_description() {
222 description.str(string());
225 stringstream& set_description() {
228 stringstream& set_status() {
229 return status.set_status();
232 stringstream& set_status(const string& s) {
233 stringstream& status = set_status();
// A freshly constructed coroutine is in state RGWCoroutine_Run with retcode 0,
// has no stack attached, and has _yield_ret cleared. The Ceph context is kept
// in cct and also handed to the status tracker.
239 RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(nullptr), retcode(0), state(RGWCoroutine_Run) {}
240 ~RGWCoroutine() override;
242 virtual int operate() = 0;
// True once the coroutine has finished, either normally (RGWCoroutine_Done)
// or with an error (RGWCoroutine_Error). Only reads state, hence const.
244 bool is_done() const { return state == RGWCoroutine_Done || state == RGWCoroutine_Error; }
// True when the coroutine state is RGWCoroutine_Error. Only reads state, hence const.
245 bool is_error() const { return state == RGWCoroutine_Error; }
// Returns a reference to this coroutine's error_stream, so callers can append
// error text to it with operator<<.
247 stringstream& log_error() { return error_stream; }
249 return error_stream.str();
252 void set_retcode(int r) {
256 int get_ret_status() {
260 void call(RGWCoroutine *op); /* call at the same stack we're in */
261 RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
262 bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
263 bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
265 int wait(const utime_t& interval);
266 bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
268 void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
270 size_t num_spawned() {
271 return spawned.entries.size();
274 void wait_for_child();
276 virtual string to_str() const;
278 RGWCoroutinesStack *get_stack() const {
282 void dump(Formatter *f) const;
285 ostream& operator<<(ostream& out, const RGWCoroutine& cr);
287 #define yield_until_true(x) \
290 yield _yield_ret = x; \
291 } while (!_yield_ret); \
292 _yield_ret = false; \
295 #define drain_all() \
296 drain_cr = boost::asio::coroutine(); \
297 yield_until_true(drain_children(0))
299 #define drain_all_but(n) \
300 drain_cr = boost::asio::coroutine(); \
301 yield_until_true(drain_children(n))
303 #define drain_all_but_stack(stack) \
304 drain_cr = boost::asio::coroutine(); \
305 yield_until_true(drain_children(1, stack))
308 class RGWConsumerCR : public RGWCoroutine {
// Forwards the Ceph context to the RGWCoroutine base; no other members are
// initialised explicitly here.
312 RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
315 return !product.empty();
318 void wait_for_product() {
319 if (!has_product()) {
325 if (product.empty()) {
328 *p = product.front();
333 void receive(const T& p, bool wakeup = true);
334 void receive(list<T>& l, bool wakeup = true);
337 class RGWCoroutinesStack : public RefCountedObject {
338 friend class RGWCoroutine;
339 friend class RGWCoroutinesManager;
343 RGWCoroutinesManager *ops_mgr;
345 list<RGWCoroutine *> ops;
346 list<RGWCoroutine *>::iterator pos;
348 rgw_spawned_stacks spawned;
350 set<RGWCoroutinesStack *> blocked_by_stack;
351 set<RGWCoroutinesStack *> blocking_stacks;
357 bool interval_wait_flag;
361 bool is_waiting_for_child;
368 RGWCoroutinesEnv *env;
369 RGWCoroutinesStack *parent;
371 RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
372 bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
373 bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
375 RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
376 ~RGWCoroutinesStack() override;
378 int operate(RGWCoroutinesEnv *env);
386 bool is_blocked_by_stack() {
387 return !blocked_by_stack.empty();
389 void set_io_blocked(bool flag) {
392 bool is_io_blocked() {
395 void set_interval_wait(bool flag) {
396 interval_wait_flag = flag;
398 bool is_interval_waiting() {
399 return interval_wait_flag;
401 void set_sleeping(bool flag) {
402 bool wakeup = sleep_flag & !flag;
411 void set_is_scheduled(bool flag) {
416 return is_blocked_by_stack() || is_sleeping() ||
417 is_io_blocked() || waiting_for_child() ;
420 void schedule(list<RGWCoroutinesStack *> *stacks = NULL) {
422 stacks = env->scheduled_stacks;
425 stacks->push_back(this);
430 int get_ret_status() {
436 void call(RGWCoroutine *next_op);
437 RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait);
438 int unwind(int retcode);
440 int wait(const utime_t& interval);
443 bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
445 RGWAioCompletionNotifier *create_completion_notifier();
446 RGWCompletionManager *get_completion_mgr();
448 void set_blocked_by(RGWCoroutinesStack *s) {
449 blocked_by_stack.insert(s);
450 s->blocking_stacks.insert(this);
453 void set_wait_for_child(bool flag) {
454 is_waiting_for_child = flag;
457 bool waiting_for_child() {
458 return is_waiting_for_child;
461 bool unblock_stack(RGWCoroutinesStack **s);
// Returns the env pointer currently stored on this stack.
463 RGWCoroutinesEnv *get_env() { return env; }
465 void dump(Formatter *f) const;
469 void RGWConsumerCR<T>::receive(list<T>& l, bool wakeup)
471 product.splice(product.end(), l);
479 void RGWConsumerCR<T>::receive(const T& p, bool wakeup)
481 product.push_back(p);
487 class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
490 set<RGWCoroutinesManager *> managers;
493 string admin_command;
// Stores the Ceph context and constructs the lock with the label
// "RGWCoroutinesRegistry::lock".
496 RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {}
497 ~RGWCoroutinesManagerRegistry() override;
499 void add(RGWCoroutinesManager *mgr);
500 void remove(RGWCoroutinesManager *mgr);
502 int hook_to_admin_command(const string& command);
503 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
504 bufferlist& out) override;
506 void dump(Formatter *f) const;
509 class RGWCoroutinesManager {
511 std::atomic<bool> going_down = { false };
513 std::atomic<int64_t> run_context_count = { 0 };
514 map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
518 void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count);
520 RGWCompletionManager *completion_mgr;
521 RGWCoroutinesManagerRegistry *cr_registry;
527 void put_completion_notifier(RGWAioCompletionNotifier *cn);
529 RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"),
530 cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
531 completion_mgr = new RGWCompletionManager(cct);
533 cr_registry->add(this);
536 virtual ~RGWCoroutinesManager() {
538 completion_mgr->put();
540 cr_registry->remove(this);
544 int run(list<RGWCoroutinesStack *>& ops);
545 int run(RGWCoroutine *op);
547 bool expected = false;
548 if (going_down.compare_exchange_strong(expected, true)) {
549 completion_mgr->go_down();
553 virtual void report_error(RGWCoroutinesStack *op);
555 RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
// Returns the manager's completion_mgr pointer; no reference is taken on the
// caller's behalf here.
556 RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
558 void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
559 RGWCoroutinesStack *allocate_stack();
561 virtual string get_id();
562 void dump(Formatter *f) const;
565 class RGWSimpleCoroutine : public RGWCoroutine {
568 int operate() override;
571 int state_send_request();
572 int state_request_complete();
573 int state_all_complete();
// Forwards the Ceph context to the RGWCoroutine base and starts with
// called_cleanup set to false.
578 RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {}
579 ~RGWSimpleCoroutine() override;
// Overridable hook. The default implementation does nothing and returns 0.
581 virtual int init() { return 0; }
582 virtual int send_request() = 0;
583 virtual int request_complete() = 0;
// Overridable hook. The default implementation does nothing and returns 0.
584 virtual int finish() { return 0; }
// Overridable hook. The default implementation is a no-op.
585 virtual void request_cleanup() {}