X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fkv%2FLevelDBStore.h;fp=src%2Fceph%2Fsrc%2Fkv%2FLevelDBStore.h;h=5a3ced9e4c9b9d565f435274c668b70d2d11e118;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git

diff --git a/src/ceph/src/kv/LevelDBStore.h b/src/ceph/src/kv/LevelDBStore.h
new file mode 100644
index 0000000..5a3ced9
--- /dev/null
+++ b/src/ceph/src/kv/LevelDBStore.h
@@ -0,0 +1,413 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef LEVEL_DB_STORE_H
+#define LEVEL_DB_STORE_H
+
+#include "include/types.h"
+#include "include/buffer_fwd.h"
+#include "KeyValueDB.h"
+#include <set>
+#include <map>
+#include <string>
+#include "include/memory.h"
+#include <boost/scoped_ptr.hpp>
+#include "leveldb/db.h"
+#include "leveldb/env.h"
+#include "leveldb/write_batch.h"
+#include "leveldb/slice.h"
+#include "leveldb/cache.h"
+#ifdef HAVE_LEVELDB_FILTER_POLICY
+#include "leveldb/filter_policy.h"
+#endif
+
+#include <errno.h>
+#include "common/errno.h"
+#include "common/dout.h"
+#include "include/assert.h"
+#include "common/Formatter.h"
+#include "common/Cond.h"
+
+#include "common/ceph_context.h"
+
+// reinclude our assert to clobber the system one
+# include "include/assert.h"
+
+class PerfCounters;
+
+enum {
+  l_leveldb_first = 34300,
+  l_leveldb_gets,
+  l_leveldb_txns,
+  l_leveldb_get_latency,
+  l_leveldb_submit_latency,
+  l_leveldb_submit_sync_latency,
+  l_leveldb_compact,
+  l_leveldb_compact_range,
+  l_leveldb_compact_queue_merge,
+  l_leveldb_compact_queue_len,
+  l_leveldb_last,
+};
+
+extern leveldb::Logger *create_leveldb_ceph_logger();
+
+class CephLevelDBLogger;
+
+/**
+ * Uses LevelDB to implement the KeyValueDB interface
+ */
+class LevelDBStore : public KeyValueDB {
+  CephContext *cct;
+  PerfCounters *logger;
+  CephLevelDBLogger *ceph_logger;
+  string path;
+  boost::scoped_ptr<leveldb::Cache> db_cache;
+#ifdef HAVE_LEVELDB_FILTER_POLICY
+  boost::scoped_ptr<const leveldb::FilterPolicy> filterpolicy;
+#endif
+  boost::scoped_ptr<leveldb::DB> db;
+
+  int do_open(ostream &out, bool create_if_missing);
+
+  // manage async compactions
+  Mutex compact_queue_lock;
+  Cond compact_queue_cond;
+  list< pair<string,string> > compact_queue;
+  bool compact_queue_stop;
+  class CompactThread : public Thread {
+    LevelDBStore *db;
+  public:
+    explicit CompactThread(LevelDBStore *d) : db(d) {}
+    void *entry() override {
+      db->compact_thread_entry();
+      return NULL;
+    }
+    friend class LevelDBStore;
+  } compact_thread;
+
+  void compact_thread_entry();
+
+  void compact_range(const string& start, const string& end) {
+    leveldb::Slice cstart(start);
+    leveldb::Slice cend(end);
+    db->CompactRange(&cstart, &cend);
+  }
+  void compact_range_async(const string& start, const string& end);
+
+public:
+  /// compact the underlying leveldb store
+  void compact() override;
+
+  /// compact db for all keys with a given prefix
+  void compact_prefix(const string& prefix) override {
+    compact_range(prefix, past_prefix(prefix));
+  }
+  void compact_prefix_async(const string& prefix) override {
+    compact_range_async(prefix, past_prefix(prefix));
+  }
+  void compact_range(const string& prefix,
+                     const string& start, const string& end) override {
+    compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
+  }
+  void compact_range_async(const string& prefix,
+                           const string& start, const string& end) override {
+    compact_range_async(combine_strings(prefix, start),
+                        combine_strings(prefix, end));
+  }
+
+
+  /**
+   * options_t: Holds options which are minimally interpreted
+   * on initialization and then passed through to LevelDB.
+   * We transform a couple of these into actual LevelDB
+   * structures, but the rest are simply passed through unchanged. See
+   * leveldb/options.h for more precise details on each.
+   *
+   * Set them after constructing the LevelDBStore, but before calling
+   * open() or create_and_open().
+   */
+  struct options_t {
+    uint64_t write_buffer_size; /// in-memory write buffer size
+    int max_open_files; /// maximum number of files LevelDB can open at once
+    uint64_t cache_size; /// size of extra decompressed cache to use
+    uint64_t block_size; /// user data per block
+    int bloom_size; /// number of bits per entry to put in a bloom filter
+    bool compression_enabled; /// whether to use libsnappy compression or not
+
+    // don't change these ones. No, seriously
+    int block_restart_interval;
+    bool error_if_exists;
+    bool paranoid_checks;
+
+    string log_file;
+
+    options_t() :
+      write_buffer_size(0), //< 0 means default
+      max_open_files(0), //< 0 means default
+      cache_size(0), //< 0 means no cache (default)
+      block_size(0), //< 0 means default
+      bloom_size(0), //< 0 means no bloom filter (default)
+      compression_enabled(true), //< set to false for no compression
+      block_restart_interval(0), //< 0 means default
+      error_if_exists(false), //< set to true if you want to check nonexistence
+      paranoid_checks(false) //< set to true if you want paranoid checks
+    {}
+  } options;
+
+  LevelDBStore(CephContext *c, const string &path) :
+    cct(c),
+    logger(NULL),
+    ceph_logger(NULL),
+    path(path),
+    db_cache(NULL),
+#ifdef HAVE_LEVELDB_FILTER_POLICY
+    filterpolicy(NULL),
+#endif
+    compact_queue_lock("LevelDBStore::compact_thread_lock"),
+    compact_queue_stop(false),
+    compact_thread(this),
+    options()
+  {}
+
+  ~LevelDBStore() override;
+
+  static int _test_init(const string& dir);
+  int init(string option_str="") override;
+
+  /// Opens underlying db
+  int open(ostream &out) override {
+    return do_open(out, false);
+  }
+  /// Creates underlying db if missing and opens it
+  int create_and_open(ostream &out) override {
+    return do_open(out, true);
+  }
+
+  void close() override;
+
+  PerfCounters *get_perf_counters() override
+  {
+    return logger;
+  }
+
+  class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl {
+  public:
+    leveldb::WriteBatch bat;
+    LevelDBStore *db;
+    explicit LevelDBTransactionImpl(LevelDBStore *db) : db(db) {}
+    void set(
+      const string &prefix,
+      const string &k,
+      const bufferlist &bl) override;
+    using KeyValueDB::TransactionImpl::set;
+    void rmkey(
+      const string &prefix,
+      const string &k) override;
+    void rmkeys_by_prefix(
+      const string &prefix
+      ) override;
+    virtual void rm_range_keys(
+      const string &prefix,
+      const string &start,
+      const string &end) override;
+
+    using KeyValueDB::TransactionImpl::rmkey;
+  };
+
+  KeyValueDB::Transaction get_transaction() override {
+    return std::make_shared<LevelDBTransactionImpl>(this);
+  }
+
+  int submit_transaction(KeyValueDB::Transaction t) override;
+  int submit_transaction_sync(KeyValueDB::Transaction t) override;
+  int get(
+    const string &prefix,
+    const std::set<string> &key,
+    std::map<string, bufferlist> *out
+    ) override;
+
+  int get(const string &prefix,
+    const string &key,
+    bufferlist *value) override;
+
+  using KeyValueDB::get;
+
+  class LevelDBWholeSpaceIteratorImpl :
+    public KeyValueDB::WholeSpaceIteratorImpl {
+  protected:
+    boost::scoped_ptr<leveldb::Iterator> dbiter;
+  public:
+    explicit LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) :
+      dbiter(iter) { }
+    ~LevelDBWholeSpaceIteratorImpl() override { }
+
+    int seek_to_first() override {
+      dbiter->SeekToFirst();
+      return dbiter->status().ok() ? 0 : -1;
+    }
+    int seek_to_first(const string &prefix) override {
+      leveldb::Slice slice_prefix(prefix);
+      dbiter->Seek(slice_prefix);
+      return dbiter->status().ok() ? 0 : -1;
+    }
+    int seek_to_last() override {
+      dbiter->SeekToLast();
+      return dbiter->status().ok() ? 0 : -1;
+    }
+    int seek_to_last(const string &prefix) override {
+      string limit = past_prefix(prefix);
+      leveldb::Slice slice_limit(limit);
+      dbiter->Seek(slice_limit);
+
+      if (!dbiter->Valid()) {
+        dbiter->SeekToLast();
+      } else {
+        dbiter->Prev();
+      }
+      return dbiter->status().ok() ? 0 : -1;
+    }
+    int upper_bound(const string &prefix, const string &after) override {
+      lower_bound(prefix, after);
+      if (valid()) {
+        pair<string,string> key = raw_key();
+        if (key.first == prefix && key.second == after)
+          next();
+      }
+      return dbiter->status().ok() ? 0 : -1;
+    }
+    int lower_bound(const string &prefix, const string &to) override {
+      string bound = combine_strings(prefix, to);
+      leveldb::Slice slice_bound(bound);
+      dbiter->Seek(slice_bound);
+      return dbiter->status().ok() ? 0 : -1;
+    }
+    bool valid() override {
+      return dbiter->Valid();
+    }
+    int next() override {
+      if (valid())
+        dbiter->Next();
+      return dbiter->status().ok() ? 0 : -1;
+    }
+    int prev() override {
+      if (valid())
+        dbiter->Prev();
+      return dbiter->status().ok() ? 0 : -1;
+    }
+    string key() override {
+      string out_key;
+      split_key(dbiter->key(), 0, &out_key);
+      return out_key;
+    }
+    pair<string,string> raw_key() override {
+      string prefix, key;
+      split_key(dbiter->key(), &prefix, &key);
+      return make_pair(prefix, key);
+    }
+    bool raw_key_is_prefixed(const string &prefix) override {
+      leveldb::Slice key = dbiter->key();
+      if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
+        return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
+      } else {
+        return false;
+      }
+    }
+    bufferlist value() override {
+      return to_bufferlist(dbiter->value());
+    }
+
+    bufferptr value_as_ptr() override {
+      leveldb::Slice data = dbiter->value();
+      return bufferptr(data.data(), data.size());
+    }
+
+    int status() override {
+      return dbiter->status().ok() ? 0 : -1;
+    }
+  };
+
+  /// Utility
+  static string combine_strings(const string &prefix, const string &value);
+  static int split_key(leveldb::Slice in, string *prefix, string *key);
+  static bufferlist to_bufferlist(leveldb::Slice in);
+  static string past_prefix(const string &prefix) {
+    string limit = prefix;
+    limit.push_back(1);
+    return limit;
+  }
+
+  uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
+    DIR *store_dir = opendir(path.c_str());
+    if (!store_dir) {
+      lderr(cct) << __func__ << " something happened opening the store: "
+                 << cpp_strerror(errno) << dendl;
+      return 0;
+    }
+
+    uint64_t total_size = 0;
+    uint64_t sst_size = 0;
+    uint64_t log_size = 0;
+    uint64_t misc_size = 0;
+
+    struct dirent *entry = NULL;
+    while ((entry = readdir(store_dir)) != NULL) {
+      string n(entry->d_name);
+
+      if (n == "." || n == "..")
+        continue;
+
+      string fpath = path + '/' + n;
+      struct stat s;
+      int err = stat(fpath.c_str(), &s);
+      if (err < 0)
+        err = -errno;
+      // we may race against leveldb while reading files; this should only
+      // happen when those files are being updated, data is being shuffled
+      // and files get removed, in which case there's not much of a problem
+      // as we'll get to them next time around.
+      if (err == -ENOENT) {
+        continue;
+      }
+      if (err < 0) {
+        lderr(cct) << __func__ << " error obtaining stats for " << fpath
+                   << ": " << cpp_strerror(err) << dendl;
+        goto err;
+      }
+
+      size_t pos = n.find_last_of('.');
+      if (pos == string::npos) {
+        misc_size += s.st_size;
+        continue;
+      }
+
+      string ext = n.substr(pos+1);
+      if (ext == "sst") {
+        sst_size += s.st_size;
+      } else if (ext == "log") {
+        log_size += s.st_size;
+      } else {
+        misc_size += s.st_size;
+      }
+    }
+
+    total_size = sst_size + log_size + misc_size;
+
+    extra["sst"] = sst_size;
+    extra["log"] = log_size;
+    extra["misc"] = misc_size;
+    extra["total"] = total_size;
+
+err:
+    closedir(store_dir);
+    return total_size;
+  }
+
+
+protected:
+  WholeSpaceIterator _get_iterator() override {
+    return std::make_shared<LevelDBWholeSpaceIteratorImpl>(
+      db->NewIterator(leveldb::ReadOptions()));
+  }
+
+};
+
+#endif
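
Reviewer notes (sketches only; nothing below is part of the patch).

The prefix-namespacing scheme is only half-visible in this header: combine_strings() and split_key() are declared above but defined in LevelDBStore.cc. Even so, raw_key_is_prefixed() testing key[prefix.length()] == '\0' and past_prefix() appending '\x01' pin the layout down as "prefix + '\0' + key". A minimal standalone sketch of that layout, with combine() as a hypothetical stand-in for combine_strings():

#include <cassert>
#include <string>

// Hypothetical stand-in for LevelDBStore::combine_strings(), whose real
// definition lives in LevelDBStore.cc; the '\0' separator is implied by
// raw_key_is_prefixed() above.
static std::string combine(const std::string &prefix, const std::string &key)
{
  std::string out = prefix;
  out.push_back('\0');
  out.append(key);
  return out;
}

// Same body as the inline past_prefix() in the header: '\x01' sorts just
// past the '\0' separator, so it bounds the whole namespace.
static std::string past_prefix(const std::string &prefix)
{
  std::string limit = prefix;
  limit.push_back(1);
  return limit;
}

int main()
{
  std::string k = combine("omap", "oid.123");
  // Every key in the "omap" namespace falls in ["omap\0", "omap\x01"),
  // which is exactly the range compact_prefix() hands to CompactRange().
  assert(k >= combine("omap", ""));
  assert(k < past_prefix("omap"));
  return 0;
}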
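
A minimal write-path sketch, assuming a full Ceph build tree (CephContext, bufferlist and the KeyValueDB base interface come from Ceph headers outside this diff); the prefix and key names are illustrative:

#include <iostream>
#include <sstream>
#include "kv/LevelDBStore.h"

int write_one(CephContext *cct, const std::string &dir)
{
  LevelDBStore store(cct, dir);
  store.options.cache_size = 64 * 1024 * 1024; // per options_t docs: set before open()
  std::ostringstream err;
  int r = store.create_and_open(err);          // creates the db if missing
  if (r < 0) {
    std::cerr << err.str() << std::endl;
    return r;
  }
  KeyValueDB::Transaction t = store.get_transaction();
  bufferlist bl;
  bl.append("value");
  t->set("myprefix", "mykey", bl);             // buffered in the leveldb::WriteBatch
  return store.submit_transaction_sync(t);     // applied atomically, then synced
}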
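
Scanning one prefix ties the pieces together: seek_to_first(prefix) Seek()s to "prefix", which sorts just before every "prefix\0key" entry, and raw_key_is_prefixed() turns false once the cursor passes past_prefix(prefix). get_wholespace_iterator() is assumed here from the KeyValueDB base class; this header itself only supplies the protected _get_iterator() hook:

#include <iostream>
#include "kv/LevelDBStore.h"

void dump_prefix(LevelDBStore &store, const std::string &prefix)
{
  KeyValueDB::WholeSpaceIterator it = store.get_wholespace_iterator();
  // Walk ["prefix\0", "prefix\x01"): valid() guards the end of the db,
  // raw_key_is_prefixed() guards the end of the namespace.
  for (it->seek_to_first(prefix);
       it->valid() && it->raw_key_is_prefixed(prefix);
       it->next()) {
    std::cout << it->key() << " (" << it->value().length() << " bytes)\n";
  }
}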