initial code repo
[stor4nfv.git] / src / ceph / src / rgw / rgw_coroutine.h
diff --git a/src/ceph/src/rgw/rgw_coroutine.h b/src/ceph/src/rgw/rgw_coroutine.h
new file mode 100644 (file)
index 0000000..f842328
--- /dev/null
@@ -0,0 +1,588 @@
+#ifndef CEPH_RGW_COROUTINE_H
+#define CEPH_RGW_COROUTINE_H
+
+#ifdef _ASSERT_H
+#define NEED_ASSERT_H
+#pragma push_macro("_ASSERT_H")
+#endif
+
+#include <boost/asio.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+#ifdef NEED_ASSERT_H
+#pragma pop_macro("_ASSERT_H")
+#endif
+
+#include "include/utime.h"
+#include "common/RefCountedObj.h"
+#include "common/debug.h"
+#include "common/Timer.h"
+#include "common/admin_socket.h"
+
+#include "rgw_common.h"
+#include <boost/asio/coroutine.hpp>
+
+#include <atomic>
+
+#define RGW_ASYNC_OPS_MGR_WINDOW 100
+
+class RGWCoroutinesStack;
+class RGWCoroutinesManager;
+class RGWAioCompletionNotifier;
+
+class RGWCompletionManager : public RefCountedObject {
+  CephContext *cct;
+  list<void *> complete_reqs;
+  using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
+  set<NotifierRef> cns;
+
+  Mutex lock;
+  Cond cond;
+
+  SafeTimer timer;
+
+  std::atomic<bool> going_down = { false };
+
+  map<void *, void *> waiters;
+
+  class WaitContext;
+
+protected:
+  void _wakeup(void *opaque);
+  void _complete(RGWAioCompletionNotifier *cn, void *user_info);
+public:
+  RGWCompletionManager(CephContext *_cct);
+  ~RGWCompletionManager() override;
+
+  void complete(RGWAioCompletionNotifier *cn, void *user_info);
+  int get_next(void **user_info);
+  bool try_get_next(void **user_info);
+
+  void go_down();
+
+  /*
+   * wait for interval length to complete user_info
+   */
+  void wait_interval(void *opaque, const utime_t& interval, void *user_info);
+  void wakeup(void *opaque);
+
+  void register_completion_notifier(RGWAioCompletionNotifier *cn);
+  void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
+};
+
+/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
+class RGWAioCompletionNotifier : public RefCountedObject {
+  librados::AioCompletion *c;
+  RGWCompletionManager *completion_mgr;
+  void *user_data;
+  Mutex lock;
+  bool registered;
+
+public:
+  RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data);
+  ~RGWAioCompletionNotifier() override {
+    c->release();
+    lock.Lock();
+    bool need_unregister = registered;
+    if (registered) {
+      completion_mgr->get();
+    }
+    registered = false;
+    lock.Unlock();
+    if (need_unregister) {
+      completion_mgr->unregister_completion_notifier(this);
+      completion_mgr->put();
+    }
+  }
+
+  librados::AioCompletion *completion() {
+    return c;
+  }
+
+  void unregister() {
+    Mutex::Locker l(lock);
+    if (!registered) {
+      return;
+    }
+    registered = false;
+  }
+
+  void cb() {
+    lock.Lock();
+    if (!registered) {
+      lock.Unlock();
+      put();
+      return;
+    }
+    completion_mgr->get();
+    registered = false;
+    lock.Unlock();
+    completion_mgr->complete(this, user_data);
+    completion_mgr->put();
+    put();
+  }
+};
+
+struct RGWCoroutinesEnv {
+  uint64_t run_context;
+  RGWCoroutinesManager *manager;
+  list<RGWCoroutinesStack *> *scheduled_stacks;
+  RGWCoroutinesStack *stack;
+
+  RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {}
+};
+
+enum RGWCoroutineState {
+  RGWCoroutine_Error = -2,
+  RGWCoroutine_Done  = -1,
+  RGWCoroutine_Run   =  0,
+};
+
+struct rgw_spawned_stacks {
+  vector<RGWCoroutinesStack *> entries;
+
+  rgw_spawned_stacks() {}
+
+  void add_pending(RGWCoroutinesStack *s) {
+    entries.push_back(s);
+  }
+
+  void inherit(rgw_spawned_stacks *source) {
+    for (vector<RGWCoroutinesStack *>::iterator iter = source->entries.begin();
+         iter != source->entries.end(); ++iter) {
+      add_pending(*iter);
+    }
+    source->entries.clear();
+  }
+};
+
+
+
+class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
+  friend class RGWCoroutinesStack;
+
+  struct StatusItem {
+    utime_t timestamp;
+    string status;
+
+    StatusItem(utime_t& t, const string& s) : timestamp(t), status(s) {}
+
+    void dump(Formatter *f) const;
+  };
+
+#define MAX_COROUTINE_HISTORY 10
+
+  struct Status {
+    CephContext *cct;
+    RWLock lock;
+    int max_history;
+
+    utime_t timestamp;
+    stringstream status;
+
+    Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {}
+
+    deque<StatusItem> history;
+
+    stringstream& set_status();
+  } status;
+
+  stringstream description;
+
+protected:
+  bool _yield_ret;
+  boost::asio::coroutine drain_cr;
+
+  CephContext *cct;
+
+  RGWCoroutinesStack *stack;
+  int retcode;
+  int state;
+
+  rgw_spawned_stacks spawned;
+
+  stringstream error_stream;
+
+  int set_state(int s, int ret = 0) {
+    state = s;
+    return ret;
+  }
+  int set_cr_error(int ret) {
+    state = RGWCoroutine_Error;
+    return ret;
+  }
+  int set_cr_done() {
+    state = RGWCoroutine_Done;
+    return 0;
+  }
+  void set_io_blocked(bool flag);
+  int io_block(int ret = 0);
+
+  void reset_description() {
+    description.str(string());
+  }
+
+  stringstream& set_description() {
+    return description;
+  }
+  stringstream& set_status() {
+    return status.set_status();
+  }
+
+  stringstream& set_status(const string& s) {
+    stringstream& status = set_status();
+    status << s;
+    return status;
+  }
+
+public:
+  RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
+  ~RGWCoroutine() override;
+
+  virtual int operate() = 0;
+
+  bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); }
+  bool is_error() { return (state == RGWCoroutine_Error); }
+
+  stringstream& log_error() { return error_stream; }
+  string error_str() {
+    return error_stream.str();
+  }
+
+  void set_retcode(int r) {
+    retcode = r;
+  }
+
+  int get_ret_status() {
+    return retcode;
+  }
+
+  void call(RGWCoroutine *op); /* call at the same stack we're in */
+  RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
+  bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+  bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
+
+  int wait(const utime_t& interval);
+  bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
+  void wakeup();
+  void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
+
+  size_t num_spawned() {
+    return spawned.entries.size();
+  }
+
+  void wait_for_child();
+
+  virtual string to_str() const;
+
+  RGWCoroutinesStack *get_stack() const {
+    return stack;
+  }
+
+  void dump(Formatter *f) const;
+};
+
+ostream& operator<<(ostream& out, const RGWCoroutine& cr);
+
+#define yield_until_true(x)     \
+do {                            \
+  do {                          \
+    yield _yield_ret = x;       \
+  } while (!_yield_ret);        \
+  _yield_ret = false;           \
+} while (0)
+
+#define drain_all() \
+  drain_cr = boost::asio::coroutine(); \
+  yield_until_true(drain_children(0))
+
+#define drain_all_but(n) \
+  drain_cr = boost::asio::coroutine(); \
+  yield_until_true(drain_children(n))
+
+#define drain_all_but_stack(stack) \
+  drain_cr = boost::asio::coroutine(); \
+  yield_until_true(drain_children(1, stack))
+
+template <class T>
+class RGWConsumerCR : public RGWCoroutine {
+  list<T> product;
+
+public:
+  RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
+
+  bool has_product() {
+    return !product.empty();
+  }
+
+  void wait_for_product() {
+    if (!has_product()) {
+      set_sleeping(true);
+    }
+  }
+
+  bool consume(T *p) {
+    if (product.empty()) {
+      return false;
+    }
+    *p = product.front();
+    product.pop_front();
+    return true;
+  }
+
+  void receive(const T& p, bool wakeup = true);
+  void receive(list<T>& l, bool wakeup = true);
+};
+
+class RGWCoroutinesStack : public RefCountedObject {
+  friend class RGWCoroutine;
+  friend class RGWCoroutinesManager;
+
+  CephContext *cct;
+
+  RGWCoroutinesManager *ops_mgr;
+
+  list<RGWCoroutine *> ops;
+  list<RGWCoroutine *>::iterator pos;
+
+  rgw_spawned_stacks spawned;
+
+  set<RGWCoroutinesStack *> blocked_by_stack;
+  set<RGWCoroutinesStack *> blocking_stacks;
+
+  bool done_flag;
+  bool error_flag;
+  bool blocked_flag;
+  bool sleep_flag;
+  bool interval_wait_flag;
+
+  bool is_scheduled;
+
+  bool is_waiting_for_child;
+
+  int retcode;
+
+  uint64_t run_count;
+
+protected:
+  RGWCoroutinesEnv *env;
+  RGWCoroutinesStack *parent;
+
+  RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
+  bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+  bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
+public:
+  RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
+  ~RGWCoroutinesStack() override;
+
+  int operate(RGWCoroutinesEnv *env);
+
+  bool is_done() {
+    return done_flag;
+  }
+  bool is_error() {
+    return error_flag;
+  }
+  bool is_blocked_by_stack() {
+    return !blocked_by_stack.empty();
+  }
+  void set_io_blocked(bool flag) {
+    blocked_flag = flag;
+  }
+  bool is_io_blocked() {
+    return blocked_flag;
+  }
+  void set_interval_wait(bool flag) {
+    interval_wait_flag = flag;
+  }
+  bool is_interval_waiting() {
+    return interval_wait_flag;
+  }
+  void set_sleeping(bool flag) {
+    bool wakeup = sleep_flag & !flag;
+    sleep_flag = flag;
+    if (wakeup) {
+      schedule();
+    }
+  }
+  bool is_sleeping() {
+    return sleep_flag;
+  }
+  void set_is_scheduled(bool flag) {
+    is_scheduled = flag;
+  }
+
+  bool is_blocked() {
+    return is_blocked_by_stack() || is_sleeping() ||
+          is_io_blocked() || waiting_for_child() ;
+  }
+
+  void schedule(list<RGWCoroutinesStack *> *stacks = NULL) {
+    if (!stacks) {
+      stacks = env->scheduled_stacks;
+    }
+    if (!is_scheduled) {
+      stacks->push_back(this);
+      is_scheduled = true;
+    }
+  }
+
+  int get_ret_status() {
+    return retcode;
+  }
+
+  string error_str();
+
+  void call(RGWCoroutine *next_op);
+  RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait);
+  int unwind(int retcode);
+
+  int wait(const utime_t& interval);
+  void wakeup();
+
+  bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+
+  RGWAioCompletionNotifier *create_completion_notifier();
+  RGWCompletionManager *get_completion_mgr();
+
+  void set_blocked_by(RGWCoroutinesStack *s) {
+    blocked_by_stack.insert(s);
+    s->blocking_stacks.insert(this);
+  }
+
+  void set_wait_for_child(bool flag) {
+    is_waiting_for_child = flag;
+  }
+
+  bool waiting_for_child() {
+    return is_waiting_for_child;
+  }
+
+  bool unblock_stack(RGWCoroutinesStack **s);
+
+  RGWCoroutinesEnv *get_env() { return env; }
+
+  void dump(Formatter *f) const;
+};
+
+template <class T>
+void RGWConsumerCR<T>::receive(list<T>& l, bool wakeup)
+{
+  product.splice(product.end(), l);
+  if (wakeup) {
+    set_sleeping(false);
+  }
+}
+
+
+template <class T>
+void RGWConsumerCR<T>::receive(const T& p, bool wakeup)
+{
+  product.push_back(p);
+  if (wakeup) {
+    set_sleeping(false);
+  }
+}
+
+class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
+  CephContext *cct;
+
+  set<RGWCoroutinesManager *> managers;
+  RWLock lock;
+
+  string admin_command;
+
+public:
+  RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {}
+  ~RGWCoroutinesManagerRegistry() override;
+
+  void add(RGWCoroutinesManager *mgr);
+  void remove(RGWCoroutinesManager *mgr);
+
+  int hook_to_admin_command(const string& command);
+  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
+           bufferlist& out) override;
+    
+  void dump(Formatter *f) const;
+};
+
+class RGWCoroutinesManager {
+  CephContext *cct;
+  std::atomic<bool> going_down = { false };
+
+  std::atomic<int64_t> run_context_count = { 0 };
+  map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
+
+  RWLock lock;
+
+  void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count);
+protected:
+  RGWCompletionManager *completion_mgr;
+  RGWCoroutinesManagerRegistry *cr_registry;
+
+  int ops_window;
+
+  string id;
+
+  void put_completion_notifier(RGWAioCompletionNotifier *cn);
+public:
+  RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"),
+                                                                                        cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
+    completion_mgr = new RGWCompletionManager(cct);
+    if (cr_registry) {
+      cr_registry->add(this);
+    }
+  }
+  virtual ~RGWCoroutinesManager() {
+    stop();
+    completion_mgr->put();
+    if (cr_registry) {
+      cr_registry->remove(this);
+    }
+  }
+
+  int run(list<RGWCoroutinesStack *>& ops);
+  int run(RGWCoroutine *op);
+  void stop() {
+    bool expected = false;
+    if (going_down.compare_exchange_strong(expected, true)) {
+      completion_mgr->go_down();
+    }
+  }
+
+  virtual void report_error(RGWCoroutinesStack *op);
+
+  RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
+  RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
+
+  void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
+  RGWCoroutinesStack *allocate_stack();
+
+  virtual string get_id();
+  void dump(Formatter *f) const;
+};
+
+class RGWSimpleCoroutine : public RGWCoroutine {
+  bool called_cleanup;
+
+  int operate() override;
+
+  int state_init();
+  int state_send_request();
+  int state_request_complete();
+  int state_all_complete();
+
+  void call_cleanup();
+
+public:
+  RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {}
+  ~RGWSimpleCoroutine() override;
+
+  virtual int init() { return 0; }
+  virtual int send_request() = 0;
+  virtual int request_complete() = 0;
+  virtual int finish() { return 0; }
+  virtual void request_cleanup() {}
+};
+
+#endif