initial code repo
[stor4nfv.git] / src / ceph / src / common / Finisher.h
diff --git a/src/ceph/src/common/Finisher.h b/src/ceph/src/common/Finisher.h
new file mode 100644 (file)
index 0000000..e1a4519
--- /dev/null
@@ -0,0 +1,189 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef CEPH_FINISHER_H
+#define CEPH_FINISHER_H
+
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/perf_counters.h"
+
+class CephContext;
+
+/// Finisher queue length performance counter ID.
+enum {
+  l_finisher_first = 997082,
+  l_finisher_queue_len,
+  l_finisher_complete_lat,
+  l_finisher_last
+};
+
+/** @brief Asynchronous cleanup class.
+ * Finisher asynchronously completes Contexts, which are simple classes
+ * representing callbacks, in a dedicated worker thread. Enqueuing
+ * contexts to complete is thread-safe.
+ */
+class Finisher {
+  CephContext *cct;
+  Mutex        finisher_lock; ///< Protects access to queues and finisher_running.
+  Cond         finisher_cond; ///< Signaled when there is something to process.
+  Cond         finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
+  bool         finisher_stop; ///< Set when the finisher should stop.
+  bool         finisher_running; ///< True when the finisher is currently executing contexts.
+  bool        finisher_empty_wait; ///< True mean someone wait finisher empty.
+  /// Queue for contexts for which complete(0) will be called.
+  /// NULLs in this queue indicate that an item from finisher_queue_rval
+  /// should be completed in that place instead.
+  vector<Context*> finisher_queue;
+
+  string thread_name;
+
+  /// Queue for contexts for which the complete function will be called
+  /// with a parameter other than 0.
+  list<pair<Context*,int> > finisher_queue_rval;
+
+  /// Performance counter for the finisher's queue length.
+  /// Only active for named finishers.
+  PerfCounters *logger;
+  
+  void *finisher_thread_entry();
+
+  struct FinisherThread : public Thread {
+    Finisher *fin;    
+    explicit FinisherThread(Finisher *f) : fin(f) {}
+    void* entry() override { return (void*)fin->finisher_thread_entry(); }
+  } finisher_thread;
+
+ public:
+  /// Add a context to complete, optionally specifying a parameter for the complete function.
+  void queue(Context *c, int r = 0) {
+    finisher_lock.Lock();
+    if (finisher_queue.empty()) {
+      finisher_cond.Signal();
+    }
+    if (r) {
+      finisher_queue_rval.push_back(pair<Context*, int>(c, r));
+      finisher_queue.push_back(NULL);
+    } else
+      finisher_queue.push_back(c);
+    if (logger)
+      logger->inc(l_finisher_queue_len);
+    finisher_lock.Unlock();
+  }
+  void queue(vector<Context*>& ls) {
+    finisher_lock.Lock();
+    if (finisher_queue.empty()) {
+      finisher_cond.Signal();
+    }
+    finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
+    if (logger)
+      logger->inc(l_finisher_queue_len, ls.size());
+    finisher_lock.Unlock();
+    ls.clear();
+  }
+  void queue(deque<Context*>& ls) {
+    finisher_lock.Lock();
+    if (finisher_queue.empty()) {
+      finisher_cond.Signal();
+    }
+    finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
+    if (logger)
+      logger->inc(l_finisher_queue_len, ls.size());
+    finisher_lock.Unlock();
+    ls.clear();
+  }
+  void queue(list<Context*>& ls) {
+    finisher_lock.Lock();
+    if (finisher_queue.empty()) {
+      finisher_cond.Signal();
+    }
+    finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
+    if (logger)
+      logger->inc(l_finisher_queue_len, ls.size());
+    finisher_lock.Unlock();
+    ls.clear();
+  }
+
+  /// Start the worker thread.
+  void start();
+
+  /** @brief Stop the worker thread.
+   *
+   * Does not wait until all outstanding contexts are completed.
+   * To ensure that everything finishes, you should first shut down
+   * all sources that can add contexts to this finisher and call
+   * wait_for_empty() before calling stop(). */
+  void stop();
+
+  /** @brief Blocks until the finisher has nothing left to process.
+   * This function will also return when a concurrent call to stop()
+   * finishes, but this class should never be used in this way. */
+  void wait_for_empty();
+
+  /// Construct an anonymous Finisher.
+  /// Anonymous finishers do not log their queue length.
+  explicit Finisher(CephContext *cct_) :
+    cct(cct_), finisher_lock("Finisher::finisher_lock"),
+    finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
+    thread_name("fn_anonymous"), logger(0),
+    finisher_thread(this) {}
+
+  /// Construct a named Finisher that logs its queue length.
+  Finisher(CephContext *cct_, string name, string tn) :
+    cct(cct_), finisher_lock("Finisher::" + name),
+    finisher_stop(false), finisher_running(false), finisher_empty_wait(false),
+    thread_name(tn), logger(0),
+    finisher_thread(this) {
+    PerfCountersBuilder b(cct, string("finisher-") + name,
+                         l_finisher_first, l_finisher_last);
+    b.add_u64(l_finisher_queue_len, "queue_len");
+    b.add_time_avg(l_finisher_complete_lat, "complete_latency");
+    logger = b.create_perf_counters();
+    cct->get_perfcounters_collection()->add(logger);
+    logger->set(l_finisher_queue_len, 0);
+    logger->set(l_finisher_complete_lat, 0);
+  }
+
+  ~Finisher() {
+    if (logger && cct) {
+      cct->get_perfcounters_collection()->remove(logger);
+      delete logger;
+    }
+  }
+};
+
+/// Context that is completed asynchronously on the supplied finisher.
+class C_OnFinisher : public Context {
+  Context *con;
+  Finisher *fin;
+public:
+  C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) {
+    assert(fin != NULL);
+    assert(con != NULL);
+  }
+
+  ~C_OnFinisher() override {
+    if (con != nullptr) {
+      delete con;
+      con = nullptr;
+    }
+  }
+
+  void finish(int r) override {
+    fin->queue(con, r);
+    con = nullptr;
+  }
+};
+
+#endif