initial code repo
[stor4nfv.git] / src / ceph / src / kv / RocksDBStore.h
diff --git a/src/ceph/src/kv/RocksDBStore.h b/src/ceph/src/kv/RocksDBStore.h
new file mode 100644 (file)
index 0000000..44c99e1
--- /dev/null
@@ -0,0 +1,458 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef ROCKS_DB_STORE_H
+#define ROCKS_DB_STORE_H
+
+#include "include/types.h"
+#include "include/buffer_fwd.h"
+#include "KeyValueDB.h"
+#include <set>
+#include <map>
+#include <string>
+#include <memory>
+#include <boost/scoped_ptr.hpp>
+#include "rocksdb/write_batch.h"
+#include "rocksdb/perf_context.h"
+#include "rocksdb/iostats_context.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/table.h"
+#include <errno.h>
+#include "common/errno.h"
+#include "common/dout.h"
+#include "include/assert.h"
+#include "common/Formatter.h"
+#include "common/Cond.h"
+
+#include "common/ceph_context.h"
+class PerfCounters;
+
+enum {
+  l_rocksdb_first = 34300,
+  l_rocksdb_gets,
+  l_rocksdb_txns,
+  l_rocksdb_txns_sync,
+  l_rocksdb_get_latency,
+  l_rocksdb_submit_latency,
+  l_rocksdb_submit_sync_latency,
+  l_rocksdb_compact,
+  l_rocksdb_compact_range,
+  l_rocksdb_compact_queue_merge,
+  l_rocksdb_compact_queue_len,
+  l_rocksdb_write_wal_time,
+  l_rocksdb_write_memtable_time,
+  l_rocksdb_write_delay_time,
+  l_rocksdb_write_pre_and_post_process_time,
+  l_rocksdb_last,
+};
+
+namespace rocksdb{
+  class DB;
+  class Env;
+  class Cache;
+  class FilterPolicy;
+  class Snapshot;
+  class Slice;
+  class WriteBatch;
+  class Iterator;
+  class Logger;
+  struct Options;
+  struct BlockBasedTableOptions;
+}
+
+extern rocksdb::Logger *create_rocksdb_ceph_logger();
+
+/**
+ * Uses RocksDB to implement the KeyValueDB interface
+ */
+class RocksDBStore : public KeyValueDB {
+  CephContext *cct;
+  PerfCounters *logger;
+  string path;
+  void *priv;
+  rocksdb::DB *db;
+  rocksdb::Env *env;
+  std::shared_ptr<rocksdb::Statistics> dbstats;
+  rocksdb::BlockBasedTableOptions bbt_opts;
+  string options_str;
+
+  uint64_t cache_size = 0;
+  bool set_cache_flag = false;
+
+  int do_open(ostream &out, bool create_if_missing);
+
+  // manage async compactions
+  Mutex compact_queue_lock;
+  Cond compact_queue_cond;
+  list< pair<string,string> > compact_queue;
+  bool compact_queue_stop;
+  class CompactThread : public Thread {
+    RocksDBStore *db;
+  public:
+    explicit CompactThread(RocksDBStore *d) : db(d) {}
+    void *entry() override {
+      db->compact_thread_entry();
+      return NULL;
+    }
+    friend class RocksDBStore;
+  } compact_thread;
+
+  void compact_thread_entry();
+
+  void compact_range(const string& start, const string& end);
+  void compact_range_async(const string& start, const string& end);
+
+public:
+  /// compact the underlying rocksdb store
+  bool compact_on_mount;
+  bool disableWAL;
+  bool enable_rmrange;
+  void compact() override;
+
+  int tryInterpret(const string& key, const string& val, rocksdb::Options &opt);
+  int ParseOptionsFromString(const string& opt_str, rocksdb::Options &opt);
+  static int _test_init(const string& dir);
+  int init(string options_str) override;
+  /// compact rocksdb for all keys with a given prefix
+  void compact_prefix(const string& prefix) override {
+    compact_range(prefix, past_prefix(prefix));
+  }
+  void compact_prefix_async(const string& prefix) override {
+    compact_range_async(prefix, past_prefix(prefix));
+  }
+
+  void compact_range(const string& prefix, const string& start, const string& end) override {
+    compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
+  }
+  void compact_range_async(const string& prefix, const string& start, const string& end) override {
+    compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
+  }
+
+  RocksDBStore(CephContext *c, const string &path, void *p) :
+    cct(c),
+    logger(NULL),
+    path(path),
+    priv(p),
+    db(NULL),
+    env(static_cast<rocksdb::Env*>(p)),
+    dbstats(NULL),
+    compact_queue_lock("RocksDBStore::compact_thread_lock"),
+    compact_queue_stop(false),
+    compact_thread(this),
+    compact_on_mount(false),
+    disableWAL(false),
+    enable_rmrange(cct->_conf->rocksdb_enable_rmrange)
+  {}
+
+  ~RocksDBStore() override;
+
+  static bool check_omap_dir(string &omap_dir);
+  /// Opens underlying db
+  int open(ostream &out) override {
+    return do_open(out, false);
+  }
+  /// Creates underlying db if missing and opens it
+  int create_and_open(ostream &out) override;
+
+  void close() override;
+
+  void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
+  void get_statistics(Formatter *f) override;
+
+  PerfCounters *get_perf_counters() override
+  {
+    return logger;
+  }
+
+  struct  RocksWBHandler: public rocksdb::WriteBatch::Handler {
+    std::string seen ;
+    int num_seen = 0;
+    static string pretty_binary_string(const string& in) {
+      char buf[10];
+      string out;
+      out.reserve(in.length() * 3);
+      enum { NONE, HEX, STRING } mode = NONE;
+      unsigned from = 0, i;
+      for (i=0; i < in.length(); ++i) {
+        if ((in[i] < 32 || (unsigned char)in[i] > 126) ||
+          (mode == HEX && in.length() - i >= 4 &&
+          ((in[i] < 32 || (unsigned char)in[i] > 126) ||
+          (in[i+1] < 32 || (unsigned char)in[i+1] > 126) ||
+          (in[i+2] < 32 || (unsigned char)in[i+2] > 126) ||
+          (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) {
+
+          if (mode == STRING) {
+            out.append(in.substr(from, i - from));
+            out.push_back('\'');
+          }
+          if (mode != HEX) {
+            out.append("0x");
+            mode = HEX;
+          }
+          if (in.length() - i >= 4) {
+            // print a whole u32 at once
+            snprintf(buf, sizeof(buf), "%08x",
+                  (uint32_t)(((unsigned char)in[i] << 24) |
+                            ((unsigned char)in[i+1] << 16) |
+                            ((unsigned char)in[i+2] << 8) |
+                            ((unsigned char)in[i+3] << 0)));
+            i += 3;
+          } else {
+            snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
+          }
+          out.append(buf);
+        } else {
+          if (mode != STRING) {
+            out.push_back('\'');
+            mode = STRING;
+            from = i;
+          }
+        }
+      }
+      if (mode == STRING) {
+        out.append(in.substr(from, i - from));
+        out.push_back('\'');
+      }
+      return out;
+    }
+    void Put(const rocksdb::Slice& key,
+                    const rocksdb::Slice& value) override {
+      string prefix ((key.ToString()).substr(0,1));
+      string key_to_decode ((key.ToString()).substr(2,string::npos));
+      uint64_t size = (value.ToString()).size();
+      seen += "\nPut( Prefix = " + prefix + " key = " 
+            + pretty_binary_string(key_to_decode) 
+            + " Value size = " + std::to_string(size) + ")";
+      num_seen++;
+    }
+    void SingleDelete(const rocksdb::Slice& key) override {
+      string prefix ((key.ToString()).substr(0,1));
+      string key_to_decode ((key.ToString()).substr(2,string::npos));
+      seen += "\nSingleDelete(Prefix = "+ prefix + " Key = " 
+            + pretty_binary_string(key_to_decode) + ")";
+      num_seen++;
+    }
+    void Delete(const rocksdb::Slice& key) override {
+      string prefix ((key.ToString()).substr(0,1));
+      string key_to_decode ((key.ToString()).substr(2,string::npos));
+      seen += "\nDelete( Prefix = " + prefix + " key = " 
+            + pretty_binary_string(key_to_decode) + ")";
+
+      num_seen++;
+    }
+    void Merge(const rocksdb::Slice& key,
+                      const rocksdb::Slice& value) override {
+      string prefix ((key.ToString()).substr(0,1));
+      string key_to_decode ((key.ToString()).substr(2,string::npos));
+      uint64_t size = (value.ToString()).size();
+      seen += "\nMerge( Prefix = " + prefix + " key = " 
+            + pretty_binary_string(key_to_decode) + " Value size = " 
+            + std::to_string(size) + ")";
+
+      num_seen++;
+    }
+    bool Continue() override { return num_seen < 50; }
+
+  };
+
+
+  class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
+  public:
+    rocksdb::WriteBatch bat;
+    RocksDBStore *db;
+
+    explicit RocksDBTransactionImpl(RocksDBStore *_db);
+    void set(
+      const string &prefix,
+      const string &k,
+      const bufferlist &bl) override;
+    void set(
+      const string &prefix,
+      const char *k,
+      size_t keylen,
+      const bufferlist &bl) override;
+    void rmkey(
+      const string &prefix,
+      const string &k) override;
+    void rmkey(
+      const string &prefix,
+      const char *k,
+      size_t keylen) override;
+    void rm_single_key(
+      const string &prefix,
+      const string &k) override;
+    void rmkeys_by_prefix(
+      const string &prefix
+      ) override;
+    void rm_range_keys(
+      const string &prefix,
+      const string &start,
+      const string &end) override;
+    void merge(
+      const string& prefix,
+      const string& k,
+      const bufferlist &bl) override;
+  };
+
+  KeyValueDB::Transaction get_transaction() override {
+    return std::make_shared<RocksDBTransactionImpl>(this);
+  }
+
+  int submit_transaction(KeyValueDB::Transaction t) override;
+  int submit_transaction_sync(KeyValueDB::Transaction t) override;
+  int get(
+    const string &prefix,
+    const std::set<string> &key,
+    std::map<string, bufferlist> *out
+    ) override;
+  int get(
+    const string &prefix,
+    const string &key,
+    bufferlist *out
+    ) override;
+  int get(
+    const string &prefix,
+    const char *key,
+    size_t keylen,
+    bufferlist *out) override;
+
+
+  class RocksDBWholeSpaceIteratorImpl :
+    public KeyValueDB::WholeSpaceIteratorImpl {
+  protected:
+    rocksdb::Iterator *dbiter;
+  public:
+    explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) :
+      dbiter(iter) { }
+    //virtual ~RocksDBWholeSpaceIteratorImpl() { }
+    ~RocksDBWholeSpaceIteratorImpl() override;
+
+    int seek_to_first() override;
+    int seek_to_first(const string &prefix) override;
+    int seek_to_last() override;
+    int seek_to_last(const string &prefix) override;
+    int upper_bound(const string &prefix, const string &after) override;
+    int lower_bound(const string &prefix, const string &to) override;
+    bool valid() override;
+    int next() override;
+    int prev() override;
+    string key() override;
+    pair<string,string> raw_key() override;
+    bool raw_key_is_prefixed(const string &prefix) override;
+    bufferlist value() override;
+    bufferptr value_as_ptr() override;
+    int status() override;
+    size_t key_size() override;
+    size_t value_size() override;
+  };
+
+  /// Utility
+  static string combine_strings(const string &prefix, const string &value) {
+    string out = prefix;
+    out.push_back(0);
+    out.append(value);
+    return out;
+  }
+  static void combine_strings(const string &prefix,
+                             const char *key, size_t keylen,
+                             string *out) {
+    out->reserve(prefix.size() + 1 + keylen);
+    *out = prefix;
+    out->push_back(0);
+    out->append(key, keylen);
+  }
+
+  static int split_key(rocksdb::Slice in, string *prefix, string *key);
+
+  static bufferlist to_bufferlist(rocksdb::Slice in) {
+    bufferlist bl;
+    bl.append(bufferptr(in.data(), in.size()));
+    return bl;
+  }
+
+  static string past_prefix(const string &prefix);
+
+  class MergeOperatorRouter;
+  friend class MergeOperatorRouter;
+  int set_merge_operator(const std::string& prefix,
+                                std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
+  string assoc_name; ///< Name of associative operator
+
+  uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
+    DIR *store_dir = opendir(path.c_str());
+    if (!store_dir) {
+      lderr(cct) << __func__ << " something happened opening the store: "
+                 << cpp_strerror(errno) << dendl;
+      return 0;
+    }
+
+    uint64_t total_size = 0;
+    uint64_t sst_size = 0;
+    uint64_t log_size = 0;
+    uint64_t misc_size = 0;
+
+    struct dirent *entry = NULL;
+    while ((entry = readdir(store_dir)) != NULL) {
+      string n(entry->d_name);
+
+      if (n == "." || n == "..")
+        continue;
+
+      string fpath = path + '/' + n;
+      struct stat s;
+      int err = stat(fpath.c_str(), &s);
+      if (err < 0)
+       err = -errno;
+      // we may race against rocksdb while reading files; this should only
+      // happen when those files are being updated, data is being shuffled
+      // and files get removed, in which case there's not much of a problem
+      // as we'll get to them next time around.
+      if (err == -ENOENT) {
+       continue;
+      }
+      if (err < 0) {
+        lderr(cct) << __func__ << " error obtaining stats for " << fpath
+                   << ": " << cpp_strerror(err) << dendl;
+        goto err;
+      }
+
+      size_t pos = n.find_last_of('.');
+      if (pos == string::npos) {
+        misc_size += s.st_size;
+        continue;
+      }
+
+      string ext = n.substr(pos+1);
+      if (ext == "sst") {
+        sst_size += s.st_size;
+      } else if (ext == "log") {
+        log_size += s.st_size;
+      } else {
+        misc_size += s.st_size;
+      }
+    }
+
+    total_size = sst_size + log_size + misc_size;
+
+    extra["sst"] = sst_size;
+    extra["log"] = log_size;
+    extra["misc"] = misc_size;
+    extra["total"] = total_size;
+
+err:
+    closedir(store_dir);
+    return total_size;
+  }
+
+  int set_cache_size(uint64_t s) override {
+    cache_size = s;
+    set_cache_flag = true;
+    return 0;
+  }
+
+protected:
+  WholeSpaceIterator _get_iterator() override;
+};
+
+
+
+#endif