initial code repo
[stor4nfv.git] / src / ceph / src / test / bench / bencher.cc
diff --git a/src/ceph/src/test/bench/bencher.cc b/src/ceph/src/test/bench/bencher.cc
new file mode 100644 (file)
index 0000000..9937c5c
--- /dev/null
@@ -0,0 +1,202 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "bencher.h"
+#include "include/utime.h"
+#include <unistd.h>
+#include "include/memory.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+
+template<typename T>
+struct C_Holder : public Context {
+  T obj;
+  explicit C_Holder(
+    T obj)
+    : obj(obj) {}
+  void finish(int r) override {
+    return;
+  }
+};
+
+struct OnDelete {
+  Context *c;
+  explicit OnDelete(Context *c) : c(c) {}
+  ~OnDelete() { c->complete(0); }
+};
+
+struct Cleanup : public Context {
+  Bencher *bench;
+  explicit Cleanup(Bencher *bench) : bench(bench) {}
+  void finish(int r) override {
+    bench->complete_op();
+  }
+};
+
+struct OnWriteApplied : public Context {
+  Bencher *bench;
+  uint64_t seq;
+  ceph::shared_ptr<OnDelete> on_delete;
+  OnWriteApplied(
+    Bencher *bench, uint64_t seq,
+    ceph::shared_ptr<OnDelete> on_delete
+    ) : bench(bench), seq(seq), on_delete(on_delete) {}
+  void finish(int r) override {
+    bench->stat_collector->write_applied(seq);
+  }
+};
+
+struct OnWriteCommit : public Context {
+  Bencher *bench;
+  uint64_t seq;
+  ceph::shared_ptr<OnDelete> on_delete;
+  OnWriteCommit(
+    Bencher *bench, uint64_t seq,
+    ceph::shared_ptr<OnDelete> on_delete
+    ) : bench(bench), seq(seq), on_delete(on_delete) {}
+  void finish(int r) override {
+    bench->stat_collector->write_committed(seq);
+  }
+};
+
+struct OnReadComplete : public Context {
+  Bencher *bench;
+  uint64_t seq;
+  boost::scoped_ptr<bufferlist> bl;
+  OnReadComplete(Bencher *bench, uint64_t seq, bufferlist *bl) :
+    bench(bench), seq(seq), bl(bl) {}
+  void finish(int r) override {
+    bench->stat_collector->read_complete(seq);
+    bench->complete_op();
+  }
+};
+
+void Bencher::start_op() {
+  Mutex::Locker l(lock);
+  while (open_ops >= max_in_flight)
+    open_ops_cond.Wait(lock);
+  ++open_ops;
+}
+
+void Bencher::drain_ops() {
+  Mutex::Locker l(lock);
+  while (open_ops)
+    open_ops_cond.Wait(lock);
+}
+
+void Bencher::complete_op() {
+  Mutex::Locker l(lock);
+  assert(open_ops > 0);
+  --open_ops;
+  open_ops_cond.Signal();
+}
+
+struct OnFinish {
+  bool *done;
+  Mutex *lock;
+  Cond *cond;
+  OnFinish(
+    bool *done,
+    Mutex *lock,
+    Cond *cond) :
+    done(done), lock(lock), cond(cond) {}
+  ~OnFinish() {
+    Mutex::Locker l(*lock);
+    *done = true;
+    cond->Signal();
+  }
+};
+
+void Bencher::init(
+  const set<std::string> &objects,
+  uint64_t size,
+  std::ostream *out
+  )
+{
+  bufferlist bl;
+  for (uint64_t i = 0; i < size; ++i) {
+    bl.append(0);
+  }
+  Mutex lock("init_lock");
+  Cond cond;
+  bool done = 0;
+  {
+    ceph::shared_ptr<OnFinish> on_finish(
+      new OnFinish(&done, &lock, &cond));
+    uint64_t num = 0;
+    for (set<std::string>::const_iterator i = objects.begin();
+        i != objects.end();
+        ++i, ++num) {
+      if (!(num % 20))
+       *out << "Creating " << num << "/" << objects.size() << std::endl;
+      backend->write(
+       *i,
+       0,
+       bl,
+       new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish),
+       new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish)
+       );
+    }
+  }
+  {
+    Mutex::Locker l(lock);
+    while (!done)
+      cond.Wait(lock);
+  }
+}
+
+void Bencher::run_bench()
+{
+  time_t end = time(0) + max_duration;
+  uint64_t ops = 0;
+
+  bufferlist bl;
+
+  while ((!max_duration || time(0) < end) && (!max_ops || ops < max_ops)) {
+    start_op();
+    uint64_t seq = stat_collector->next_seq();
+    boost::tuple<std::string, uint64_t, uint64_t, OpType> next =
+      (*op_dist)();
+    string obj_name = next.get<0>();
+    uint64_t offset = next.get<1>();
+    uint64_t length = next.get<2>();
+    OpType op_type = next.get<3>();
+    switch (op_type) {
+      case WRITE: {
+       ceph::shared_ptr<OnDelete> on_delete(
+         new OnDelete(new Cleanup(this)));
+       stat_collector->start_write(seq, length);
+       while (bl.length() < length) {
+         bl.append(rand());
+       }
+       backend->write(
+         obj_name,
+         offset,
+         bl,
+         new OnWriteApplied(
+           this, seq, on_delete),
+         new OnWriteCommit(
+           this, seq, on_delete)
+         );
+       break;
+      }
+      case READ: {
+       stat_collector->start_read(seq, length);
+       bufferlist *read_bl = new bufferlist;
+       backend->read(
+         obj_name,
+         offset,
+         length,
+         read_bl,
+         new OnReadComplete(
+           this, seq, read_bl)
+         );
+       break;
+      }
+      default: {
+       ceph_abort();
+      }
+    } 
+    ops++;
+  }
+  drain_ops();
+}