X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fkv%2FLevelDBStore.cc;fp=src%2Fceph%2Fsrc%2Fkv%2FLevelDBStore.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=25ff7a698d2e32bc56d6682d347373d687c5f731;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git

diff --git a/src/ceph/src/kv/LevelDBStore.cc b/src/ceph/src/kv/LevelDBStore.cc
deleted file mode 100644
index 25ff7a6..0000000
--- a/src/ceph/src/kv/LevelDBStore.cc
+++ /dev/null
@@ -1,403 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#include "LevelDBStore.h"
-
-#include <set>
-#include <map>
-#include <string>
-#include <errno.h>
-
-using std::string;
-
-#include "include/memory.h"
-
-#include "common/debug.h"
-#include "common/perf_counters.h"
-
-// re-include our assert to clobber the system one; fix dout:
-#include "include/assert.h"
-
-#define dout_context cct
-#define dout_subsys ceph_subsys_leveldb
-#undef dout_prefix
-#define dout_prefix *_dout << "leveldb: "
-
-class CephLevelDBLogger : public leveldb::Logger {
-  CephContext *cct;
-public:
-  explicit CephLevelDBLogger(CephContext *c) : cct(c) {
-    cct->get();
-  }
-  ~CephLevelDBLogger() override {
-    cct->put();
-  }
-
-  // Write an entry to the log file with the specified format.
-  void Logv(const char* format, va_list ap) override {
-    dout(1);
-    char buf[65536];
-    vsnprintf(buf, sizeof(buf), format, ap);
-    *_dout << buf << dendl;
-  }
-};
-
-leveldb::Logger *create_leveldb_ceph_logger()
-{
-  return new CephLevelDBLogger(g_ceph_context);
-}
-
-int LevelDBStore::init(string option_str)
-{
-  // init defaults. caller can override these if they want
-  // prior to calling open.
-  options.write_buffer_size = g_conf->leveldb_write_buffer_size;
-  options.cache_size = g_conf->leveldb_cache_size;
-  options.block_size = g_conf->leveldb_block_size;
-  options.bloom_size = g_conf->leveldb_bloom_size;
-  options.compression_enabled = g_conf->leveldb_compression;
-  options.paranoid_checks = g_conf->leveldb_paranoid;
-  options.max_open_files = g_conf->leveldb_max_open_files;
-  options.log_file = g_conf->leveldb_log;
-  return 0;
-}
-
-int LevelDBStore::do_open(ostream &out, bool create_if_missing)
-{
-  leveldb::Options ldoptions;
-
-  if (options.write_buffer_size)
-    ldoptions.write_buffer_size = options.write_buffer_size;
-  if (options.max_open_files)
-    ldoptions.max_open_files = options.max_open_files;
-  if (options.cache_size) {
-    leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size);
-    db_cache.reset(_db_cache);
-    ldoptions.block_cache = db_cache.get();
-  }
-  if (options.block_size)
-    ldoptions.block_size = options.block_size;
-  if (options.bloom_size) {
-#ifdef HAVE_LEVELDB_FILTER_POLICY
-    const leveldb::FilterPolicy *_filterpolicy =
-      leveldb::NewBloomFilterPolicy(options.bloom_size);
-    filterpolicy.reset(_filterpolicy);
-    ldoptions.filter_policy = filterpolicy.get();
-#else
-    assert(0 == "bloom size set but installed leveldb doesn't support bloom filters");
-#endif
-  }
-  if (options.compression_enabled)
-    ldoptions.compression = leveldb::kSnappyCompression;
-  else
-    ldoptions.compression = leveldb::kNoCompression;
-  if (options.block_restart_interval)
-    ldoptions.block_restart_interval = options.block_restart_interval;
-
-  ldoptions.error_if_exists = options.error_if_exists;
-  ldoptions.paranoid_checks = options.paranoid_checks;
-  ldoptions.create_if_missing = create_if_missing;
-
-  if (g_conf->leveldb_log_to_ceph_log) {
-    ceph_logger = new CephLevelDBLogger(g_ceph_context);
-    ldoptions.info_log = ceph_logger;
-  }
-
-  if (options.log_file.length()) {
-    leveldb::Env *env = leveldb::Env::Default();
-    env->NewLogger(options.log_file, &ldoptions.info_log);
-  }
-
-  leveldb::DB *_db;
-  leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db);
-  db.reset(_db);
-  if (!status.ok()) {
-    out << status.ToString() << std::endl;
-    return -EINVAL;
-  }
-
-  PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last);
-  plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets");
-  plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions");
-  plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency");
-  plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency");
-  plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency");
-  plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions");
-  plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range");
-  plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue");
-  plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue");
-  logger = plb.create_perf_counters();
-  cct->get_perfcounters_collection()->add(logger);
-
-  if (g_conf->leveldb_compact_on_mount) {
-    derr << "Compacting leveldb store..." << dendl;
-    compact();
-    derr << "Finished compacting leveldb store" << dendl;
-  }
-  return 0;
-}
-
-int LevelDBStore::_test_init(const string& dir)
-{
-  leveldb::Options options;
-  options.create_if_missing = true;
-  leveldb::DB *db;
-  leveldb::Status status = leveldb::DB::Open(options, dir, &db);
-  delete db;
-  return status.ok() ? 0 : -EIO;
-}
-
-LevelDBStore::~LevelDBStore()
-{
-  close();
-  delete logger;
-
-  // Ensure db is destroyed before dependent db_cache and filterpolicy
-  db.reset();
-  delete ceph_logger;
-}
-
-void LevelDBStore::close()
-{
-  // stop compaction thread
-  compact_queue_lock.Lock();
-  if (compact_thread.is_started()) {
-    compact_queue_stop = true;
-    compact_queue_cond.Signal();
-    compact_queue_lock.Unlock();
-    compact_thread.join();
-  } else {
-    compact_queue_lock.Unlock();
-  }
-
-  if (logger)
-    cct->get_perfcounters_collection()->remove(logger);
-}
-
-int LevelDBStore::submit_transaction(KeyValueDB::Transaction t)
-{
-  utime_t start = ceph_clock_now();
-  LevelDBTransactionImpl * _t =
-    static_cast<LevelDBTransactionImpl *>(t.get());
-  leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat));
-  utime_t lat = ceph_clock_now() - start;
-  logger->inc(l_leveldb_txns);
-  logger->tinc(l_leveldb_submit_latency, lat);
-  return s.ok() ? 0 : -1;
-}
-
-int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
-{
-  utime_t start = ceph_clock_now();
-  LevelDBTransactionImpl * _t =
-    static_cast<LevelDBTransactionImpl *>(t.get());
-  leveldb::WriteOptions options;
-  options.sync = true;
-  leveldb::Status s = db->Write(options, &(_t->bat));
-  utime_t lat = ceph_clock_now() - start;
-  logger->inc(l_leveldb_txns);
-  logger->tinc(l_leveldb_submit_sync_latency, lat);
-  return s.ok() ? 0 : -1;
-}
-
-void LevelDBStore::LevelDBTransactionImpl::set(
-  const string &prefix,
-  const string &k,
-  const bufferlist &to_set_bl)
-{
-  string key = combine_strings(prefix, k);
-  size_t bllen = to_set_bl.length();
-  // bufferlist::c_str() is non-constant, so we can't call c_str()
-  if (to_set_bl.is_contiguous() && bllen > 0) {
-    // bufferlist contains just one ptr or they're contiguous
-    bat.Put(leveldb::Slice(key),
-            leveldb::Slice(to_set_bl.buffers().front().c_str(), bllen));
-  } else if ((bllen <= 32 * 1024) && (bllen > 0)) {
-    // 2+ bufferptrs that are not contiguous
-    // allocate buffer on stack and copy bl contents to that buffer
-    // make sure the buffer isn't too large or we might crash here...
-    char* slicebuf = (char*) alloca(bllen);
-    leveldb::Slice newslice(slicebuf, bllen);
-    std::list<buffer::ptr>::const_iterator pb;
-    for (pb = to_set_bl.buffers().begin(); pb != to_set_bl.buffers().end(); ++pb) {
-      size_t ptrlen = (*pb).length();
-      memcpy((void*)slicebuf, (*pb).c_str(), ptrlen);
-      slicebuf += ptrlen;
-    }
-    bat.Put(leveldb::Slice(key), newslice);
-  } else {
-    // 2+ bufferptrs that are not contiguous, and enormous in size
-    bufferlist val = to_set_bl;
-    bat.Put(leveldb::Slice(key), leveldb::Slice(val.c_str(), val.length()));
-  }
-}
-
-void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix,
-                                                 const string &k)
-{
-  string key = combine_strings(prefix, k);
-  bat.Delete(leveldb::Slice(key));
-}
-
-void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
-{
-  KeyValueDB::Iterator it = db->get_iterator(prefix);
-  for (it->seek_to_first();
-       it->valid();
-       it->next()) {
-    bat.Delete(leveldb::Slice(combine_strings(prefix, it->key())));
-  }
-}
-
-void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string &prefix,
-                                                         const string &start,
-                                                         const string &end)
-{
-  KeyValueDB::Iterator it = db->get_iterator(prefix);
-  it->lower_bound(start);
-  while (it->valid()) {
-    if (it->key() >= end) {
-      break;
-    }
-    bat.Delete(combine_strings(prefix, it->key()));
-    it->next();
-  }
-}
-
-int LevelDBStore::get(
-    const string &prefix,
-    const std::set<string> &keys,
-    std::map<string, bufferlist> *out)
-{
-  utime_t start = ceph_clock_now();
-  for (std::set<string>::const_iterator i = keys.begin();
-       i != keys.end(); ++i) {
-    std::string value;
-    std::string bound = combine_strings(prefix, *i);
-    auto status = db->Get(leveldb::ReadOptions(), leveldb::Slice(bound), &value);
-    if (status.ok())
-      (*out)[*i].append(value);
-  }
-  utime_t lat = ceph_clock_now() - start;
-  logger->inc(l_leveldb_gets);
-  logger->tinc(l_leveldb_get_latency, lat);
-  return 0;
-}
-
-int LevelDBStore::get(const string &prefix,
-                      const string &key,
-                      bufferlist *out)
-{
-  assert(out && (out->length() == 0));
-  utime_t start = ceph_clock_now();
-  int r = 0;
-  string value, k;
-  leveldb::Status s;
-  k = combine_strings(prefix, key);
-  s = db->Get(leveldb::ReadOptions(), leveldb::Slice(k), &value);
-  if (s.ok()) {
-    out->append(value);
-  } else {
-    r = -ENOENT;
-  }
-  utime_t lat = ceph_clock_now() - start;
-  logger->inc(l_leveldb_gets);
-  logger->tinc(l_leveldb_get_latency, lat);
-  return r;
-}
-
-string LevelDBStore::combine_strings(const string &prefix, const string &value)
-{
-  string out = prefix;
-  out.push_back(0);
-  out.append(value);
-  return out;
-}
-
-bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in)
-{
-  bufferlist bl;
-  bl.append(bufferptr(in.data(), in.size()));
-  return bl;
-}
-
-int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key)
-{
-  size_t prefix_len = 0;
-
-  // Find separator inside Slice
-  char* separator = (char*) memchr(in.data(), 0, in.size());
-  if (separator == NULL)
-    return -EINVAL;
-  prefix_len = size_t(separator - in.data());
-  if (prefix_len >= in.size())
-    return -EINVAL;
-
-  if (prefix)
-    *prefix = string(in.data(), prefix_len);
-  if (key)
-    *key = string(separator + 1, in.size() - prefix_len - 1);
-  return 0;
-}
-
-void LevelDBStore::compact()
-{
-  logger->inc(l_leveldb_compact);
-  db->CompactRange(NULL, NULL);
-}
-
-
-void LevelDBStore::compact_thread_entry()
-{
-  compact_queue_lock.Lock();
-  while (!compact_queue_stop) {
-    while (!compact_queue.empty()) {
-      pair<string,string> range = compact_queue.front();
-      compact_queue.pop_front();
-      logger->set(l_leveldb_compact_queue_len, compact_queue.size());
-      compact_queue_lock.Unlock();
-      logger->inc(l_leveldb_compact_range);
-      compact_range(range.first, range.second);
-      compact_queue_lock.Lock();
-      continue;
-    }
-    compact_queue_cond.Wait(compact_queue_lock);
-  }
-  compact_queue_lock.Unlock();
-}
-
-void LevelDBStore::compact_range_async(const string& start, const string& end)
-{
-  Mutex::Locker l(compact_queue_lock);
-
-  // try to merge adjacent ranges. this is O(n), but the queue should
-  // be short. note that we do not cover all overlap cases and merge
-  // opportunities here, but we capture the ones we currently need.
-  list< pair<string,string> >::iterator p = compact_queue.begin();
-  while (p != compact_queue.end()) {
-    if (p->first == start && p->second == end) {
-      // dup; no-op
-      return;
-    }
-    if (p->first <= end && p->first > start) {
-      // merge with existing range to the right
-      compact_queue.push_back(make_pair(start, p->second));
-      compact_queue.erase(p);
-      logger->inc(l_leveldb_compact_queue_merge);
-      break;
-    }
-    if (p->second >= start && p->second < end) {
-      // merge with existing range to the left
-      compact_queue.push_back(make_pair(p->first, end));
-      compact_queue.erase(p);
-      logger->inc(l_leveldb_compact_queue_merge);
-      break;
-    }
-    ++p;
-  }
-  if (p == compact_queue.end()) {
-    // no merge, new entry.
-    compact_queue.push_back(make_pair(start, end));
-    logger->set(l_leveldb_compact_queue_len, compact_queue.size());
-  }
-  compact_queue_cond.Signal();
-  if (!compact_thread.is_started()) {
-    compact_thread.create("levdbst_compact");
-  }
-}
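
Note (not part of the patch above): for readers unfamiliar with Ceph's kv layer, the sketch below illustrates how a backend such as the LevelDBStore removed by this change is normally driven through the generic KeyValueDB interface rather than used directly. The directory "/tmp/kvtest", the prefix "P", and the key/value strings are made-up illustration values, and error handling is kept minimal; treat this as a hedged example under those assumptions, not as code from this repository.

// Illustrative sketch only -- not part of the deletion recorded above.
// Assumes Ceph's KeyValueDB factory interface (kv/KeyValueDB.h); the path,
// prefix, and key names are hypothetical.
#include <iostream>
#include <memory>
#include "kv/KeyValueDB.h"

int example_kv_roundtrip(CephContext *cct)
{
  // "leveldb" selects the backend implemented by LevelDBStore.cc;
  // "/tmp/kvtest" is an arbitrary example directory.
  std::unique_ptr<KeyValueDB> db(
    KeyValueDB::create(cct, "leveldb", "/tmp/kvtest"));
  if (!db || db->create_and_open(std::cerr) < 0)
    return -1;

  // Mutations are queued on a Transaction and applied as one leveldb WriteBatch.
  KeyValueDB::Transaction t = db->get_transaction();
  bufferlist bl;
  bl.append("value");
  t->set("P", "key", bl);            // stored under the combined key "P\0key"
  int r = db->submit_transaction_sync(t);
  if (r < 0)
    return r;

  // Reads follow the same prefix/key convention (see combine_strings/split_key).
  bufferlist out;
  return db->get("P", "key", &out);  // 0 on success, -ENOENT if absent
}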