X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmon%2FMonitorDBStore.h;fp=src%2Fceph%2Fsrc%2Fmon%2FMonitorDBStore.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=00e56a9d8fcdb8654e40132ff339611047d8e291;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/mon/MonitorDBStore.h b/src/ceph/src/mon/MonitorDBStore.h deleted file mode 100644 index 00e56a9..0000000 --- a/src/ceph/src/mon/MonitorDBStore.h +++ /dev/null @@ -1,777 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* -* Ceph - scalable distributed file system -* -* Copyright (C) 2012 Inktank, Inc. -* -* This is free software; you can redistribute it and/or -* modify it under the terms of the GNU Lesser General Public -* License version 2.1, as published by the Free Software -* Foundation. See file COPYING. -*/ -#ifndef CEPH_MONITOR_DB_STORE_H -#define CEPH_MONITOR_DB_STORE_H - -#include "include/types.h" -#include "include/buffer.h" -#include -#include -#include -#include -#include -#include -#include "kv/KeyValueDB.h" - -#include "include/assert.h" -#include "common/Formatter.h" -#include "common/Finisher.h" -#include "common/errno.h" -#include "common/debug.h" -#include "common/safe_io.h" - -#define dout_context g_ceph_context - -class MonitorDBStore -{ - string path; - boost::scoped_ptr db; - bool do_dump; - int dump_fd_binary; - std::ofstream dump_fd_json; - JSONFormatter dump_fmt; - - - Finisher io_work; - - bool is_open; - - public: - - struct Op { - uint8_t type; - string prefix; - string key, endkey; - bufferlist bl; - - Op() - : type(0) { } - Op(int t, string p, string k) - : type(t), prefix(p), key(k) { } - Op(int t, const string& p, string k, bufferlist& b) - : type(t), prefix(p), key(k), bl(b) { } - Op(int t, const string& p, string start, string end) - : type(t), prefix(p), key(start), endkey(end) { } - - void encode(bufferlist& encode_bl) const { - ENCODE_START(2, 1, encode_bl); - ::encode(type, encode_bl); - ::encode(prefix, encode_bl); - ::encode(key, encode_bl); - ::encode(bl, encode_bl); - ::encode(endkey, encode_bl); - ENCODE_FINISH(encode_bl); - } - - void decode(bufferlist::iterator& decode_bl) { - DECODE_START(2, decode_bl); - ::decode(type, decode_bl); - ::decode(prefix, decode_bl); - ::decode(key, decode_bl); - ::decode(bl, decode_bl); - if (struct_v >= 2) - ::decode(endkey, decode_bl); - DECODE_FINISH(decode_bl); - } - - void dump(Formatter *f) const { - f->dump_int("type", type); - f->dump_string("prefix", prefix); - f->dump_string("key", key); - if (endkey.length()) - f->dump_string("endkey", endkey); - } - - static void generate_test_instances(list& ls) { - ls.push_back(new Op); - // we get coverage here from the Transaction instances - } - }; - - struct Transaction; - typedef ceph::shared_ptr TransactionRef; - struct Transaction { - list ops; - uint64_t bytes, keys; - - Transaction() : bytes(0), keys(0) {} - - enum { - OP_PUT = 1, - OP_ERASE = 2, - OP_COMPACT = 3, - }; - - void put(string prefix, string key, bufferlist& bl) { - ops.push_back(Op(OP_PUT, prefix, key, bl)); - ++keys; - bytes += prefix.length() + key.length() + bl.length(); - } - - void put(string prefix, version_t ver, bufferlist& bl) { - ostringstream os; - os << ver; - put(prefix, os.str(), bl); - } - - void put(string prefix, string key, version_t ver) { - bufferlist bl; - ::encode(ver, bl); - put(prefix, key, bl); - } - - void erase(string prefix, string key) { - ops.push_back(Op(OP_ERASE, prefix, key)); - ++keys; - bytes += prefix.length() + key.length(); - } - - void erase(string prefix, version_t ver) { - ostringstream os; - os << ver; - erase(prefix, os.str()); - } - - void compact_prefix(string prefix) { - ops.push_back(Op(OP_COMPACT, prefix, string())); - } - - void compact_range(string prefix, string start, string end) { - ops.push_back(Op(OP_COMPACT, prefix, start, end)); - } - - void encode(bufferlist& bl) const { - ENCODE_START(2, 1, bl); - ::encode(ops, bl); - ::encode(bytes, bl); - ::encode(keys, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::iterator& bl) { - DECODE_START(2, bl); - ::decode(ops, bl); - if (struct_v >= 2) { - ::decode(bytes, bl); - ::decode(keys, bl); - } - DECODE_FINISH(bl); - } - - static void generate_test_instances(list& ls) { - ls.push_back(new Transaction); - ls.push_back(new Transaction); - bufferlist bl; - bl.append("value"); - ls.back()->put("prefix", "key", bl); - ls.back()->erase("prefix2", "key2"); - ls.back()->compact_prefix("prefix3"); - ls.back()->compact_range("prefix4", "from", "to"); - } - - void append(TransactionRef other) { - ops.splice(ops.end(), other->ops); - keys += other->keys; - bytes += other->bytes; - } - - void append_from_encoded(bufferlist& bl) { - auto other(std::make_shared()); - bufferlist::iterator it = bl.begin(); - other->decode(it); - append(other); - } - - bool empty() { - return (size() == 0); - } - - size_t size() const { - return ops.size(); - } - uint64_t get_keys() const { - return keys; - } - uint64_t get_bytes() const { - return bytes; - } - - void dump(ceph::Formatter *f, bool dump_val=false) const { - f->open_object_section("transaction"); - f->open_array_section("ops"); - list::const_iterator it; - int op_num = 0; - for (it = ops.begin(); it != ops.end(); ++it) { - const Op& op = *it; - f->open_object_section("op"); - f->dump_int("op_num", op_num++); - switch (op.type) { - case OP_PUT: - { - f->dump_string("type", "PUT"); - f->dump_string("prefix", op.prefix); - f->dump_string("key", op.key); - f->dump_unsigned("length", op.bl.length()); - if (dump_val) { - ostringstream os; - op.bl.hexdump(os); - f->dump_string("bl", os.str()); - } - } - break; - case OP_ERASE: - { - f->dump_string("type", "ERASE"); - f->dump_string("prefix", op.prefix); - f->dump_string("key", op.key); - } - break; - case OP_COMPACT: - { - f->dump_string("type", "COMPACT"); - f->dump_string("prefix", op.prefix); - f->dump_string("start", op.key); - f->dump_string("end", op.endkey); - } - break; - default: - { - f->dump_string("type", "unknown"); - f->dump_unsigned("op_code", op.type); - break; - } - } - f->close_section(); - } - f->close_section(); - f->dump_unsigned("num_keys", keys); - f->dump_unsigned("num_bytes", bytes); - f->close_section(); - } - }; - - int apply_transaction(MonitorDBStore::TransactionRef t) { - KeyValueDB::Transaction dbt = db->get_transaction(); - - if (do_dump) { - if (!g_conf->mon_debug_dump_json) { - bufferlist bl; - t->encode(bl); - bl.write_fd(dump_fd_binary); - } else { - t->dump(&dump_fmt, true); - dump_fmt.flush(dump_fd_json); - dump_fd_json.flush(); - } - } - - list > > compact; - for (list::const_iterator it = t->ops.begin(); - it != t->ops.end(); - ++it) { - const Op& op = *it; - switch (op.type) { - case Transaction::OP_PUT: - dbt->set(op.prefix, op.key, op.bl); - break; - case Transaction::OP_ERASE: - dbt->rmkey(op.prefix, op.key); - break; - case Transaction::OP_COMPACT: - compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey))); - break; - default: - derr << __func__ << " unknown op type " << op.type << dendl; - ceph_abort(); - break; - } - } - int r = db->submit_transaction_sync(dbt); - if (r >= 0) { - while (!compact.empty()) { - if (compact.front().second.first == string() && - compact.front().second.second == string()) - db->compact_prefix_async(compact.front().first); - else - db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second); - compact.pop_front(); - } - } else { - assert(0 == "failed to write to db"); - } - return r; - } - - struct C_DoTransaction : public Context { - MonitorDBStore *store; - MonitorDBStore::TransactionRef t; - Context *oncommit; - C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t, - Context *f) - : store(s), t(t), oncommit(f) - {} - void finish(int r) override { - /* The store serializes writes. Each transaction is handled - * sequentially by the io_work Finisher. If a transaction takes longer - * to apply its state to permanent storage, then no other transaction - * will be handled meanwhile. - * - * We will now randomly inject random delays. We can safely sleep prior - * to applying the transaction as it won't break the model. - */ - double delay_prob = g_conf->mon_inject_transaction_delay_probability; - if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) { - utime_t delay; - double delay_max = g_conf->mon_inject_transaction_delay_max; - delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0); - lsubdout(g_ceph_context, mon, 1) - << "apply_transaction will be delayed for " << delay - << " seconds" << dendl; - delay.sleep(); - } - int ret = store->apply_transaction(t); - oncommit->complete(ret); - } - }; - - /** - * queue transaction - * - * Queue a transaction to commit asynchronously. Trigger a context - * on completion (without any locks held). - */ - void queue_transaction(MonitorDBStore::TransactionRef t, - Context *oncommit) { - io_work.queue(new C_DoTransaction(this, t, oncommit)); - } - - /** - * block and flush all io activity - */ - void flush() { - io_work.wait_for_empty(); - } - - class StoreIteratorImpl { - protected: - bool done; - pair last_key; - bufferlist crc_bl; - - StoreIteratorImpl() : done(false) { } - virtual ~StoreIteratorImpl() { } - - bool add_chunk_entry(TransactionRef tx, - string &prefix, - string &key, - bufferlist &value, - uint64_t max) { - auto tmp(std::make_shared()); - bufferlist tmp_bl; - tmp->put(prefix, key, value); - tmp->encode(tmp_bl); - - bufferlist tx_bl; - tx->encode(tx_bl); - - size_t len = tx_bl.length() + tmp_bl.length(); - - if (!tx->empty() && (len > max)) { - return false; - } - - tx->append(tmp); - last_key.first = prefix; - last_key.second = key; - - if (g_conf->mon_sync_debug) { - ::encode(prefix, crc_bl); - ::encode(key, crc_bl); - ::encode(value, crc_bl); - } - - return true; - } - - virtual bool _is_valid() = 0; - - public: - __u32 crc() { - if (g_conf->mon_sync_debug) - return crc_bl.crc32c(0); - return 0; - } - pair get_last_key() { - return last_key; - } - virtual bool has_next_chunk() { - return !done && _is_valid(); - } - virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0; - virtual pair get_next_key() = 0; - }; - typedef ceph::shared_ptr Synchronizer; - - class WholeStoreIteratorImpl : public StoreIteratorImpl { - KeyValueDB::WholeSpaceIterator iter; - set sync_prefixes; - - public: - WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter, - set &prefixes) - : StoreIteratorImpl(), - iter(iter), - sync_prefixes(prefixes) - { } - - ~WholeStoreIteratorImpl() override { } - - /** - * Obtain a chunk of the store - * - * @param bl Encoded transaction that will recreate the chunk - * @param first_key Pair containing the first key to obtain, and that - * will contain the first key in the chunk (that may - * differ from the one passed on to the function) - * @param last_key[out] Last key in the chunk - */ - void get_chunk_tx(TransactionRef tx, uint64_t max) override { - assert(done == false); - assert(iter->valid() == true); - - while (iter->valid()) { - string prefix(iter->raw_key().first); - string key(iter->raw_key().second); - if (sync_prefixes.count(prefix)) { - bufferlist value = iter->value(); - if (!add_chunk_entry(tx, prefix, key, value, max)) - return; - } - iter->next(); - } - assert(iter->valid() == false); - done = true; - } - - pair get_next_key() override { - assert(iter->valid()); - - for (; iter->valid(); iter->next()) { - pair r = iter->raw_key(); - if (sync_prefixes.count(r.first) > 0) { - iter->next(); - return r; - } - } - return pair(); - } - - bool _is_valid() override { - return iter->valid(); - } - }; - - Synchronizer get_synchronizer(pair &key, - set &prefixes) { - KeyValueDB::WholeSpaceIterator iter; - iter = db->get_iterator(); - - if (!key.first.empty() && !key.second.empty()) - iter->upper_bound(key.first, key.second); - else - iter->seek_to_first(); - - return ceph::shared_ptr( - new WholeStoreIteratorImpl(iter, prefixes) - ); - } - - KeyValueDB::Iterator get_iterator(const string &prefix) { - assert(!prefix.empty()); - KeyValueDB::Iterator iter = db->get_iterator(prefix); - iter->seek_to_first(); - return iter; - } - - KeyValueDB::WholeSpaceIterator get_iterator() { - KeyValueDB::WholeSpaceIterator iter; - iter = db->get_iterator(); - iter->seek_to_first(); - return iter; - } - - int get(const string& prefix, const string& key, bufferlist& bl) { - assert(bl.length() == 0); - return db->get(prefix, key, &bl); - } - - int get(const string& prefix, const version_t ver, bufferlist& bl) { - ostringstream os; - os << ver; - return get(prefix, os.str(), bl); - } - - version_t get(const string& prefix, const string& key) { - bufferlist bl; - int err = get(prefix, key, bl); - if (err < 0) { - if (err == -ENOENT) // if key doesn't exist, assume its value is 0 - return 0; - // we're not expecting any other negative return value, and we can't - // just return a negative value if we're returning a version_t - generic_dout(0) << "MonitorDBStore::get() error obtaining" - << " (" << prefix << ":" << key << "): " - << cpp_strerror(err) << dendl; - assert(0 == "error obtaining key"); - } - - assert(bl.length()); - version_t ver; - bufferlist::iterator p = bl.begin(); - ::decode(ver, p); - return ver; - } - - bool exists(const string& prefix, const string& key) { - KeyValueDB::Iterator it = db->get_iterator(prefix); - int err = it->lower_bound(key); - if (err < 0) - return false; - - return (it->valid() && it->key() == key); - } - - bool exists(const string& prefix, version_t ver) { - ostringstream os; - os << ver; - return exists(prefix, os.str()); - } - - string combine_strings(const string& prefix, const string& value) { - string out = prefix; - out.push_back('_'); - out.append(value); - return out; - } - - string combine_strings(const string& prefix, const version_t ver) { - ostringstream os; - os << ver; - return combine_strings(prefix, os.str()); - } - - void clear(set& prefixes) { - set::iterator iter; - KeyValueDB::Transaction dbt = db->get_transaction(); - - for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) { - dbt->rmkeys_by_prefix((*iter)); - } - int r = db->submit_transaction_sync(dbt); - assert(r >= 0); - } - - void _open(string kv_type) { - string::const_reverse_iterator rit; - int pos = 0; - for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) { - if (*rit != '/') - break; - } - ostringstream os; - os << path.substr(0, path.size() - pos) << "/store.db"; - string full_path = os.str(); - - KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context, - kv_type, - full_path); - if (!db_ptr) { - derr << __func__ << " error initializing " - << kv_type << " db back storage in " - << full_path << dendl; - assert(0 == "MonitorDBStore: error initializing keyvaluedb back storage"); - } - db.reset(db_ptr); - - if (g_conf->mon_debug_dump_transactions) { - if (!g_conf->mon_debug_dump_json) { - dump_fd_binary = ::open( - g_conf->mon_debug_dump_location.c_str(), - O_CREAT|O_APPEND|O_WRONLY, 0644); - if (dump_fd_binary < 0) { - dump_fd_binary = -errno; - derr << "Could not open log file, got " - << cpp_strerror(dump_fd_binary) << dendl; - } - } else { - dump_fmt.reset(); - dump_fmt.open_array_section("dump"); - dump_fd_json.open(g_conf->mon_debug_dump_location.c_str()); - } - do_dump = true; - } - if (kv_type == "rocksdb") - db->init(g_conf->mon_rocksdb_options); - else - db->init(); - - - } - - int open(ostream &out) { - string kv_type; - int r = read_meta("kv_backend", &kv_type); - if (r < 0 || kv_type.empty()) { - // assume old monitors that did not mark the type were leveldb. - kv_type = "leveldb"; - r = write_meta("kv_backend", kv_type); - if (r < 0) - return r; - } - _open(kv_type); - r = db->open(out); - if (r < 0) - return r; - - // Monitors are few in number, so the resource cost of exposing - // very detailed stats is low: ramp up the priority of all the - // KV store's perf counters. Do this after open, because backend may - // not have constructed PerfCounters earlier. - if (db->get_perf_counters()) { - db->get_perf_counters()->set_prio_adjust( - PerfCountersBuilder::PRIO_USEFUL - PerfCountersBuilder::PRIO_DEBUGONLY); - } - - io_work.start(); - is_open = true; - return 0; - } - - int create_and_open(ostream &out) { - // record the type before open - string kv_type; - int r = read_meta("kv_backend", &kv_type); - if (r < 0) { - kv_type = g_conf->mon_keyvaluedb; - r = write_meta("kv_backend", kv_type); - if (r < 0) - return r; - } - _open(kv_type); - r = db->create_and_open(out); - if (r < 0) - return r; - io_work.start(); - is_open = true; - return 0; - } - - void close() { - // there should be no work queued! - io_work.stop(); - is_open = false; - db.reset(NULL); - } - - void compact() { - db->compact(); - } - - void compact_prefix(const string& prefix) { - db->compact_prefix(prefix); - } - - uint64_t get_estimated_size(map &extras) { - return db->get_estimated_size(extras); - } - - /** - * write_meta - write a simple configuration key out-of-band - * - * Write a simple key/value pair for basic store configuration - * (e.g., a uuid or magic number) to an unopened/unmounted store. - * The default implementation writes this to a plaintext file in the - * path. - * - * A newline is appended. - * - * @param key key name (e.g., "fsid") - * @param value value (e.g., a uuid rendered as a string) - * @returns 0 for success, or an error code - */ - int write_meta(const std::string& key, - const std::string& value) const { - string v = value; - v += "\n"; - int r = safe_write_file(path.c_str(), key.c_str(), - v.c_str(), v.length()); - if (r < 0) - return r; - return 0; - } - - /** - * read_meta - read a simple configuration key out-of-band - * - * Read a simple key value to an unopened/mounted store. - * - * Trailing whitespace is stripped off. - * - * @param key key name - * @param value pointer to value string - * @returns 0 for success, or an error code - */ - int read_meta(const std::string& key, - std::string *value) const { - char buf[4096]; - int r = safe_read_file(path.c_str(), key.c_str(), - buf, sizeof(buf)); - if (r <= 0) - return r; - // drop trailing newlines - while (r && isspace(buf[r-1])) { - --r; - } - *value = string(buf, r); - return 0; - } - - explicit MonitorDBStore(const string& path) - : path(path), - db(0), - do_dump(false), - dump_fd_binary(-1), - dump_fmt(true), - io_work(g_ceph_context, "monstore", "fn_monstore"), - is_open(false) { - } - ~MonitorDBStore() { - assert(!is_open); - if (do_dump) { - if (!g_conf->mon_debug_dump_json) { - ::close(dump_fd_binary); - } else { - dump_fmt.close_section(); - dump_fmt.flush(dump_fd_json); - dump_fd_json.flush(); - dump_fd_json.close(); - } - } - } - -}; - -WRITE_CLASS_ENCODER(MonitorDBStore::Op) -WRITE_CLASS_ENCODER(MonitorDBStore::Transaction) - -#endif /* CEPH_MONITOR_DB_STORE_H */