remove ceph code
[stor4nfv.git] / src / ceph / src / mon / MonitorDBStore.h
diff --git a/src/ceph/src/mon/MonitorDBStore.h b/src/ceph/src/mon/MonitorDBStore.h
deleted file mode 100644 (file)
index 00e56a9..0000000
+++ /dev/null
@@ -1,777 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
-* Ceph - scalable distributed file system
-*
-* Copyright (C) 2012 Inktank, Inc.
-*
-* This is free software; you can redistribute it and/or
-* modify it under the terms of the GNU Lesser General Public
-* License version 2.1, as published by the Free Software
-* Foundation. See file COPYING.
-*/
-#ifndef CEPH_MONITOR_DB_STORE_H
-#define CEPH_MONITOR_DB_STORE_H
-
-#include "include/types.h"
-#include "include/buffer.h"
-#include <set>
-#include <map>
-#include <string>
-#include <boost/scoped_ptr.hpp>
-#include <sstream>
-#include <fstream>
-#include "kv/KeyValueDB.h"
-
-#include "include/assert.h"
-#include "common/Formatter.h"
-#include "common/Finisher.h"
-#include "common/errno.h"
-#include "common/debug.h"
-#include "common/safe_io.h"
-
-#define dout_context g_ceph_context
-
-class MonitorDBStore
-{
-  string path;
-  boost::scoped_ptr<KeyValueDB> db;
-  bool do_dump;
-  int dump_fd_binary;
-  std::ofstream dump_fd_json;
-  JSONFormatter dump_fmt;
-  
-
-  Finisher io_work;
-
-  bool is_open;
-
- public:
-
-  struct Op {
-    uint8_t type;
-    string prefix;
-    string key, endkey;
-    bufferlist bl;
-
-    Op()
-      : type(0) { }
-    Op(int t, string p, string k)
-      : type(t), prefix(p), key(k) { }
-    Op(int t, const string& p, string k, bufferlist& b)
-      : type(t), prefix(p), key(k), bl(b) { }
-    Op(int t, const string& p, string start, string end)
-      : type(t), prefix(p), key(start), endkey(end) { }
-
-    void encode(bufferlist& encode_bl) const {
-      ENCODE_START(2, 1, encode_bl);
-      ::encode(type, encode_bl);
-      ::encode(prefix, encode_bl);
-      ::encode(key, encode_bl);
-      ::encode(bl, encode_bl);
-      ::encode(endkey, encode_bl);
-      ENCODE_FINISH(encode_bl);
-    }
-
-    void decode(bufferlist::iterator& decode_bl) {
-      DECODE_START(2, decode_bl);
-      ::decode(type, decode_bl);
-      ::decode(prefix, decode_bl);
-      ::decode(key, decode_bl);
-      ::decode(bl, decode_bl);
-      if (struct_v >= 2)
-       ::decode(endkey, decode_bl);
-      DECODE_FINISH(decode_bl);
-    }
-
-    void dump(Formatter *f) const {
-      f->dump_int("type", type);
-      f->dump_string("prefix", prefix);
-      f->dump_string("key", key);
-      if (endkey.length())
-       f->dump_string("endkey", endkey);
-    }
-
-    static void generate_test_instances(list<Op*>& ls) {
-      ls.push_back(new Op);
-      // we get coverage here from the Transaction instances
-    }
-  };
-
-  struct Transaction;
-  typedef ceph::shared_ptr<Transaction> TransactionRef;
-  struct Transaction {
-    list<Op> ops;
-    uint64_t bytes, keys;
-
-    Transaction() : bytes(0), keys(0) {}
-
-    enum {
-      OP_PUT   = 1,
-      OP_ERASE = 2,
-      OP_COMPACT = 3,
-    };
-
-    void put(string prefix, string key, bufferlist& bl) {
-      ops.push_back(Op(OP_PUT, prefix, key, bl));
-      ++keys;
-      bytes += prefix.length() + key.length() + bl.length();
-    }
-
-    void put(string prefix, version_t ver, bufferlist& bl) {
-      ostringstream os;
-      os << ver;
-      put(prefix, os.str(), bl);
-    }
-
-    void put(string prefix, string key, version_t ver) {
-      bufferlist bl;
-      ::encode(ver, bl);
-      put(prefix, key, bl);
-    }
-
-    void erase(string prefix, string key) {
-      ops.push_back(Op(OP_ERASE, prefix, key));
-      ++keys;
-      bytes += prefix.length() + key.length();
-    }
-
-    void erase(string prefix, version_t ver) {
-      ostringstream os;
-      os << ver;
-      erase(prefix, os.str());
-    }
-
-    void compact_prefix(string prefix) {
-      ops.push_back(Op(OP_COMPACT, prefix, string()));
-    }
-
-    void compact_range(string prefix, string start, string end) {
-      ops.push_back(Op(OP_COMPACT, prefix, start, end));
-    }
-
-    void encode(bufferlist& bl) const {
-      ENCODE_START(2, 1, bl);
-      ::encode(ops, bl);
-      ::encode(bytes, bl);
-      ::encode(keys, bl);
-      ENCODE_FINISH(bl);
-    }
-
-    void decode(bufferlist::iterator& bl) {
-      DECODE_START(2, bl);
-      ::decode(ops, bl);
-      if (struct_v >= 2) {
-       ::decode(bytes, bl);
-       ::decode(keys, bl);
-      }
-      DECODE_FINISH(bl);
-    }
-
-    static void generate_test_instances(list<Transaction*>& ls) {
-      ls.push_back(new Transaction);
-      ls.push_back(new Transaction);
-      bufferlist bl;
-      bl.append("value");
-      ls.back()->put("prefix", "key", bl);
-      ls.back()->erase("prefix2", "key2");
-      ls.back()->compact_prefix("prefix3");
-      ls.back()->compact_range("prefix4", "from", "to");
-    }
-
-    void append(TransactionRef other) {
-      ops.splice(ops.end(), other->ops);
-      keys += other->keys;
-      bytes += other->bytes;
-    }
-
-    void append_from_encoded(bufferlist& bl) {
-      auto other(std::make_shared<Transaction>());
-      bufferlist::iterator it = bl.begin();
-      other->decode(it);
-      append(other);
-    }
-
-    bool empty() {
-      return (size() == 0);
-    }
-
-    size_t size() const {
-      return ops.size();
-    }
-    uint64_t get_keys() const {
-      return keys;
-    }
-    uint64_t get_bytes() const {
-      return bytes;
-    }
-
-    void dump(ceph::Formatter *f, bool dump_val=false) const {
-      f->open_object_section("transaction");
-      f->open_array_section("ops");
-      list<Op>::const_iterator it;
-      int op_num = 0;
-      for (it = ops.begin(); it != ops.end(); ++it) {
-       const Op& op = *it;
-       f->open_object_section("op");
-       f->dump_int("op_num", op_num++);
-       switch (op.type) {
-       case OP_PUT:
-         {
-           f->dump_string("type", "PUT");
-           f->dump_string("prefix", op.prefix);
-           f->dump_string("key", op.key);
-           f->dump_unsigned("length", op.bl.length());
-           if (dump_val) {
-             ostringstream os;
-             op.bl.hexdump(os);
-             f->dump_string("bl", os.str());
-           }
-         }
-         break;
-       case OP_ERASE:
-         {
-           f->dump_string("type", "ERASE");
-           f->dump_string("prefix", op.prefix);
-           f->dump_string("key", op.key);
-         }
-         break;
-       case OP_COMPACT:
-         {
-           f->dump_string("type", "COMPACT");
-           f->dump_string("prefix", op.prefix);
-           f->dump_string("start", op.key);
-           f->dump_string("end", op.endkey);
-         }
-         break;
-       default:
-         {
-           f->dump_string("type", "unknown");
-           f->dump_unsigned("op_code", op.type);
-           break;
-         }
-       }
-       f->close_section();
-      }
-      f->close_section();
-      f->dump_unsigned("num_keys", keys);
-      f->dump_unsigned("num_bytes", bytes);
-      f->close_section();
-    }
-  };
-
-  int apply_transaction(MonitorDBStore::TransactionRef t) {
-    KeyValueDB::Transaction dbt = db->get_transaction();
-
-    if (do_dump) {
-      if (!g_conf->mon_debug_dump_json) {
-        bufferlist bl;
-        t->encode(bl);
-        bl.write_fd(dump_fd_binary);
-      } else {
-        t->dump(&dump_fmt, true);
-        dump_fmt.flush(dump_fd_json);
-        dump_fd_json.flush();
-      }
-    }
-
-    list<pair<string, pair<string,string> > > compact;
-    for (list<Op>::const_iterator it = t->ops.begin();
-        it != t->ops.end();
-        ++it) {
-      const Op& op = *it;
-      switch (op.type) {
-      case Transaction::OP_PUT:
-       dbt->set(op.prefix, op.key, op.bl);
-       break;
-      case Transaction::OP_ERASE:
-       dbt->rmkey(op.prefix, op.key);
-       break;
-      case Transaction::OP_COMPACT:
-       compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
-       break;
-      default:
-       derr << __func__ << " unknown op type " << op.type << dendl;
-       ceph_abort();
-       break;
-      }
-    }
-    int r = db->submit_transaction_sync(dbt);
-    if (r >= 0) {
-      while (!compact.empty()) {
-       if (compact.front().second.first == string() &&
-           compact.front().second.second == string())
-         db->compact_prefix_async(compact.front().first);
-       else
-         db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
-       compact.pop_front();
-      }
-    } else {
-      assert(0 == "failed to write to db");
-    }
-    return r;
-  }
-
-  struct C_DoTransaction : public Context {
-    MonitorDBStore *store;
-    MonitorDBStore::TransactionRef t;
-    Context *oncommit;
-    C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t,
-                   Context *f)
-      : store(s), t(t), oncommit(f)
-    {}
-    void finish(int r) override {
-      /* The store serializes writes.  Each transaction is handled
-       * sequentially by the io_work Finisher.  If a transaction takes longer
-       * to apply its state to permanent storage, then no other transaction
-       * will be handled meanwhile.
-       *
-       * We will now randomly inject random delays.  We can safely sleep prior
-       * to applying the transaction as it won't break the model.
-       */
-      double delay_prob = g_conf->mon_inject_transaction_delay_probability;
-      if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) {
-        utime_t delay;
-        double delay_max = g_conf->mon_inject_transaction_delay_max;
-        delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0);
-        lsubdout(g_ceph_context, mon, 1)
-          << "apply_transaction will be delayed for " << delay
-          << " seconds" << dendl;
-        delay.sleep();
-      }
-      int ret = store->apply_transaction(t);
-      oncommit->complete(ret);
-    }
-  };
-
-  /**
-   * queue transaction
-   *
-   * Queue a transaction to commit asynchronously.  Trigger a context
-   * on completion (without any locks held).
-   */
-  void queue_transaction(MonitorDBStore::TransactionRef t,
-                        Context *oncommit) {
-    io_work.queue(new C_DoTransaction(this, t, oncommit));
-  }
-
-  /**
-   * block and flush all io activity
-   */
-  void flush() {
-    io_work.wait_for_empty();
-  }
-
-  class StoreIteratorImpl {
-  protected:
-    bool done;
-    pair<string,string> last_key;
-    bufferlist crc_bl;
-
-    StoreIteratorImpl() : done(false) { }
-    virtual ~StoreIteratorImpl() { }
-
-    bool add_chunk_entry(TransactionRef tx,
-                        string &prefix,
-                        string &key,
-                        bufferlist &value,
-                        uint64_t max) {
-      auto tmp(std::make_shared<Transaction>());
-      bufferlist tmp_bl;
-      tmp->put(prefix, key, value);
-      tmp->encode(tmp_bl);
-
-      bufferlist tx_bl;
-      tx->encode(tx_bl);
-
-      size_t len = tx_bl.length() + tmp_bl.length();
-
-      if (!tx->empty() && (len > max)) {
-       return false;
-      }
-
-      tx->append(tmp);
-      last_key.first = prefix;
-      last_key.second = key;
-
-      if (g_conf->mon_sync_debug) {
-       ::encode(prefix, crc_bl);
-       ::encode(key, crc_bl);
-       ::encode(value, crc_bl);
-      }
-
-      return true;
-    }
-
-    virtual bool _is_valid() = 0;
-
-  public:
-    __u32 crc() {
-      if (g_conf->mon_sync_debug)
-       return crc_bl.crc32c(0);
-      return 0;
-    }
-    pair<string,string> get_last_key() {
-      return last_key;
-    }
-    virtual bool has_next_chunk() {
-      return !done && _is_valid();
-    }
-    virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0;
-    virtual pair<string,string> get_next_key() = 0;
-  };
-  typedef ceph::shared_ptr<StoreIteratorImpl> Synchronizer;
-
-  class WholeStoreIteratorImpl : public StoreIteratorImpl {
-    KeyValueDB::WholeSpaceIterator iter;
-    set<string> sync_prefixes;
-
-  public:
-    WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
-                          set<string> &prefixes)
-      : StoreIteratorImpl(),
-       iter(iter),
-       sync_prefixes(prefixes)
-    { }
-
-    ~WholeStoreIteratorImpl() override { }
-
-    /**
-     * Obtain a chunk of the store
-     *
-     * @param bl           Encoded transaction that will recreate the chunk
-     * @param first_key            Pair containing the first key to obtain, and that
-     *                     will contain the first key in the chunk (that may
-     *                     differ from the one passed on to the function)
-     * @param last_key[out] Last key in the chunk
-     */
-    void get_chunk_tx(TransactionRef tx, uint64_t max) override {
-      assert(done == false);
-      assert(iter->valid() == true);
-
-      while (iter->valid()) {
-       string prefix(iter->raw_key().first);
-       string key(iter->raw_key().second);
-       if (sync_prefixes.count(prefix)) {
-         bufferlist value = iter->value();
-         if (!add_chunk_entry(tx, prefix, key, value, max))
-           return;
-       }
-       iter->next();
-      }
-      assert(iter->valid() == false);
-      done = true;
-    }
-
-    pair<string,string> get_next_key() override {
-      assert(iter->valid());
-
-      for (; iter->valid(); iter->next()) {
-        pair<string,string> r = iter->raw_key();
-        if (sync_prefixes.count(r.first) > 0) {
-          iter->next();
-          return r;
-        }
-      }
-      return pair<string,string>();
-    }
-
-    bool _is_valid() override {
-      return iter->valid();
-    }
-  };
-
-  Synchronizer get_synchronizer(pair<string,string> &key,
-                               set<string> &prefixes) {
-    KeyValueDB::WholeSpaceIterator iter;
-    iter = db->get_iterator();
-
-    if (!key.first.empty() && !key.second.empty())
-      iter->upper_bound(key.first, key.second);
-    else
-      iter->seek_to_first();
-
-    return ceph::shared_ptr<StoreIteratorImpl>(
-       new WholeStoreIteratorImpl(iter, prefixes)
-    );
-  }
-
-  KeyValueDB::Iterator get_iterator(const string &prefix) {
-    assert(!prefix.empty());
-    KeyValueDB::Iterator iter = db->get_iterator(prefix);
-    iter->seek_to_first();
-    return iter;
-  }
-
-  KeyValueDB::WholeSpaceIterator get_iterator() {
-    KeyValueDB::WholeSpaceIterator iter;
-    iter = db->get_iterator();
-    iter->seek_to_first();
-    return iter;
-  }
-
-  int get(const string& prefix, const string& key, bufferlist& bl) {
-    assert(bl.length() == 0);
-    return db->get(prefix, key, &bl);
-  }
-
-  int get(const string& prefix, const version_t ver, bufferlist& bl) {
-    ostringstream os;
-    os << ver;
-    return get(prefix, os.str(), bl);
-  }
-
-  version_t get(const string& prefix, const string& key) {
-    bufferlist bl;
-    int err = get(prefix, key, bl);
-    if (err < 0) {
-      if (err == -ENOENT) // if key doesn't exist, assume its value is 0
-        return 0;
-      // we're not expecting any other negative return value, and we can't
-      // just return a negative value if we're returning a version_t
-      generic_dout(0) << "MonitorDBStore::get() error obtaining"
-                      << " (" << prefix << ":" << key << "): "
-                      << cpp_strerror(err) << dendl;
-      assert(0 == "error obtaining key");
-    }
-
-    assert(bl.length());
-    version_t ver;
-    bufferlist::iterator p = bl.begin();
-    ::decode(ver, p);
-    return ver;
-  }
-
-  bool exists(const string& prefix, const string& key) {
-    KeyValueDB::Iterator it = db->get_iterator(prefix);
-    int err = it->lower_bound(key);
-    if (err < 0)
-      return false;
-
-    return (it->valid() && it->key() == key);
-  }
-
-  bool exists(const string& prefix, version_t ver) {
-    ostringstream os;
-    os << ver;
-    return exists(prefix, os.str());
-  }
-
-  string combine_strings(const string& prefix, const string& value) {
-    string out = prefix;
-    out.push_back('_');
-    out.append(value);
-    return out;
-  }
-
-  string combine_strings(const string& prefix, const version_t ver) {
-    ostringstream os;
-    os << ver;
-    return combine_strings(prefix, os.str());
-  }
-
-  void clear(set<string>& prefixes) {
-    set<string>::iterator iter;
-    KeyValueDB::Transaction dbt = db->get_transaction();
-
-    for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
-      dbt->rmkeys_by_prefix((*iter));
-    }
-    int r = db->submit_transaction_sync(dbt);
-    assert(r >= 0);
-  }
-
-  void _open(string kv_type) {
-    string::const_reverse_iterator rit;
-    int pos = 0;
-    for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
-      if (*rit != '/')
-       break;
-    }
-    ostringstream os;
-    os << path.substr(0, path.size() - pos) << "/store.db";
-    string full_path = os.str();
-
-    KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context,
-                                           kv_type,
-                                           full_path);
-    if (!db_ptr) {
-      derr << __func__ << " error initializing "
-          << kv_type << " db back storage in "
-          << full_path << dendl;
-      assert(0 == "MonitorDBStore: error initializing keyvaluedb back storage");
-    }
-    db.reset(db_ptr);
-
-    if (g_conf->mon_debug_dump_transactions) {
-      if (!g_conf->mon_debug_dump_json) {
-        dump_fd_binary = ::open(
-          g_conf->mon_debug_dump_location.c_str(),
-          O_CREAT|O_APPEND|O_WRONLY, 0644);
-        if (dump_fd_binary < 0) {
-          dump_fd_binary = -errno;
-          derr << "Could not open log file, got "
-               << cpp_strerror(dump_fd_binary) << dendl;
-        }
-      } else {
-        dump_fmt.reset();
-        dump_fmt.open_array_section("dump");
-        dump_fd_json.open(g_conf->mon_debug_dump_location.c_str());
-      }
-      do_dump = true;
-    }
-    if (kv_type == "rocksdb")
-      db->init(g_conf->mon_rocksdb_options);
-    else
-      db->init();
-
-
-  }
-
-  int open(ostream &out) {
-    string kv_type;
-    int r = read_meta("kv_backend", &kv_type);
-    if (r < 0 || kv_type.empty()) {
-      // assume old monitors that did not mark the type were leveldb.
-      kv_type = "leveldb";
-      r = write_meta("kv_backend", kv_type);
-      if (r < 0)
-       return r;
-    }
-    _open(kv_type);
-    r = db->open(out);
-    if (r < 0)
-      return r;
-
-    // Monitors are few in number, so the resource cost of exposing 
-    // very detailed stats is low: ramp up the priority of all the
-    // KV store's perf counters.  Do this after open, because backend may
-    // not have constructed PerfCounters earlier.
-    if (db->get_perf_counters()) {
-      db->get_perf_counters()->set_prio_adjust(
-          PerfCountersBuilder::PRIO_USEFUL - PerfCountersBuilder::PRIO_DEBUGONLY);
-    }
-
-    io_work.start();
-    is_open = true;
-    return 0;
-  }
-
-  int create_and_open(ostream &out) {
-    // record the type before open
-    string kv_type;
-    int r = read_meta("kv_backend", &kv_type);
-    if (r < 0) {
-      kv_type = g_conf->mon_keyvaluedb;
-      r = write_meta("kv_backend", kv_type);
-      if (r < 0)
-       return r;
-    }
-    _open(kv_type);
-    r = db->create_and_open(out);
-    if (r < 0)
-      return r;
-    io_work.start();
-    is_open = true;
-    return 0;
-  }
-
-  void close() {
-    // there should be no work queued!
-    io_work.stop();
-    is_open = false;
-    db.reset(NULL);
-  }
-
-  void compact() {
-    db->compact();
-  }
-
-  void compact_prefix(const string& prefix) {
-    db->compact_prefix(prefix);
-  }
-
-  uint64_t get_estimated_size(map<string, uint64_t> &extras) {
-    return db->get_estimated_size(extras);
-  }
-
-  /**
-   * write_meta - write a simple configuration key out-of-band
-   *
-   * Write a simple key/value pair for basic store configuration
-   * (e.g., a uuid or magic number) to an unopened/unmounted store.
-   * The default implementation writes this to a plaintext file in the
-   * path.
-   *
-   * A newline is appended.
-   *
-   * @param key key name (e.g., "fsid")
-   * @param value value (e.g., a uuid rendered as a string)
-   * @returns 0 for success, or an error code
-   */
-  int write_meta(const std::string& key,
-                const std::string& value) const {
-    string v = value;
-    v += "\n";
-    int r = safe_write_file(path.c_str(), key.c_str(),
-                           v.c_str(), v.length());
-    if (r < 0)
-      return r;
-    return 0;
-  }
-
-  /**
-   * read_meta - read a simple configuration key out-of-band
-   *
-   * Read a simple key value to an unopened/mounted store.
-   *
-   * Trailing whitespace is stripped off.
-   *
-   * @param key key name
-   * @param value pointer to value string
-   * @returns 0 for success, or an error code
-   */
-  int read_meta(const std::string& key,
-               std::string *value) const {
-    char buf[4096];
-    int r = safe_read_file(path.c_str(), key.c_str(),
-                          buf, sizeof(buf));
-    if (r <= 0)
-      return r;
-    // drop trailing newlines
-    while (r && isspace(buf[r-1])) {
-      --r;
-    }
-    *value = string(buf, r);
-    return 0;
-  }
-
-  explicit MonitorDBStore(const string& path)
-    : path(path),
-      db(0),
-      do_dump(false),
-      dump_fd_binary(-1),
-      dump_fmt(true),
-      io_work(g_ceph_context, "monstore", "fn_monstore"),
-      is_open(false) {
-  }
-  ~MonitorDBStore() {
-    assert(!is_open);
-    if (do_dump) {
-      if (!g_conf->mon_debug_dump_json) {
-        ::close(dump_fd_binary);
-      } else {
-        dump_fmt.close_section();
-        dump_fmt.flush(dump_fd_json);
-        dump_fd_json.flush();
-        dump_fd_json.close();
-      }
-    }
-  }
-
-};
-
-WRITE_CLASS_ENCODER(MonitorDBStore::Op)
-WRITE_CLASS_ENCODER(MonitorDBStore::Transaction)
-
-#endif /* CEPH_MONITOR_DB_STORE_H */