diff --git a/src/ceph/src/mon/MonitorDBStore.h b/src/ceph/src/mon/MonitorDBStore.h
new file mode 100644
index 0000000..00e56a9
--- /dev/null
+++ b/src/ceph/src/mon/MonitorDBStore.h
@@ -0,0 +1,777 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2012 Inktank, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+#ifndef CEPH_MONITOR_DB_STORE_H
+#define CEPH_MONITOR_DB_STORE_H
+
+#include "include/types.h"
+#include "include/buffer.h"
+#include <set>
+#include <map>
+#include <string>
+#include <boost/scoped_ptr.hpp>
+#include <sstream>
+#include <fstream>
+#include "kv/KeyValueDB.h"
+
+#include "include/assert.h"
+#include "common/Formatter.h"
+#include "common/Finisher.h"
+#include "common/errno.h"
+#include "common/debug.h"
+#include "common/safe_io.h"
+
+#define dout_context g_ceph_context
+
+class MonitorDBStore
+{
+  string path;
+  boost::scoped_ptr<KeyValueDB> db;
+  bool do_dump;
+  int dump_fd_binary;
+  std::ofstream dump_fd_json;
+  JSONFormatter dump_fmt;
+
+  Finisher io_work;
+
+  bool is_open;
+
+ public:
+
+  struct Op {
+    uint8_t type;
+    string prefix;
+    string key, endkey;
+    bufferlist bl;
+
+    Op()
+      : type(0) { }
+    Op(int t, string p, string k)
+      : type(t), prefix(p), key(k) { }
+    Op(int t, const string& p, string k, bufferlist& b)
+      : type(t), prefix(p), key(k), bl(b) { }
+    Op(int t, const string& p, string start, string end)
+      : type(t), prefix(p), key(start), endkey(end) { }
+
+    void encode(bufferlist& encode_bl) const {
+      ENCODE_START(2, 1, encode_bl);
+      ::encode(type, encode_bl);
+      ::encode(prefix, encode_bl);
+      ::encode(key, encode_bl);
+      ::encode(bl, encode_bl);
+      ::encode(endkey, encode_bl);
+      ENCODE_FINISH(encode_bl);
+    }
+
+    void decode(bufferlist::iterator& decode_bl) {
+      DECODE_START(2, decode_bl);
+      ::decode(type, decode_bl);
+      ::decode(prefix, decode_bl);
+      ::decode(key, decode_bl);
+      ::decode(bl, decode_bl);
+      if (struct_v >= 2)
+        ::decode(endkey, decode_bl);
+      DECODE_FINISH(decode_bl);
+    }
+
+    void dump(Formatter *f) const {
+      f->dump_int("type", type);
+      f->dump_string("prefix", prefix);
+      f->dump_string("key", key);
+      if (endkey.length())
+        f->dump_string("endkey", endkey);
+    }
+
+    static void generate_test_instances(list<Op*>& ls) {
+      ls.push_back(new Op);
+      // we get coverage here from the Transaction instances
+    }
+  };
+
+  struct Transaction;
+  typedef ceph::shared_ptr<Transaction> TransactionRef;
+  struct Transaction {
+    list<Op> ops;
+    uint64_t bytes, keys;
+
+    Transaction() : bytes(0), keys(0) {}
+
+    enum {
+      OP_PUT     = 1,
+      OP_ERASE   = 2,
+      OP_COMPACT = 3,
+    };
+
+    void put(string prefix, string key, bufferlist& bl) {
+      ops.push_back(Op(OP_PUT, prefix, key, bl));
+      ++keys;
+      bytes += prefix.length() + key.length() + bl.length();
+    }
+
+    void put(string prefix, version_t ver, bufferlist& bl) {
+      ostringstream os;
+      os << ver;
+      put(prefix, os.str(), bl);
+    }
+
+    void put(string prefix, string key, version_t ver) {
+      bufferlist bl;
+      ::encode(ver, bl);
+      put(prefix, key, bl);
+    }
+
+    void erase(string prefix, string key) {
+      ops.push_back(Op(OP_ERASE, prefix, key));
+      ++keys;
+      bytes += prefix.length() + key.length();
+    }
+
+    void erase(string prefix, version_t ver) {
+      ostringstream os;
+      os << ver;
+      erase(prefix, os.str());
+    }
+
+    void compact_prefix(string prefix) {
+      ops.push_back(Op(OP_COMPACT, prefix, string()));
+    }
+
+    void compact_range(string prefix, string start, string end) {
+      ops.push_back(Op(OP_COMPACT, prefix, start, end));
+    }
+
+    void encode(bufferlist& bl) const {
+      ENCODE_START(2, 1, bl);
+      ::encode(ops, bl);
+      ::encode(bytes, bl);
+      ::encode(keys, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::iterator& bl) {
+      DECODE_START(2, bl);
+      ::decode(ops, bl);
+      if (struct_v >= 2) {
+        ::decode(bytes, bl);
+        ::decode(keys, bl);
+      }
+      DECODE_FINISH(bl);
+    }
+
+    static void generate_test_instances(list<Transaction*>& ls) {
+      ls.push_back(new Transaction);
+      ls.push_back(new Transaction);
+      bufferlist bl;
+      bl.append("value");
+      ls.back()->put("prefix", "key", bl);
+      ls.back()->erase("prefix2", "key2");
+      ls.back()->compact_prefix("prefix3");
+      ls.back()->compact_range("prefix4", "from", "to");
+    }
+
+    void append(TransactionRef other) {
+      ops.splice(ops.end(), other->ops);
+      keys += other->keys;
+      bytes += other->bytes;
+    }
+
+    void append_from_encoded(bufferlist& bl) {
+      auto other(std::make_shared<Transaction>());
+      bufferlist::iterator it = bl.begin();
+      other->decode(it);
+      append(other);
+    }
+
+    bool empty() {
+      return (size() == 0);
+    }
+
+    size_t size() const {
+      return ops.size();
+    }
+    uint64_t get_keys() const {
+      return keys;
+    }
+    uint64_t get_bytes() const {
+      return bytes;
+    }
+
+    void dump(ceph::Formatter *f, bool dump_val=false) const {
+      f->open_object_section("transaction");
+      f->open_array_section("ops");
+      list<Op>::const_iterator it;
+      int op_num = 0;
+      for (it = ops.begin(); it != ops.end(); ++it) {
+        const Op& op = *it;
+        f->open_object_section("op");
+        f->dump_int("op_num", op_num++);
+        switch (op.type) {
+        case OP_PUT:
+          {
+            f->dump_string("type", "PUT");
+            f->dump_string("prefix", op.prefix);
+            f->dump_string("key", op.key);
+            f->dump_unsigned("length", op.bl.length());
+            if (dump_val) {
+              ostringstream os;
+              op.bl.hexdump(os);
+              f->dump_string("bl", os.str());
+            }
+          }
+          break;
+        case OP_ERASE:
+          {
+            f->dump_string("type", "ERASE");
+            f->dump_string("prefix", op.prefix);
+            f->dump_string("key", op.key);
+          }
+          break;
+        case OP_COMPACT:
+          {
+            f->dump_string("type", "COMPACT");
+            f->dump_string("prefix", op.prefix);
+            f->dump_string("start", op.key);
+            f->dump_string("end", op.endkey);
+          }
+          break;
+        default:
+          {
+            f->dump_string("type", "unknown");
+            f->dump_unsigned("op_code", op.type);
+            break;
+          }
+        }
+        f->close_section();
+      }
+      f->close_section();
+      f->dump_unsigned("num_keys", keys);
+      f->dump_unsigned("num_bytes", bytes);
+      f->close_section();
+    }
+  };
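+
+  /* Editor's note: illustrative usage sketch only, not part of the upstream
+   * header.  It shows how a caller typically builds a Transaction and hands
+   * it to an already opened store; the data path, prefix and key names are
+   * assumptions made for the example.
+   *
+   *   MonitorDBStore store("/var/lib/ceph/mon/ceph-a");   // hypothetical path
+   *   // ... store.open()/create_and_open() must have succeeded first ...
+   *   auto t(std::make_shared<MonitorDBStore::Transaction>());
+   *   bufferlist bl;
+   *   bl.append("value");
+   *   t->put("paxos", "first_committed", bl);   // queue a write
+   *   t->erase("paxos", "stale_key");           // queue a deletion
+   *   t->compact_prefix("paxos");               // queue an async compaction
+   *   int r = store.apply_transaction(t);       // apply synchronously
+   */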
+
+  int apply_transaction(MonitorDBStore::TransactionRef t) {
+    KeyValueDB::Transaction dbt = db->get_transaction();
+
+    if (do_dump) {
+      if (!g_conf->mon_debug_dump_json) {
+        bufferlist bl;
+        t->encode(bl);
+        bl.write_fd(dump_fd_binary);
+      } else {
+        t->dump(&dump_fmt, true);
+        dump_fmt.flush(dump_fd_json);
+        dump_fd_json.flush();
+      }
+    }
+
+    list<pair<string, pair<string,string> > > compact;
+    for (list<Op>::const_iterator it = t->ops.begin();
+         it != t->ops.end();
+         ++it) {
+      const Op& op = *it;
+      switch (op.type) {
+      case Transaction::OP_PUT:
+        dbt->set(op.prefix, op.key, op.bl);
+        break;
+      case Transaction::OP_ERASE:
+        dbt->rmkey(op.prefix, op.key);
+        break;
+      case Transaction::OP_COMPACT:
+        compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
+        break;
+      default:
+        derr << __func__ << " unknown op type " << op.type << dendl;
+        ceph_abort();
+        break;
+      }
+    }
+    int r = db->submit_transaction_sync(dbt);
+    if (r >= 0) {
+      while (!compact.empty()) {
+        if (compact.front().second.first == string() &&
+            compact.front().second.second == string())
+          db->compact_prefix_async(compact.front().first);
+        else
+          db->compact_range_async(compact.front().first,
+                                  compact.front().second.first,
+                                  compact.front().second.second);
+        compact.pop_front();
+      }
+    } else {
+      assert(0 == "failed to write to db");
+    }
+    return r;
+  }
+
+  struct C_DoTransaction : public Context {
+    MonitorDBStore *store;
+    MonitorDBStore::TransactionRef t;
+    Context *oncommit;
+    C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t,
+                    Context *f)
+      : store(s), t(t), oncommit(f)
+    {}
+    void finish(int r) override {
+      /* The store serializes writes.  Each transaction is handled
+       * sequentially by the io_work Finisher, so if one transaction takes a
+       * long time to apply its state to permanent storage, no other
+       * transaction will be handled in the meantime.
+       *
+       * We may now inject a random delay.  We can safely sleep prior to
+       * applying the transaction, as doing so won't break the model.
+       */
+      double delay_prob = g_conf->mon_inject_transaction_delay_probability;
+      if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) {
+        utime_t delay;
+        double delay_max = g_conf->mon_inject_transaction_delay_max;
+        delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0);
+        lsubdout(g_ceph_context, mon, 1)
+          << "apply_transaction will be delayed for " << delay
+          << " seconds" << dendl;
+        delay.sleep();
+      }
+      int ret = store->apply_transaction(t);
+      oncommit->complete(ret);
+    }
+  };
+
+  /**
+   * queue transaction
+   *
+   * Queue a transaction to commit asynchronously.  Trigger a context
+   * on completion (without any locks held).
+   */
+  void queue_transaction(MonitorDBStore::TransactionRef t,
+                         Context *oncommit) {
+    io_work.queue(new C_DoTransaction(this, t, oncommit));
+  }
+
+  /**
+   * block and flush all io activity
+   */
+  void flush() {
+    io_work.wait_for_empty();
+  }
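+
+  /* Editor's note: illustrative usage sketch only, not part of the upstream
+   * header.  It shows the asynchronous path: the caller queues a transaction
+   * together with a completion Context, and the io_work Finisher completes
+   * that Context once apply_transaction() has returned.  C_LogCommit is a
+   * hypothetical caller-defined Context subclass, and the prefix/key are
+   * assumptions made for the example.
+   *
+   *   auto t(std::make_shared<MonitorDBStore::Transaction>());
+   *   t->put("logm", "last_committed", 42u);          // assumed prefix/key
+   *   store.queue_transaction(t, new C_LogCommit(...));
+   *   // ...
+   *   store.flush();   // block until every queued transaction has been applied
+   */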
+
+  class StoreIteratorImpl {
+  protected:
+    bool done;
+    pair<string,string> last_key;
+    bufferlist crc_bl;
+
+    StoreIteratorImpl() : done(false) { }
+    virtual ~StoreIteratorImpl() { }
+
+    bool add_chunk_entry(TransactionRef tx,
+                         string &prefix,
+                         string &key,
+                         bufferlist &value,
+                         uint64_t max) {
+      auto tmp(std::make_shared<Transaction>());
+      bufferlist tmp_bl;
+      tmp->put(prefix, key, value);
+      tmp->encode(tmp_bl);
+
+      bufferlist tx_bl;
+      tx->encode(tx_bl);
+
+      size_t len = tx_bl.length() + tmp_bl.length();
+
+      if (!tx->empty() && (len > max)) {
+        return false;
+      }
+
+      tx->append(tmp);
+      last_key.first = prefix;
+      last_key.second = key;
+
+      if (g_conf->mon_sync_debug) {
+        ::encode(prefix, crc_bl);
+        ::encode(key, crc_bl);
+        ::encode(value, crc_bl);
+      }
+
+      return true;
+    }
+
+    virtual bool _is_valid() = 0;
+
+  public:
+    __u32 crc() {
+      if (g_conf->mon_sync_debug)
+        return crc_bl.crc32c(0);
+      return 0;
+    }
+    pair<string,string> get_last_key() {
+      return last_key;
+    }
+    virtual bool has_next_chunk() {
+      return !done && _is_valid();
+    }
+    virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0;
+    virtual pair<string,string> get_next_key() = 0;
+  };
+  typedef ceph::shared_ptr<StoreIteratorImpl> Synchronizer;
+
+  class WholeStoreIteratorImpl : public StoreIteratorImpl {
+    KeyValueDB::WholeSpaceIterator iter;
+    set<string> sync_prefixes;
+
+  public:
+    WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
+                           set<string> &prefixes)
+      : StoreIteratorImpl(),
+        iter(iter),
+        sync_prefixes(prefixes)
+    { }
+
+    ~WholeStoreIteratorImpl() override { }
+
+    /**
+     * Obtain a chunk of the store
+     *
+     * @param tx [out] Transaction that, when applied, recreates this chunk
+     *                 (the first key may differ from the iterator's starting
+     *                 position; use get_last_key() for the last key added)
+     * @param max Maximum encoded size of the chunk, in bytes
+     */
+    void get_chunk_tx(TransactionRef tx, uint64_t max) override {
+      assert(done == false);
+      assert(iter->valid() == true);
+
+      while (iter->valid()) {
+        string prefix(iter->raw_key().first);
+        string key(iter->raw_key().second);
+        if (sync_prefixes.count(prefix)) {
+          bufferlist value = iter->value();
+          if (!add_chunk_entry(tx, prefix, key, value, max))
+            return;
+        }
+        iter->next();
+      }
+      assert(iter->valid() == false);
+      done = true;
+    }
+
+    pair<string,string> get_next_key() override {
+      assert(iter->valid());
+
+      for (; iter->valid(); iter->next()) {
+        pair<string,string> r = iter->raw_key();
+        if (sync_prefixes.count(r.first) > 0) {
+          iter->next();
+          return r;
+        }
+      }
+      return pair<string,string>();
+    }
+
+    bool _is_valid() override {
+      return iter->valid();
+    }
+  };
+
+  Synchronizer get_synchronizer(pair<string,string> &key,
+                                set<string> &prefixes) {
+    KeyValueDB::WholeSpaceIterator iter;
+    iter = db->get_iterator();
+
+    if (!key.first.empty() && !key.second.empty())
+      iter->upper_bound(key.first, key.second);
+    else
+      iter->seek_to_first();
+
+    return ceph::shared_ptr<StoreIteratorImpl>(
+      new WholeStoreIteratorImpl(iter, prefixes)
+    );
+  }
+
+  KeyValueDB::Iterator get_iterator(const string &prefix) {
+    assert(!prefix.empty());
+    KeyValueDB::Iterator iter = db->get_iterator(prefix);
+    iter->seek_to_first();
+    return iter;
+  }
+
+  KeyValueDB::WholeSpaceIterator get_iterator() {
+    KeyValueDB::WholeSpaceIterator iter;
+    iter = db->get_iterator();
+    iter->seek_to_first();
+    return iter;
+  }
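+
+  /* Editor's note: illustrative usage sketch only, not part of the upstream
+   * header.  A provider monitor walks the store in bounded chunks when
+   * feeding a syncing peer; the prefix set, chunk size and last_key handling
+   * below are assumptions made for the example.
+   *
+   *   set<string> prefixes = { "paxos", "monmap" };     // assumed prefixes
+   *   pair<string,string> last_key;                     // empty: start from the beginning
+   *   MonitorDBStore::Synchronizer sync =
+   *     store.get_synchronizer(last_key, prefixes);
+   *   while (sync->has_next_chunk()) {
+   *     auto tx(std::make_shared<MonitorDBStore::Transaction>());
+   *     sync->get_chunk_tx(tx, 1048576);                // at most ~1 MB per chunk
+   *     bufferlist bl;
+   *     tx->encode(bl);                                 // ship bl to the peer
+   *     last_key = sync->get_last_key();
+   *   }
+   */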
+
+  int get(const string& prefix, const string& key, bufferlist& bl) {
+    assert(bl.length() == 0);
+    return db->get(prefix, key, &bl);
+  }
+
+  int get(const string& prefix, const version_t ver, bufferlist& bl) {
+    ostringstream os;
+    os << ver;
+    return get(prefix, os.str(), bl);
+  }
+
+  version_t get(const string& prefix, const string& key) {
+    bufferlist bl;
+    int err = get(prefix, key, bl);
+    if (err < 0) {
+      if (err == -ENOENT) // if key doesn't exist, assume its value is 0
+        return 0;
+      // we're not expecting any other negative return value, and we can't
+      // just return a negative value if we're returning a version_t
+      generic_dout(0) << "MonitorDBStore::get() error obtaining"
+                      << " (" << prefix << ":" << key << "): "
+                      << cpp_strerror(err) << dendl;
+      assert(0 == "error obtaining key");
+    }
+
+    assert(bl.length());
+    version_t ver;
+    bufferlist::iterator p = bl.begin();
+    ::decode(ver, p);
+    return ver;
+  }
+
+  bool exists(const string& prefix, const string& key) {
+    KeyValueDB::Iterator it = db->get_iterator(prefix);
+    int err = it->lower_bound(key);
+    if (err < 0)
+      return false;
+
+    return (it->valid() && it->key() == key);
+  }
+
+  bool exists(const string& prefix, version_t ver) {
+    ostringstream os;
+    os << ver;
+    return exists(prefix, os.str());
+  }
+
+  string combine_strings(const string& prefix, const string& value) {
+    string out = prefix;
+    out.push_back('_');
+    out.append(value);
+    return out;
+  }
+
+  string combine_strings(const string& prefix, const version_t ver) {
+    ostringstream os;
+    os << ver;
+    return combine_strings(prefix, os.str());
+  }
+
+  void clear(set<string>& prefixes) {
+    set<string>::iterator iter;
+    KeyValueDB::Transaction dbt = db->get_transaction();
+
+    for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
+      dbt->rmkeys_by_prefix((*iter));
+    }
+    int r = db->submit_transaction_sync(dbt);
+    assert(r >= 0);
+  }
+
+  void _open(string kv_type) {
+    string::const_reverse_iterator rit;
+    int pos = 0;
+    for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
+      if (*rit != '/')
+        break;
+    }
+    ostringstream os;
+    os << path.substr(0, path.size() - pos) << "/store.db";
+    string full_path = os.str();
+
+    KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context,
+                                            kv_type,
+                                            full_path);
+    if (!db_ptr) {
+      derr << __func__ << " error initializing "
+           << kv_type << " db back storage in "
+           << full_path << dendl;
+      assert(0 == "MonitorDBStore: error initializing keyvaluedb back storage");
+    }
+    db.reset(db_ptr);
+
+    if (g_conf->mon_debug_dump_transactions) {
+      if (!g_conf->mon_debug_dump_json) {
+        dump_fd_binary = ::open(
+          g_conf->mon_debug_dump_location.c_str(),
+          O_CREAT|O_APPEND|O_WRONLY, 0644);
+        if (dump_fd_binary < 0) {
+          dump_fd_binary = -errno;
+          derr << "Could not open log file, got "
+               << cpp_strerror(dump_fd_binary) << dendl;
+        }
+      } else {
+        dump_fmt.reset();
+        dump_fmt.open_array_section("dump");
+        dump_fd_json.open(g_conf->mon_debug_dump_location.c_str());
+      }
+      do_dump = true;
+    }
+    if (kv_type == "rocksdb")
+      db->init(g_conf->mon_rocksdb_options);
+    else
+      db->init();
+  }
+
+  int open(ostream &out) {
+    string kv_type;
+    int r = read_meta("kv_backend", &kv_type);
+    if (r < 0 || kv_type.empty()) {
+      // assume old monitors that did not mark the type were leveldb.
+      kv_type = "leveldb";
+      r = write_meta("kv_backend", kv_type);
+      if (r < 0)
+        return r;
+    }
+    _open(kv_type);
+    r = db->open(out);
+    if (r < 0)
+      return r;
+
+    // Monitors are few in number, so the resource cost of exposing
+    // very detailed stats is low: ramp up the priority of all the
+    // KV store's perf counters.  Do this after open, because the backend
+    // may not have constructed its PerfCounters earlier.
+    if (db->get_perf_counters()) {
+      db->get_perf_counters()->set_prio_adjust(
+        PerfCountersBuilder::PRIO_USEFUL - PerfCountersBuilder::PRIO_DEBUGONLY);
+    }
+
+    io_work.start();
+    is_open = true;
+    return 0;
+  }
+
+  int create_and_open(ostream &out) {
+    // record the type before open
+    string kv_type;
+    int r = read_meta("kv_backend", &kv_type);
+    if (r < 0) {
+      kv_type = g_conf->mon_keyvaluedb;
+      r = write_meta("kv_backend", kv_type);
+      if (r < 0)
+        return r;
+    }
+    _open(kv_type);
+    r = db->create_and_open(out);
+    if (r < 0)
+      return r;
+    io_work.start();
+    is_open = true;
+    return 0;
+  }
+
+  void close() {
+    // there should be no work queued!
+    io_work.stop();
+    is_open = false;
+    db.reset(NULL);
+  }
+
+  void compact() {
+    db->compact();
+  }
+
+  void compact_prefix(const string& prefix) {
+    db->compact_prefix(prefix);
+  }
+
+  uint64_t get_estimated_size(map<string,uint64_t> &extras) {
+    return db->get_estimated_size(extras);
+  }
+
+  /**
+   * write_meta - write a simple configuration key out-of-band
+   *
+   * Write a simple key/value pair for basic store configuration
+   * (e.g., a uuid or magic number) to an unopened/unmounted store.
+   * The default implementation writes this to a plaintext file in the
+   * path.
+   *
+   * A newline is appended.
+   *
+   * @param key key name (e.g., "fsid")
+   * @param value value (e.g., a uuid rendered as a string)
+   * @returns 0 for success, or an error code
+   */
+  int write_meta(const std::string& key,
+                 const std::string& value) const {
+    string v = value;
+    v += "\n";
+    int r = safe_write_file(path.c_str(), key.c_str(),
+                            v.c_str(), v.length());
+    if (r < 0)
+      return r;
+    return 0;
+  }
+
+  /**
+   * read_meta - read a simple configuration key out-of-band
+   *
+   * Read a simple key/value pair from an unopened/unmounted store.
+   *
+   * Trailing whitespace is stripped off.
+   *
+   * @param key key name
+   * @param value pointer to value string
+   * @returns 0 for success, or an error code
+   */
+  int read_meta(const std::string& key,
+                std::string *value) const {
+    char buf[4096];
+    int r = safe_read_file(path.c_str(), key.c_str(),
+                           buf, sizeof(buf));
+    if (r <= 0)
+      return r;
+    // drop trailing newlines
+    while (r && isspace(buf[r-1])) {
+      --r;
+    }
+    *value = string(buf, r);
+    return 0;
+  }
+
+  explicit MonitorDBStore(const string& path)
+    : path(path),
+      db(0),
+      do_dump(false),
+      dump_fd_binary(-1),
+      dump_fmt(true),
+      io_work(g_ceph_context, "monstore", "fn_monstore"),
+      is_open(false) {
+  }
+  ~MonitorDBStore() {
+    assert(!is_open);
+    if (do_dump) {
+      if (!g_conf->mon_debug_dump_json) {
+        ::close(dump_fd_binary);
+      } else {
+        dump_fmt.close_section();
+        dump_fmt.flush(dump_fd_json);
+        dump_fd_json.flush();
+        dump_fd_json.close();
+      }
+    }
+  }
+
+};
+
+WRITE_CLASS_ENCODER(MonitorDBStore::Op)
+WRITE_CLASS_ENCODER(MonitorDBStore::Transaction)
+
+#endif /* CEPH_MONITOR_DB_STORE_H */
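+
+// Editor's note: illustrative usage sketch only, not part of the upstream
+// header.  write_meta()/read_meta() keep small out-of-band settings (such as
+// the kv backend name) in plaintext files under the monitor data path, so
+// they work before the key/value database has been opened.  The path and
+// values below are assumptions made for the example.
+//
+//   MonitorDBStore store("/var/lib/ceph/mon/ceph-a");   // hypothetical path
+//   store.write_meta("kv_backend", "rocksdb");          // stored as a small file under the data path
+//   std::string backend;
+//   store.read_meta("kv_backend", &backend);            // backend == "rocksdb"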