X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fkv%2FRocksDBStore.h;fp=src%2Fceph%2Fsrc%2Fkv%2FRocksDBStore.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=44c99e1b2e9d59b6ec8aaeb48735e09afaace8d3;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/kv/RocksDBStore.h b/src/ceph/src/kv/RocksDBStore.h deleted file mode 100644 index 44c99e1..0000000 --- a/src/ceph/src/kv/RocksDBStore.h +++ /dev/null @@ -1,458 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -#ifndef ROCKS_DB_STORE_H -#define ROCKS_DB_STORE_H - -#include "include/types.h" -#include "include/buffer_fwd.h" -#include "KeyValueDB.h" -#include -#include -#include -#include -#include -#include "rocksdb/write_batch.h" -#include "rocksdb/perf_context.h" -#include "rocksdb/iostats_context.h" -#include "rocksdb/statistics.h" -#include "rocksdb/table.h" -#include -#include "common/errno.h" -#include "common/dout.h" -#include "include/assert.h" -#include "common/Formatter.h" -#include "common/Cond.h" - -#include "common/ceph_context.h" -class PerfCounters; - -enum { - l_rocksdb_first = 34300, - l_rocksdb_gets, - l_rocksdb_txns, - l_rocksdb_txns_sync, - l_rocksdb_get_latency, - l_rocksdb_submit_latency, - l_rocksdb_submit_sync_latency, - l_rocksdb_compact, - l_rocksdb_compact_range, - l_rocksdb_compact_queue_merge, - l_rocksdb_compact_queue_len, - l_rocksdb_write_wal_time, - l_rocksdb_write_memtable_time, - l_rocksdb_write_delay_time, - l_rocksdb_write_pre_and_post_process_time, - l_rocksdb_last, -}; - -namespace rocksdb{ - class DB; - class Env; - class Cache; - class FilterPolicy; - class Snapshot; - class Slice; - class WriteBatch; - class Iterator; - class Logger; - struct Options; - struct BlockBasedTableOptions; -} - -extern rocksdb::Logger *create_rocksdb_ceph_logger(); - -/** - * Uses RocksDB to implement the KeyValueDB interface - */ -class RocksDBStore : public KeyValueDB { - CephContext *cct; - PerfCounters *logger; - string path; - void *priv; - rocksdb::DB *db; - rocksdb::Env *env; - std::shared_ptr dbstats; - rocksdb::BlockBasedTableOptions bbt_opts; - string options_str; - - uint64_t cache_size = 0; - bool set_cache_flag = false; - - int do_open(ostream &out, bool create_if_missing); - - // manage async compactions - Mutex compact_queue_lock; - Cond compact_queue_cond; - list< pair > compact_queue; - bool compact_queue_stop; - class CompactThread : public Thread { - RocksDBStore *db; - public: - explicit CompactThread(RocksDBStore *d) : db(d) {} - void *entry() override { - db->compact_thread_entry(); - return NULL; - } - friend class RocksDBStore; - } compact_thread; - - void compact_thread_entry(); - - void compact_range(const string& start, const string& end); - void compact_range_async(const string& start, const string& end); - -public: - /// compact the underlying rocksdb store - bool compact_on_mount; - bool disableWAL; - bool enable_rmrange; - void compact() override; - - int tryInterpret(const string& key, const string& val, rocksdb::Options &opt); - int ParseOptionsFromString(const string& opt_str, rocksdb::Options &opt); - static int _test_init(const string& dir); - int init(string options_str) override; - /// compact rocksdb for all keys with a given prefix - void compact_prefix(const string& prefix) override { - compact_range(prefix, past_prefix(prefix)); - } - void compact_prefix_async(const string& prefix) override { - compact_range_async(prefix, past_prefix(prefix)); - } - - void compact_range(const string& prefix, const string& start, const string& end) override { - compact_range(combine_strings(prefix, start), combine_strings(prefix, end)); - } - void compact_range_async(const string& prefix, const string& start, const string& end) override { - compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end)); - } - - RocksDBStore(CephContext *c, const string &path, void *p) : - cct(c), - logger(NULL), - path(path), - priv(p), - db(NULL), - env(static_cast(p)), - dbstats(NULL), - compact_queue_lock("RocksDBStore::compact_thread_lock"), - compact_queue_stop(false), - compact_thread(this), - compact_on_mount(false), - disableWAL(false), - enable_rmrange(cct->_conf->rocksdb_enable_rmrange) - {} - - ~RocksDBStore() override; - - static bool check_omap_dir(string &omap_dir); - /// Opens underlying db - int open(ostream &out) override { - return do_open(out, false); - } - /// Creates underlying db if missing and opens it - int create_and_open(ostream &out) override; - - void close() override; - - void split_stats(const std::string &s, char delim, std::vector &elems); - void get_statistics(Formatter *f) override; - - PerfCounters *get_perf_counters() override - { - return logger; - } - - struct RocksWBHandler: public rocksdb::WriteBatch::Handler { - std::string seen ; - int num_seen = 0; - static string pretty_binary_string(const string& in) { - char buf[10]; - string out; - out.reserve(in.length() * 3); - enum { NONE, HEX, STRING } mode = NONE; - unsigned from = 0, i; - for (i=0; i < in.length(); ++i) { - if ((in[i] < 32 || (unsigned char)in[i] > 126) || - (mode == HEX && in.length() - i >= 4 && - ((in[i] < 32 || (unsigned char)in[i] > 126) || - (in[i+1] < 32 || (unsigned char)in[i+1] > 126) || - (in[i+2] < 32 || (unsigned char)in[i+2] > 126) || - (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) { - - if (mode == STRING) { - out.append(in.substr(from, i - from)); - out.push_back('\''); - } - if (mode != HEX) { - out.append("0x"); - mode = HEX; - } - if (in.length() - i >= 4) { - // print a whole u32 at once - snprintf(buf, sizeof(buf), "%08x", - (uint32_t)(((unsigned char)in[i] << 24) | - ((unsigned char)in[i+1] << 16) | - ((unsigned char)in[i+2] << 8) | - ((unsigned char)in[i+3] << 0))); - i += 3; - } else { - snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]); - } - out.append(buf); - } else { - if (mode != STRING) { - out.push_back('\''); - mode = STRING; - from = i; - } - } - } - if (mode == STRING) { - out.append(in.substr(from, i - from)); - out.push_back('\''); - } - return out; - } - void Put(const rocksdb::Slice& key, - const rocksdb::Slice& value) override { - string prefix ((key.ToString()).substr(0,1)); - string key_to_decode ((key.ToString()).substr(2,string::npos)); - uint64_t size = (value.ToString()).size(); - seen += "\nPut( Prefix = " + prefix + " key = " - + pretty_binary_string(key_to_decode) - + " Value size = " + std::to_string(size) + ")"; - num_seen++; - } - void SingleDelete(const rocksdb::Slice& key) override { - string prefix ((key.ToString()).substr(0,1)); - string key_to_decode ((key.ToString()).substr(2,string::npos)); - seen += "\nSingleDelete(Prefix = "+ prefix + " Key = " - + pretty_binary_string(key_to_decode) + ")"; - num_seen++; - } - void Delete(const rocksdb::Slice& key) override { - string prefix ((key.ToString()).substr(0,1)); - string key_to_decode ((key.ToString()).substr(2,string::npos)); - seen += "\nDelete( Prefix = " + prefix + " key = " - + pretty_binary_string(key_to_decode) + ")"; - - num_seen++; - } - void Merge(const rocksdb::Slice& key, - const rocksdb::Slice& value) override { - string prefix ((key.ToString()).substr(0,1)); - string key_to_decode ((key.ToString()).substr(2,string::npos)); - uint64_t size = (value.ToString()).size(); - seen += "\nMerge( Prefix = " + prefix + " key = " - + pretty_binary_string(key_to_decode) + " Value size = " - + std::to_string(size) + ")"; - - num_seen++; - } - bool Continue() override { return num_seen < 50; } - - }; - - - class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl { - public: - rocksdb::WriteBatch bat; - RocksDBStore *db; - - explicit RocksDBTransactionImpl(RocksDBStore *_db); - void set( - const string &prefix, - const string &k, - const bufferlist &bl) override; - void set( - const string &prefix, - const char *k, - size_t keylen, - const bufferlist &bl) override; - void rmkey( - const string &prefix, - const string &k) override; - void rmkey( - const string &prefix, - const char *k, - size_t keylen) override; - void rm_single_key( - const string &prefix, - const string &k) override; - void rmkeys_by_prefix( - const string &prefix - ) override; - void rm_range_keys( - const string &prefix, - const string &start, - const string &end) override; - void merge( - const string& prefix, - const string& k, - const bufferlist &bl) override; - }; - - KeyValueDB::Transaction get_transaction() override { - return std::make_shared(this); - } - - int submit_transaction(KeyValueDB::Transaction t) override; - int submit_transaction_sync(KeyValueDB::Transaction t) override; - int get( - const string &prefix, - const std::set &key, - std::map *out - ) override; - int get( - const string &prefix, - const string &key, - bufferlist *out - ) override; - int get( - const string &prefix, - const char *key, - size_t keylen, - bufferlist *out) override; - - - class RocksDBWholeSpaceIteratorImpl : - public KeyValueDB::WholeSpaceIteratorImpl { - protected: - rocksdb::Iterator *dbiter; - public: - explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) : - dbiter(iter) { } - //virtual ~RocksDBWholeSpaceIteratorImpl() { } - ~RocksDBWholeSpaceIteratorImpl() override; - - int seek_to_first() override; - int seek_to_first(const string &prefix) override; - int seek_to_last() override; - int seek_to_last(const string &prefix) override; - int upper_bound(const string &prefix, const string &after) override; - int lower_bound(const string &prefix, const string &to) override; - bool valid() override; - int next() override; - int prev() override; - string key() override; - pair raw_key() override; - bool raw_key_is_prefixed(const string &prefix) override; - bufferlist value() override; - bufferptr value_as_ptr() override; - int status() override; - size_t key_size() override; - size_t value_size() override; - }; - - /// Utility - static string combine_strings(const string &prefix, const string &value) { - string out = prefix; - out.push_back(0); - out.append(value); - return out; - } - static void combine_strings(const string &prefix, - const char *key, size_t keylen, - string *out) { - out->reserve(prefix.size() + 1 + keylen); - *out = prefix; - out->push_back(0); - out->append(key, keylen); - } - - static int split_key(rocksdb::Slice in, string *prefix, string *key); - - static bufferlist to_bufferlist(rocksdb::Slice in) { - bufferlist bl; - bl.append(bufferptr(in.data(), in.size())); - return bl; - } - - static string past_prefix(const string &prefix); - - class MergeOperatorRouter; - friend class MergeOperatorRouter; - int set_merge_operator(const std::string& prefix, - std::shared_ptr mop) override; - string assoc_name; ///< Name of associative operator - - uint64_t get_estimated_size(map &extra) override { - DIR *store_dir = opendir(path.c_str()); - if (!store_dir) { - lderr(cct) << __func__ << " something happened opening the store: " - << cpp_strerror(errno) << dendl; - return 0; - } - - uint64_t total_size = 0; - uint64_t sst_size = 0; - uint64_t log_size = 0; - uint64_t misc_size = 0; - - struct dirent *entry = NULL; - while ((entry = readdir(store_dir)) != NULL) { - string n(entry->d_name); - - if (n == "." || n == "..") - continue; - - string fpath = path + '/' + n; - struct stat s; - int err = stat(fpath.c_str(), &s); - if (err < 0) - err = -errno; - // we may race against rocksdb while reading files; this should only - // happen when those files are being updated, data is being shuffled - // and files get removed, in which case there's not much of a problem - // as we'll get to them next time around. - if (err == -ENOENT) { - continue; - } - if (err < 0) { - lderr(cct) << __func__ << " error obtaining stats for " << fpath - << ": " << cpp_strerror(err) << dendl; - goto err; - } - - size_t pos = n.find_last_of('.'); - if (pos == string::npos) { - misc_size += s.st_size; - continue; - } - - string ext = n.substr(pos+1); - if (ext == "sst") { - sst_size += s.st_size; - } else if (ext == "log") { - log_size += s.st_size; - } else { - misc_size += s.st_size; - } - } - - total_size = sst_size + log_size + misc_size; - - extra["sst"] = sst_size; - extra["log"] = log_size; - extra["misc"] = misc_size; - extra["total"] = total_size; - -err: - closedir(store_dir); - return total_size; - } - - int set_cache_size(uint64_t s) override { - cache_size = s; - set_cache_flag = true; - return 0; - } - -protected: - WholeSpaceIterator _get_iterator() override; -}; - - - -#endif