1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 Inktank, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
13 #ifndef CEPH_MONITOR_DB_STORE_H
14 #define CEPH_MONITOR_DB_STORE_H
16 #include "include/types.h"
17 #include "include/buffer.h"
21 #include <boost/scoped_ptr.hpp>
24 #include "kv/KeyValueDB.h"
26 #include "include/assert.h"
27 #include "common/Formatter.h"
28 #include "common/Finisher.h"
29 #include "common/errno.h"
30 #include "common/debug.h"
31 #include "common/safe_io.h"
// Logging context for this header: presumably consumed by the dout/derr
// macros used below (e.g. derr in apply_transaction) — confirm in common/debug.h.
33 #define dout_context g_ceph_context
// Key/value backend; _open() builds one with KeyValueDB::create() (the
// hand-off into this member is on a line not shown) and calls init() on it.
38 boost::scoped_ptr<KeyValueDB> db;
// JSON transaction-dump sink and its formatter. _open() opens them when
// transaction dumping is on in JSON mode; apply_transaction() writes to them
// and the destructor closes the "dump" section and the stream.
41 std::ofstream dump_fd_json;
42 JSONFormatter dump_fmt;
59 Op(int t, string p, string k)
60 : type(t), prefix(p), key(k) { }
61 Op(int t, const string& p, string k, bufferlist& b)
62 : type(t), prefix(p), key(k), bl(b) { }
63 Op(int t, const string& p, string start, string end)
64 : type(t), prefix(p), key(start), endkey(end) { }
// Serialize the op in field order type, prefix, key, value payload, endkey.
// Written as encoding version 2 with compat version 1.
66 void encode(bufferlist& encode_bl) const {
67 ENCODE_START(2, 1, encode_bl);
68 ::encode(type, encode_bl);
69 ::encode(prefix, encode_bl);
70 ::encode(key, encode_bl);
71 ::encode(bl, encode_bl);
72 ::encode(endkey, encode_bl);
73 ENCODE_FINISH(encode_bl);
// Deserialize in the same field order as encode().
// NOTE(review): a line between the bl and endkey decodes is absent from this
// listing; presumably it limits the endkey decode to struct_v >= 2 so that
// version-1 encodings still decode — confirm.
76 void decode(bufferlist::iterator& decode_bl) {
77 DECODE_START(2, decode_bl);
78 ::decode(type, decode_bl);
79 ::decode(prefix, decode_bl);
80 ::decode(key, decode_bl);
81 ::decode(bl, decode_bl);
83 ::decode(endkey, decode_bl);
84 DECODE_FINISH(decode_bl);
// Emit the op's type, prefix, key and endkey to a Formatter; the value
// payload is not among the visible dumped fields.
87 void dump(Formatter *f) const {
88 f->dump_int("type", type);
89 f->dump_string("prefix", prefix);
90 f->dump_string("key", key);
// NOTE(review): the line just before this one is absent from the listing;
// it may make the endkey dump conditional — confirm.
92 f->dump_string("endkey", endkey);
// The visible body contains only the comment below: Op test coverage is
// expected to come from the Transaction instances.
95 static void generate_test_instances(list<Op*>& ls) {
97 // we get coverage here from the Transaction instances
102 typedef ceph::shared_ptr<Transaction> TransactionRef;
// Running totals for this transaction. `bytes` is accumulated from the
// prefix/key/value lengths by put()/erase()/append(); both counters are
// reported by get_bytes()/get_keys() and by dump().
105 uint64_t bytes, keys;
// A new transaction starts with no ops and zeroed counters.
107 Transaction() : bytes(0), keys(0) {}
// Queue an OP_PUT of `bl` at prefix/key and add its size to `bytes`.
// NOTE(review): no update of `keys` (reported by get_keys()) appears on the
// visible lines of put()/erase(); confirm the counter is incremented.
115 void put(string prefix, string key, bufferlist& bl) {
116 ops.push_back(Op(OP_PUT, prefix, key, bl));
118 bytes += prefix.length() + key.length() + bl.length();
// PUT keyed by a version number: the key is the version rendered through
// `os` (presumably an ostringstream declared on a line not shown).
121 void put(string prefix, version_t ver, bufferlist& bl) {
124 put(prefix, os.str(), bl);
// PUT whose value is built from `ver` (presumably encoded into `bl` on lines
// not shown).
127 void put(string prefix, string key, version_t ver) {
130 put(prefix, key, bl);
// Queue an OP_ERASE of prefix/key; the key lengths are added to `bytes`.
133 void erase(string prefix, string key) {
134 ops.push_back(Op(OP_ERASE, prefix, key));
136 bytes += prefix.length() + key.length();
// ERASE of a version-numbered key (key rendered the same way as in put()).
139 void erase(string prefix, version_t ver) {
142 erase(prefix, os.str());
// Queue compaction of a whole prefix: an OP_COMPACT with empty start/end,
// which apply_transaction() turns into compact_prefix_async().
145 void compact_prefix(string prefix) {
146 ops.push_back(Op(OP_COMPACT, prefix, string()));
// Queue compaction of the start..end range within prefix (stored in
// key/endkey); apply_transaction() issues compact_range_async().
149 void compact_range(string prefix, string start, string end) {
150 ops.push_back(Op(OP_COMPACT, prefix, start, end));
// Transaction serialization (encoding version 2, compat 1); most of both
// bodies is absent from this listing.
153 void encode(bufferlist& bl) const {
154 ENCODE_START(2, 1, bl);
161 void decode(bufferlist::iterator& bl) {
// Test fixtures (presumably for the encode/decode test harness): an empty
// transaction, plus one holding a put, an erase, a prefix compaction and a
// range compaction. `bl` is declared on a line not shown.
171 static void generate_test_instances(list<Transaction*>& ls) {
172 ls.push_back(new Transaction);
173 ls.push_back(new Transaction);
176 ls.back()->put("prefix", "key", bl);
177 ls.back()->erase("prefix2", "key2");
178 ls.back()->compact_prefix("prefix3");
179 ls.back()->compact_range("prefix4", "from", "to");
// Move all of other's ops onto the end of this transaction. list::splice
// leaves other->ops empty. other's byte count is added to ours.
182 void append(TransactionRef other) {
183 ops.splice(ops.end(), other->ops);
185 bytes += other->bytes;
// Decode an encoded Transaction from `bl` into a temporary; the remaining
// lines (presumably decode + append) are not shown.
188 void append_from_encoded(bufferlist& bl) {
189 auto other(std::make_shared<Transaction>());
190 bufferlist::iterator it = bl.begin();
// empty(): true when size() is 0 (its signature is not shown).
196 return (size() == 0);
// size(): op count (body not shown; presumably ops.size()).
199 size_t size() const {
// Accessors for the running counters.
202 uint64_t get_keys() const {
205 uint64_t get_bytes() const {
// Dump the transaction as a "transaction" object holding an "ops" array
// (one "op" object per op, numbered via op_num) plus num_keys/num_bytes.
// PUT entries report the value length. The value itself is dumped under "bl"
// (presumably only when dump_val is set; that condition is not shown).
209 void dump(ceph::Formatter *f, bool dump_val=false) const {
210 f->open_object_section("transaction");
211 f->open_array_section("ops");
212 list<Op>::const_iterator it;
214 for (it = ops.begin(); it != ops.end(); ++it) {
216 f->open_object_section("op");
217 f->dump_int("op_num", op_num++);
// OP_PUT
221 f->dump_string("type", "PUT");
222 f->dump_string("prefix", op.prefix);
223 f->dump_string("key", op.key);
224 f->dump_unsigned("length", op.bl.length());
228 f->dump_string("bl", os.str());
// OP_ERASE
234 f->dump_string("type", "ERASE");
235 f->dump_string("prefix", op.prefix);
236 f->dump_string("key", op.key);
// OP_COMPACT: the range bounds live in key (start) and endkey (end).
241 f->dump_string("type", "COMPACT");
242 f->dump_string("prefix", op.prefix);
243 f->dump_string("start", op.key);
244 f->dump_string("end", op.endkey);
// Unrecognized op types are still dumped, with their raw numeric code.
249 f->dump_string("type", "unknown");
250 f->dump_unsigned("op_code", op.type);
// Transaction-wide counters.
257 f->dump_unsigned("num_keys", keys);
258 f->dump_unsigned("num_bytes", bytes);
// Apply `t` to the backend synchronously.
// - When transaction dumping is active (guard line not shown), `t` is first
//   written to the dump sink: its binary encoding to dump_fd_binary, or a
//   JSON dump including values (dump_val=true) when mon_debug_dump_json.
// - PUT/ERASE ops are staged in one KeyValueDB transaction and committed with
//   submit_transaction_sync().
// - COMPACT ops are collected and issued only after that submit, as async
//   compactions: prefix compaction when start and end are both empty,
//   otherwise range compaction.
// A failed submit hits the "failed to write to db" assert (presumably on
// r < 0; the condition is not shown).
263 int apply_transaction(MonitorDBStore::TransactionRef t) {
264 KeyValueDB::Transaction dbt = db->get_transaction();
267 if (!g_conf->mon_debug_dump_json) {
// NOTE(review): _open() leaves -errno in dump_fd_binary when the dump file
// cannot be opened; confirm this write is skipped in that case.
270 bl.write_fd(dump_fd_binary);
272 t->dump(&dump_fmt, true);
273 dump_fmt.flush(dump_fd_json);
274 dump_fd_json.flush();
// Compactions are deferred until the write has been submitted:
// (prefix, (start, end)) per COMPACT op.
278 list<pair<string, pair<string,string> > > compact;
279 for (list<Op>::const_iterator it = t->ops.begin();
284 case Transaction::OP_PUT:
285 dbt->set(op.prefix, op.key, op.bl);
287 case Transaction::OP_ERASE:
288 dbt->rmkey(op.prefix, op.key);
290 case Transaction::OP_COMPACT:
291 compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
294 derr << __func__ << " unknown op type " << op.type << dendl;
299 int r = db->submit_transaction_sync(dbt);
// NOTE(review): the pop of compact.front() is not shown in this listing; the
// loop only terminates if each iteration removes the front entry.
301 while (!compact.empty()) {
302 if (compact.front().second.first == string() &&
303 compact.front().second.second == string())
304 db->compact_prefix_async(compact.front().first);
306 db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
310 assert(0 == "failed to write to db");
// Finisher callback queued by queue_transaction(): applies one transaction
// via store->apply_transaction() and completes `oncommit` with its result.
315 struct C_DoTransaction : public Context {
316 MonitorDBStore *store;
317 MonitorDBStore::TransactionRef t;
319 C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t,
321 : store(s), t(t), oncommit(f)
323 void finish(int r) override {
324 /* The store serializes writes. Each transaction is handled
325 * sequentially by the io_work Finisher. If a transaction takes longer
326 * to apply its state to permanent storage, then no other transaction
327 * will be handled meanwhile.
329 * We will now randomly inject random delays. We can safely sleep prior
330 * to applying the transaction as it won't break the model.
// Fault injection: with probability mon_inject_transaction_delay_probability
// (sampled at 1/10000 granularity via rand()), pick a delay between 0 and
// mon_inject_transaction_delay_max seconds and log it. The sleep itself is on
// a line not shown.
332 double delay_prob = g_conf->mon_inject_transaction_delay_probability;
333 if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) {
335 double delay_max = g_conf->mon_inject_transaction_delay_max;
336 delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0);
337 lsubdout(g_ceph_context, mon, 1)
338 << "apply_transaction will be delayed for " << delay
339 << " seconds" << dendl;
342 int ret = store->apply_transaction(t);
343 oncommit->complete(ret);
350 * Queue a transaction to commit asynchronously. Trigger a context
351 * on completion (without any locks held).
// The transaction is wrapped in a C_DoTransaction and queued on the io_work
// Finisher, which handles queued transactions one at a time.
353 void queue_transaction(MonitorDBStore::TransactionRef t,
355 io_work.queue(new C_DoTransaction(this, t, oncommit));
359 * block and flush all io activity
// Blocks until io_work has no queued work left.
362 io_work.wait_for_empty();
// Abstract iterator that streams store contents out as size-bounded chunk
// transactions (returned by get_synchronizer(); presumably used for store
// sync). `done` marks completion; `last_key` is the last key added.
365 class StoreIteratorImpl {
368 pair<string,string> last_key;
371 StoreIteratorImpl() : done(false) { }
372 virtual ~StoreIteratorImpl() { }
// Try to add prefix/key/value to `tx` while keeping its encoded size within
// `max`. The entry is first encoded in a temporary transaction to measure it.
// If `tx` already holds entries and the combined encoded length would exceed
// `max`, the entry is rejected (presumably returning false; those lines are
// not shown). An empty `tx` always accepts its first entry, even if that entry
// alone exceeds `max`. Past the size check, the key is recorded in last_key
// and, when mon_sync_debug is set, prefix/key/value are fed into crc_bl.
374 bool add_chunk_entry(TransactionRef tx,
379 auto tmp(std::make_shared<Transaction>());
381 tmp->put(prefix, key, value);
387 size_t len = tx_bl.length() + tmp_bl.length();
389 if (!tx->empty() && (len > max)) {
394 last_key.first = prefix;
395 last_key.second = key;
397 if (g_conf->mon_sync_debug) {
398 ::encode(prefix, crc_bl);
399 ::encode(key, crc_bl);
400 ::encode(value, crc_bl);
406 virtual bool _is_valid() = 0;
// CRC32C of the debug stream, computed only when mon_sync_debug is set
// (the enclosing function signature is not shown).
410 if (g_conf->mon_sync_debug)
411 return crc_bl.crc32c(0);
414 pair<string,string> get_last_key() {
// More chunks remain while not marked done and the iterator is valid.
417 virtual bool has_next_chunk() {
418 return !done && _is_valid();
420 virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0;
421 virtual pair<string,string> get_next_key() = 0;
423 typedef ceph::shared_ptr<StoreIteratorImpl> Synchronizer;
// StoreIteratorImpl over the whole keyspace; only keys whose prefix is in
// sync_prefixes are emitted.
425 class WholeStoreIteratorImpl : public StoreIteratorImpl {
426 KeyValueDB::WholeSpaceIterator iter;
427 set<string> sync_prefixes;
430 WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
431 set<string> &prefixes)
432 : StoreIteratorImpl(),
434 sync_prefixes(prefixes)
437 ~WholeStoreIteratorImpl() override { }
440 * Obtain a chunk of the store
442 * @param bl Encoded transaction that will recreate the chunk
443 * @param first_key Pair containing the first key to obtain, and that
444 * will contain the first key in the chunk (that may
445 * differ from the one passed on to the function)
446 * @param last_key[out] Last key in the chunk
// NOTE(review): the parameters described above (bl, first_key, last_key) do
// not match this signature. What the code does: append entries with a synced
// prefix to `tx` until add_chunk_entry() rejects one for exceeding `max`, or
// the iterator runs out. On entry, `done` must be false and the iterator must
// be valid (asserted). The last key added is available via get_last_key().
448 void get_chunk_tx(TransactionRef tx, uint64_t max) override {
449 assert(done == false);
450 assert(iter->valid() == true);
452 while (iter->valid()) {
453 string prefix(iter->raw_key().first);
454 string key(iter->raw_key().second);
455 if (sync_prefixes.count(prefix)) {
456 bufferlist value = iter->value();
457 if (!add_chunk_entry(tx, prefix, key, value, max))
462 assert(iter->valid() == false);
// Advance to the next key whose prefix is synced (presumably returning it;
// the return line is not shown). Returns an empty pair if none remains.
466 pair<string,string> get_next_key() override {
467 assert(iter->valid());
469 for (; iter->valid(); iter->next()) {
470 pair<string,string> r = iter->raw_key();
471 if (sync_prefixes.count(r.first) > 0) {
476 return pair<string,string>();
479 bool _is_valid() override {
480 return iter->valid();
// Create a Synchronizer over the whole store, restricted to `prefixes`.
// If both halves of `key` are non-empty, iteration resumes just past that key
// (upper_bound). Otherwise it starts at the first key (via an else on a line
// not shown).
484 Synchronizer get_synchronizer(pair<string,string> &key,
485 set<string> &prefixes) {
486 KeyValueDB::WholeSpaceIterator iter;
487 iter = db->get_iterator();
489 if (!key.first.empty() && !key.second.empty())
490 iter->upper_bound(key.first, key.second);
492 iter->seek_to_first();
494 return ceph::shared_ptr<StoreIteratorImpl>(
495 new WholeStoreIteratorImpl(iter, prefixes)
// Iterator over one prefix, which must be non-empty, positioned at its first
// key.
499 KeyValueDB::Iterator get_iterator(const string &prefix) {
500 assert(!prefix.empty());
501 KeyValueDB::Iterator iter = db->get_iterator(prefix);
502 iter->seek_to_first();
// Whole-keyspace iterator positioned at the first key.
506 KeyValueDB::WholeSpaceIterator get_iterator() {
507 KeyValueDB::WholeSpaceIterator iter;
508 iter = db->get_iterator();
509 iter->seek_to_first();
// Read prefix/key into `bl`, which must be empty on entry (asserted).
// Returns the result of KeyValueDB::get().
513 int get(const string& prefix, const string& key, bufferlist& bl) {
514 assert(bl.length() == 0);
515 return db->get(prefix, key, &bl);
// Same, for a version-numbered key.
518 int get(const string& prefix, const version_t ver, bufferlist& bl) {
521 return get(prefix, os.str(), bl);
// Read a version_t stored at prefix/key. A missing key (-ENOENT) is treated as
// 0. Any other error is logged and asserts, because a version_t return cannot
// carry an error code. On success the value is decoded from `bl`.
524 version_t get(const string& prefix, const string& key) {
526 int err = get(prefix, key, bl);
528 if (err == -ENOENT) // if key doesn't exist, assume its value is 0
530 // we're not expecting any other negative return value, and we can't
531 // just return a negative value if we're returning a version_t
532 generic_dout(0) << "MonitorDBStore::get() error obtaining"
533 << " (" << prefix << ":" << key << "): "
534 << cpp_strerror(err) << dendl;
535 assert(0 == "error obtaining key");
540 bufferlist::iterator p = bl.begin();
// True if prefix/key is present. It seeks with lower_bound and checks that the
// iterator landed exactly on `key`; handling of lower_bound's return value is
// on lines not shown.
545 bool exists(const string& prefix, const string& key) {
546 KeyValueDB::Iterator it = db->get_iterator(prefix);
547 int err = it->lower_bound(key);
551 return (it->valid() && it->key() == key);
// Version-numbered variant.
554 bool exists(const string& prefix, version_t ver) {
557 return exists(prefix, os.str());
// Build a single string from prefix and value/version (format defined on
// lines not shown).
560 string combine_strings(const string& prefix, const string& value) {
567 string combine_strings(const string& prefix, const version_t ver) {
570 return combine_strings(prefix, os.str());
// Remove every key under each prefix in `prefixes`, in one KeyValueDB
// transaction submitted synchronously. Handling of `r` is on lines not shown.
573 void clear(set<string>& prefixes) {
574 set<string>::iterator iter;
575 KeyValueDB::Transaction dbt = db->get_transaction();
577 for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
578 dbt->rmkeys_by_prefix((*iter));
580 int r = db->submit_transaction_sync(dbt);
// Create a `kv_type` KeyValueDB backend at "<path>/store.db".
// The reverse loop counts trailing characters of `path` (presumably trailing
// '/'; the loop test is not shown) so they are dropped before "/store.db" is
// appended. Failure to create the backend is fatal (assert).
584 void _open(string kv_type) {
585 string::const_reverse_iterator rit;
587 for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
592 os << path.substr(0, path.size() - pos) << "/store.db";
593 string full_path = os.str();
595 KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context,
599 derr << __func__ << " error initializing "
600 << kv_type << " db back storage in "
601 << full_path << dendl;
602 assert(0 == "MonitorDBStore: error initializing keyvaluedb back storage");
// Optional transaction dump sink at mon_debug_dump_location.
// Binary mode opens a raw fd with O_APPEND. On failure it stores -errno in
// dump_fd_binary and logs; this is not fatal.
606 if (g_conf->mon_debug_dump_transactions) {
607 if (!g_conf->mon_debug_dump_json) {
608 dump_fd_binary = ::open(
609 g_conf->mon_debug_dump_location.c_str(),
610 O_CREAT|O_APPEND|O_WRONLY, 0644);
611 if (dump_fd_binary < 0) {
612 dump_fd_binary = -errno;
613 derr << "Could not open log file, got "
614 << cpp_strerror(dump_fd_binary) << dendl;
// JSON mode starts a "dump" array (closed in the destructor).
// NOTE(review): ofstream::open() here uses the default mode (truncate),
// unlike the appending binary dump; the open result is not checked.
618 dump_fmt.open_array_section("dump");
619 dump_fd_json.open(g_conf->mon_debug_dump_location.c_str());
// rocksdb backends receive mon_rocksdb_options at init().
623 if (kv_type == "rocksdb")
624 db->init(g_conf->mon_rocksdb_options);
// Open an existing store. The backend type comes from the "kv_backend" meta
// file. If it is unreadable or empty, the store is treated as an older
// leveldb one and the type is recorded (assignment line not shown).
631 int open(ostream &out) {
633 int r = read_meta("kv_backend", &kv_type);
634 if (r < 0 || kv_type.empty()) {
635 // assume old monitors that did not mark the type were leveldb.
637 r = write_meta("kv_backend", kv_type);
646 // Monitors are few in number, so the resource cost of exposing
647 // very detailed stats is low: ramp up the priority of all the
648 // KV store's perf counters. Do this after open, because backend may
649 // not have constructed PerfCounters earlier.
650 if (db->get_perf_counters()) {
651 db->get_perf_counters()->set_prio_adjust(
652 PerfCountersBuilder::PRIO_USEFUL - PerfCountersBuilder::PRIO_DEBUGONLY);
// Create and open a new store. The backend type is recorded before opening;
// mon_keyvaluedb is used when none is recorded yet (the condition line is
// not shown).
660 int create_and_open(ostream &out) {
661 // record the type before open
663 int r = read_meta("kv_backend", &kv_type);
665 kv_type = g_conf->mon_keyvaluedb;
666 r = write_meta("kv_backend", kv_type);
671 r = db->create_and_open(out);
// (fragment of the close path; the surrounding body is not shown)
680 // there should be no work queued!
// Compact one prefix via KeyValueDB::compact_prefix(). apply_transaction()
// uses compact_prefix_async() instead.
690 void compact_prefix(const string& prefix) {
691 db->compact_prefix(prefix);
// Pass-through to KeyValueDB::get_estimated_size(); `extras` is handed to
// the backend by reference.
694 uint64_t get_estimated_size(map<string, uint64_t> &extras) {
695 return db->get_estimated_size(extras);
699 * write_meta - write a simple configuration key out-of-band
701 * Write a simple key/value pair for basic store configuration
702 * (e.g., a uuid or magic number) to an unopened/unmounted store.
703 * The default implementation writes this to a plaintext file in the
706 * A newline is appended.
708 * @param key key name (e.g., "fsid")
709 * @param value value (e.g., a uuid rendered as a string)
710 * @returns 0 for success, or an error code
// Writes `v` (presumably value plus the trailing newline; built on lines not
// shown) to the file named `key` inside the `path` directory via
// safe_write_file().
712 int write_meta(const std::string& key,
713 const std::string& value) const {
716 int r = safe_write_file(path.c_str(), key.c_str(),
717 v.c_str(), v.length());
724 * read_meta - read a simple configuration key out-of-band
726 * Read a simple key value to an unopened/mounted store.
728 * Trailing whitespace is stripped off.
730 * @param key key name
731 * @param value pointer to value string
732 * @returns 0 for success, or an error code
// Reads the file named `key` inside `path` via safe_read_file() and strips
// trailing whitespace. open()/create_and_open() call this before the backend
// is opened.
734 int read_meta(const std::string& key,
735 std::string *value) const {
737 int r = safe_read_file(path.c_str(), key.c_str(),
741 // drop trailing newlines
// NOTE(review): isspace() is given a plain char. Where char is signed, bytes
// >= 0x80 are negative, which is undefined behavior for isspace(). Consider
// isspace((unsigned char)buf[r-1]).
742 while (r && isspace(buf[r-1])) {
745 *value = string(buf, r);
// Construct a store rooted at `path`. io_work is the Finisher ("monstore")
// that runs transactions queued by queue_transaction().
749 explicit MonitorDBStore(const string& path)
755 io_work(g_ceph_context, "monstore", "fn_monstore"),
// Destructor fragment: close the transaction dump sink in use (the
// enclosing condition is not shown).
// NOTE(review): _open() can leave -errno in dump_fd_binary; confirm ::close()
// is not called with a negative value.
761 if (!g_conf->mon_debug_dump_json) {
762 ::close(dump_fd_binary);
// JSON mode: close the "dump" array opened in _open(), then flush and close.
764 dump_fmt.close_section();
765 dump_fmt.flush(dump_fd_json);
766 dump_fd_json.flush();
767 dump_fd_json.close();
// Presumably generate the free ::encode/::decode wrappers that forward to the
// member encode()/decode() of Op and Transaction — confirm in include/encoding.h.
774 WRITE_CLASS_ENCODER(MonitorDBStore::Op)
775 WRITE_CLASS_ENCODER(MonitorDBStore::Transaction)
777 #endif /* CEPH_MONITOR_DB_STORE_H */