initial code repo
[stor4nfv.git] / src / ceph / src / test / bench / dumb_backend.h
diff --git a/src/ceph/src/test/bench/dumb_backend.h b/src/ceph/src/test/bench/dumb_backend.h
new file mode 100644 (file)
index 0000000..11843f1
--- /dev/null
@@ -0,0 +1,168 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef DUMBBACKEND
+#define DUMBBACKEND
+
+#include "backend.h"
+#include "include/Context.h"
+#include "os/ObjectStore.h"
+#include "common/WorkQueue.h"
+#include "common/Semaphore.h"
+
+#include <deque>
+
+class DumbBackend : public Backend {
+       const string path;
+
+  struct write_item {
+    const string oid;
+    bufferlist bl;
+    uint64_t offset;
+    Context *on_applied;
+    Context *on_commit;
+    write_item(
+      const string &oid,
+      const bufferlist &bl,
+      uint64_t offset,
+      Context *on_applied,
+      Context *on_commit) :
+      oid(oid), bl(bl), offset(offset), on_applied(on_applied),
+      on_commit(on_commit) {}
+  };
+
+  Semaphore sem;
+
+  bool do_fsync;
+  bool do_sync_file_range;
+  bool do_fadvise;
+  unsigned sync_interval;
+  int sync_fd;
+  ThreadPool tp;
+
+  class SyncThread : public Thread {
+    DumbBackend *backend;
+  public:
+    explicit SyncThread(DumbBackend *backend) : backend(backend) {}
+    void *entry() override {
+      backend->sync_loop();
+      return 0;
+    }
+  } thread;
+  friend class SyncThread;
+
+  Mutex sync_loop_mutex;
+  Cond sync_loop_cond;
+  int sync_loop_stop; // 0 for running, 1 for stopping, 2 for stopped
+  void sync_loop();
+
+  Mutex pending_commit_mutex;
+  set<Context*> pending_commits;
+
+  class WriteQueue : public ThreadPool::WorkQueue<write_item> {
+    deque<write_item*> item_queue;
+    DumbBackend *backend;
+
+  public:
+    WriteQueue(
+      DumbBackend *_backend,
+      time_t ti,
+      ThreadPool *tp) :
+      ThreadPool::WorkQueue<write_item>("DumbBackend::queue", ti, ti*10, tp),
+      backend(_backend) {}
+    bool _enqueue(write_item *item) override {
+      item_queue.push_back(item);
+      return true;
+    }
+    void _dequeue(write_item*) override { ceph_abort(); }
+    write_item *_dequeue() override {
+      if (item_queue.empty())
+       return 0;
+      write_item *retval = item_queue.front();
+      item_queue.pop_front();
+      return retval;
+    }
+    bool _empty() override {
+      return item_queue.empty();
+    }
+    void _process(write_item *item, ThreadPool::TPHandle &) override {
+      return backend->_write(
+       item->oid,
+       item->offset,
+       item->bl,
+       item->on_applied,
+       item->on_commit);
+    }
+    void _clear() override {
+      return item_queue.clear();
+    }
+  } queue;
+  friend class WriteQueue;
+
+  string get_full_path(const string &oid);
+
+  void _write(
+    const string &oid,
+    uint64_t offset,
+    const bufferlist &bl,
+    Context *on_applied,
+    Context *on_commit);
+
+public:
+  DumbBackend(
+    const string &path,
+    bool do_fsync,
+    bool do_sync_file_range,
+    bool do_fadvise,
+    unsigned sync_interval,
+    int sync_fd,
+    unsigned worker_threads,
+    CephContext *cct)
+    : path(path), do_fsync(do_fsync),
+      do_sync_file_range(do_sync_file_range),
+      do_fadvise(do_fadvise),
+      sync_interval(sync_interval),
+      sync_fd(sync_fd),
+      tp(cct, "DumbBackend::tp", "tp_dumb_backend", worker_threads),
+      thread(this),
+      sync_loop_mutex("DumbBackend::sync_loop_mutex"),
+      sync_loop_stop(0),
+      pending_commit_mutex("DumbBackend::pending_commit_mutex"),
+      queue(this, 20, &tp) {
+    thread.create("thread");
+    tp.start();
+    for (unsigned i = 0; i < 10*worker_threads; ++i) {
+      sem.Put();
+    }
+  }
+  ~DumbBackend() override {
+    {
+      Mutex::Locker l(sync_loop_mutex);
+      if (sync_loop_stop == 0)
+       sync_loop_stop = 1;
+      while (sync_loop_stop < 2)
+       sync_loop_cond.Wait(sync_loop_mutex);
+    }
+    tp.stop();
+    thread.join();
+  }
+  void write(
+    const string &oid,
+    uint64_t offset,
+    const bufferlist &bl,
+    Context *on_applied,
+    Context *on_commit) override {
+    sem.Get();
+    queue.queue(
+      new write_item(
+       oid, bl, offset, on_applied, on_commit));
+  }
+
+  void read(
+    const string &oid,
+    uint64_t offset,
+    uint64_t length,
+    bufferlist *bl,
+    Context *on_complete) override;
+};
+
+#endif