Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mon / MonitorDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 Inktank, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13 #ifndef CEPH_MONITOR_DB_STORE_H
14 #define CEPH_MONITOR_DB_STORE_H
15
16 #include "include/types.h"
17 #include "include/buffer.h"
18 #include <set>
19 #include <map>
20 #include <string>
21 #include <boost/scoped_ptr.hpp>
22 #include <sstream>
23 #include <fstream>
24 #include "kv/KeyValueDB.h"
25
26 #include "include/assert.h"
27 #include "common/Formatter.h"
28 #include "common/Finisher.h"
29 #include "common/errno.h"
30 #include "common/debug.h"
31 #include "common/safe_io.h"
32
33 #define dout_context g_ceph_context
34
35 class MonitorDBStore
36 {
37   string path;
38   boost::scoped_ptr<KeyValueDB> db;
39   bool do_dump;
40   int dump_fd_binary;
41   std::ofstream dump_fd_json;
42   JSONFormatter dump_fmt;
43   
44
45   Finisher io_work;
46
47   bool is_open;
48
49  public:
50
51   struct Op {
52     uint8_t type;
53     string prefix;
54     string key, endkey;
55     bufferlist bl;
56
57     Op()
58       : type(0) { }
59     Op(int t, string p, string k)
60       : type(t), prefix(p), key(k) { }
61     Op(int t, const string& p, string k, bufferlist& b)
62       : type(t), prefix(p), key(k), bl(b) { }
63     Op(int t, const string& p, string start, string end)
64       : type(t), prefix(p), key(start), endkey(end) { }
65
66     void encode(bufferlist& encode_bl) const {
67       ENCODE_START(2, 1, encode_bl);
68       ::encode(type, encode_bl);
69       ::encode(prefix, encode_bl);
70       ::encode(key, encode_bl);
71       ::encode(bl, encode_bl);
72       ::encode(endkey, encode_bl);
73       ENCODE_FINISH(encode_bl);
74     }
75
76     void decode(bufferlist::iterator& decode_bl) {
77       DECODE_START(2, decode_bl);
78       ::decode(type, decode_bl);
79       ::decode(prefix, decode_bl);
80       ::decode(key, decode_bl);
81       ::decode(bl, decode_bl);
82       if (struct_v >= 2)
83         ::decode(endkey, decode_bl);
84       DECODE_FINISH(decode_bl);
85     }
86
87     void dump(Formatter *f) const {
88       f->dump_int("type", type);
89       f->dump_string("prefix", prefix);
90       f->dump_string("key", key);
91       if (endkey.length())
92         f->dump_string("endkey", endkey);
93     }
94
95     static void generate_test_instances(list<Op*>& ls) {
96       ls.push_back(new Op);
97       // we get coverage here from the Transaction instances
98     }
99   };
100
101   struct Transaction;
102   typedef ceph::shared_ptr<Transaction> TransactionRef;
103   struct Transaction {
104     list<Op> ops;
105     uint64_t bytes, keys;
106
107     Transaction() : bytes(0), keys(0) {}
108
109     enum {
110       OP_PUT    = 1,
111       OP_ERASE  = 2,
112       OP_COMPACT = 3,
113     };
114
115     void put(string prefix, string key, bufferlist& bl) {
116       ops.push_back(Op(OP_PUT, prefix, key, bl));
117       ++keys;
118       bytes += prefix.length() + key.length() + bl.length();
119     }
120
121     void put(string prefix, version_t ver, bufferlist& bl) {
122       ostringstream os;
123       os << ver;
124       put(prefix, os.str(), bl);
125     }
126
127     void put(string prefix, string key, version_t ver) {
128       bufferlist bl;
129       ::encode(ver, bl);
130       put(prefix, key, bl);
131     }
132
133     void erase(string prefix, string key) {
134       ops.push_back(Op(OP_ERASE, prefix, key));
135       ++keys;
136       bytes += prefix.length() + key.length();
137     }
138
139     void erase(string prefix, version_t ver) {
140       ostringstream os;
141       os << ver;
142       erase(prefix, os.str());
143     }
144
145     void compact_prefix(string prefix) {
146       ops.push_back(Op(OP_COMPACT, prefix, string()));
147     }
148
149     void compact_range(string prefix, string start, string end) {
150       ops.push_back(Op(OP_COMPACT, prefix, start, end));
151     }
152
153     void encode(bufferlist& bl) const {
154       ENCODE_START(2, 1, bl);
155       ::encode(ops, bl);
156       ::encode(bytes, bl);
157       ::encode(keys, bl);
158       ENCODE_FINISH(bl);
159     }
160
161     void decode(bufferlist::iterator& bl) {
162       DECODE_START(2, bl);
163       ::decode(ops, bl);
164       if (struct_v >= 2) {
165         ::decode(bytes, bl);
166         ::decode(keys, bl);
167       }
168       DECODE_FINISH(bl);
169     }
170
171     static void generate_test_instances(list<Transaction*>& ls) {
172       ls.push_back(new Transaction);
173       ls.push_back(new Transaction);
174       bufferlist bl;
175       bl.append("value");
176       ls.back()->put("prefix", "key", bl);
177       ls.back()->erase("prefix2", "key2");
178       ls.back()->compact_prefix("prefix3");
179       ls.back()->compact_range("prefix4", "from", "to");
180     }
181
182     void append(TransactionRef other) {
183       ops.splice(ops.end(), other->ops);
184       keys += other->keys;
185       bytes += other->bytes;
186     }
187
188     void append_from_encoded(bufferlist& bl) {
189       auto other(std::make_shared<Transaction>());
190       bufferlist::iterator it = bl.begin();
191       other->decode(it);
192       append(other);
193     }
194
195     bool empty() {
196       return (size() == 0);
197     }
198
199     size_t size() const {
200       return ops.size();
201     }
202     uint64_t get_keys() const {
203       return keys;
204     }
205     uint64_t get_bytes() const {
206       return bytes;
207     }
208
209     void dump(ceph::Formatter *f, bool dump_val=false) const {
210       f->open_object_section("transaction");
211       f->open_array_section("ops");
212       list<Op>::const_iterator it;
213       int op_num = 0;
214       for (it = ops.begin(); it != ops.end(); ++it) {
215         const Op& op = *it;
216         f->open_object_section("op");
217         f->dump_int("op_num", op_num++);
218         switch (op.type) {
219         case OP_PUT:
220           {
221             f->dump_string("type", "PUT");
222             f->dump_string("prefix", op.prefix);
223             f->dump_string("key", op.key);
224             f->dump_unsigned("length", op.bl.length());
225             if (dump_val) {
226               ostringstream os;
227               op.bl.hexdump(os);
228               f->dump_string("bl", os.str());
229             }
230           }
231           break;
232         case OP_ERASE:
233           {
234             f->dump_string("type", "ERASE");
235             f->dump_string("prefix", op.prefix);
236             f->dump_string("key", op.key);
237           }
238           break;
239         case OP_COMPACT:
240           {
241             f->dump_string("type", "COMPACT");
242             f->dump_string("prefix", op.prefix);
243             f->dump_string("start", op.key);
244             f->dump_string("end", op.endkey);
245           }
246           break;
247         default:
248           {
249             f->dump_string("type", "unknown");
250             f->dump_unsigned("op_code", op.type);
251             break;
252           }
253         }
254         f->close_section();
255       }
256       f->close_section();
257       f->dump_unsigned("num_keys", keys);
258       f->dump_unsigned("num_bytes", bytes);
259       f->close_section();
260     }
261   };
262
263   int apply_transaction(MonitorDBStore::TransactionRef t) {
264     KeyValueDB::Transaction dbt = db->get_transaction();
265
266     if (do_dump) {
267       if (!g_conf->mon_debug_dump_json) {
268         bufferlist bl;
269         t->encode(bl);
270         bl.write_fd(dump_fd_binary);
271       } else {
272         t->dump(&dump_fmt, true);
273         dump_fmt.flush(dump_fd_json);
274         dump_fd_json.flush();
275       }
276     }
277
278     list<pair<string, pair<string,string> > > compact;
279     for (list<Op>::const_iterator it = t->ops.begin();
280          it != t->ops.end();
281          ++it) {
282       const Op& op = *it;
283       switch (op.type) {
284       case Transaction::OP_PUT:
285         dbt->set(op.prefix, op.key, op.bl);
286         break;
287       case Transaction::OP_ERASE:
288         dbt->rmkey(op.prefix, op.key);
289         break;
290       case Transaction::OP_COMPACT:
291         compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
292         break;
293       default:
294         derr << __func__ << " unknown op type " << op.type << dendl;
295         ceph_abort();
296         break;
297       }
298     }
299     int r = db->submit_transaction_sync(dbt);
300     if (r >= 0) {
301       while (!compact.empty()) {
302         if (compact.front().second.first == string() &&
303             compact.front().second.second == string())
304           db->compact_prefix_async(compact.front().first);
305         else
306           db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
307         compact.pop_front();
308       }
309     } else {
310       assert(0 == "failed to write to db");
311     }
312     return r;
313   }
314
315   struct C_DoTransaction : public Context {
316     MonitorDBStore *store;
317     MonitorDBStore::TransactionRef t;
318     Context *oncommit;
319     C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t,
320                     Context *f)
321       : store(s), t(t), oncommit(f)
322     {}
323     void finish(int r) override {
324       /* The store serializes writes.  Each transaction is handled
325        * sequentially by the io_work Finisher.  If a transaction takes longer
326        * to apply its state to permanent storage, then no other transaction
327        * will be handled meanwhile.
328        *
329        * We will now randomly inject random delays.  We can safely sleep prior
330        * to applying the transaction as it won't break the model.
331        */
332       double delay_prob = g_conf->mon_inject_transaction_delay_probability;
333       if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) {
334         utime_t delay;
335         double delay_max = g_conf->mon_inject_transaction_delay_max;
336         delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0);
337         lsubdout(g_ceph_context, mon, 1)
338           << "apply_transaction will be delayed for " << delay
339           << " seconds" << dendl;
340         delay.sleep();
341       }
342       int ret = store->apply_transaction(t);
343       oncommit->complete(ret);
344     }
345   };
346
347   /**
348    * queue transaction
349    *
350    * Queue a transaction to commit asynchronously.  Trigger a context
351    * on completion (without any locks held).
352    */
353   void queue_transaction(MonitorDBStore::TransactionRef t,
354                          Context *oncommit) {
355     io_work.queue(new C_DoTransaction(this, t, oncommit));
356   }
357
358   /**
359    * block and flush all io activity
360    */
361   void flush() {
362     io_work.wait_for_empty();
363   }
364
365   class StoreIteratorImpl {
366   protected:
367     bool done;
368     pair<string,string> last_key;
369     bufferlist crc_bl;
370
371     StoreIteratorImpl() : done(false) { }
372     virtual ~StoreIteratorImpl() { }
373
374     bool add_chunk_entry(TransactionRef tx,
375                          string &prefix,
376                          string &key,
377                          bufferlist &value,
378                          uint64_t max) {
379       auto tmp(std::make_shared<Transaction>());
380       bufferlist tmp_bl;
381       tmp->put(prefix, key, value);
382       tmp->encode(tmp_bl);
383
384       bufferlist tx_bl;
385       tx->encode(tx_bl);
386
387       size_t len = tx_bl.length() + tmp_bl.length();
388
389       if (!tx->empty() && (len > max)) {
390         return false;
391       }
392
393       tx->append(tmp);
394       last_key.first = prefix;
395       last_key.second = key;
396
397       if (g_conf->mon_sync_debug) {
398         ::encode(prefix, crc_bl);
399         ::encode(key, crc_bl);
400         ::encode(value, crc_bl);
401       }
402
403       return true;
404     }
405
406     virtual bool _is_valid() = 0;
407
408   public:
409     __u32 crc() {
410       if (g_conf->mon_sync_debug)
411         return crc_bl.crc32c(0);
412       return 0;
413     }
414     pair<string,string> get_last_key() {
415       return last_key;
416     }
417     virtual bool has_next_chunk() {
418       return !done && _is_valid();
419     }
420     virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0;
421     virtual pair<string,string> get_next_key() = 0;
422   };
423   typedef ceph::shared_ptr<StoreIteratorImpl> Synchronizer;
424
425   class WholeStoreIteratorImpl : public StoreIteratorImpl {
426     KeyValueDB::WholeSpaceIterator iter;
427     set<string> sync_prefixes;
428
429   public:
430     WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
431                            set<string> &prefixes)
432       : StoreIteratorImpl(),
433         iter(iter),
434         sync_prefixes(prefixes)
435     { }
436
437     ~WholeStoreIteratorImpl() override { }
438
439     /**
440      * Obtain a chunk of the store
441      *
442      * @param bl            Encoded transaction that will recreate the chunk
443      * @param first_key     Pair containing the first key to obtain, and that
444      *                      will contain the first key in the chunk (that may
445      *                      differ from the one passed on to the function)
446      * @param last_key[out] Last key in the chunk
447      */
448     void get_chunk_tx(TransactionRef tx, uint64_t max) override {
449       assert(done == false);
450       assert(iter->valid() == true);
451
452       while (iter->valid()) {
453         string prefix(iter->raw_key().first);
454         string key(iter->raw_key().second);
455         if (sync_prefixes.count(prefix)) {
456           bufferlist value = iter->value();
457           if (!add_chunk_entry(tx, prefix, key, value, max))
458             return;
459         }
460         iter->next();
461       }
462       assert(iter->valid() == false);
463       done = true;
464     }
465
466     pair<string,string> get_next_key() override {
467       assert(iter->valid());
468
469       for (; iter->valid(); iter->next()) {
470         pair<string,string> r = iter->raw_key();
471         if (sync_prefixes.count(r.first) > 0) {
472           iter->next();
473           return r;
474         }
475       }
476       return pair<string,string>();
477     }
478
479     bool _is_valid() override {
480       return iter->valid();
481     }
482   };
483
484   Synchronizer get_synchronizer(pair<string,string> &key,
485                                 set<string> &prefixes) {
486     KeyValueDB::WholeSpaceIterator iter;
487     iter = db->get_iterator();
488
489     if (!key.first.empty() && !key.second.empty())
490       iter->upper_bound(key.first, key.second);
491     else
492       iter->seek_to_first();
493
494     return ceph::shared_ptr<StoreIteratorImpl>(
495         new WholeStoreIteratorImpl(iter, prefixes)
496     );
497   }
498
499   KeyValueDB::Iterator get_iterator(const string &prefix) {
500     assert(!prefix.empty());
501     KeyValueDB::Iterator iter = db->get_iterator(prefix);
502     iter->seek_to_first();
503     return iter;
504   }
505
506   KeyValueDB::WholeSpaceIterator get_iterator() {
507     KeyValueDB::WholeSpaceIterator iter;
508     iter = db->get_iterator();
509     iter->seek_to_first();
510     return iter;
511   }
512
513   int get(const string& prefix, const string& key, bufferlist& bl) {
514     assert(bl.length() == 0);
515     return db->get(prefix, key, &bl);
516   }
517
518   int get(const string& prefix, const version_t ver, bufferlist& bl) {
519     ostringstream os;
520     os << ver;
521     return get(prefix, os.str(), bl);
522   }
523
524   version_t get(const string& prefix, const string& key) {
525     bufferlist bl;
526     int err = get(prefix, key, bl);
527     if (err < 0) {
528       if (err == -ENOENT) // if key doesn't exist, assume its value is 0
529         return 0;
530       // we're not expecting any other negative return value, and we can't
531       // just return a negative value if we're returning a version_t
532       generic_dout(0) << "MonitorDBStore::get() error obtaining"
533                       << " (" << prefix << ":" << key << "): "
534                       << cpp_strerror(err) << dendl;
535       assert(0 == "error obtaining key");
536     }
537
538     assert(bl.length());
539     version_t ver;
540     bufferlist::iterator p = bl.begin();
541     ::decode(ver, p);
542     return ver;
543   }
544
545   bool exists(const string& prefix, const string& key) {
546     KeyValueDB::Iterator it = db->get_iterator(prefix);
547     int err = it->lower_bound(key);
548     if (err < 0)
549       return false;
550
551     return (it->valid() && it->key() == key);
552   }
553
554   bool exists(const string& prefix, version_t ver) {
555     ostringstream os;
556     os << ver;
557     return exists(prefix, os.str());
558   }
559
560   string combine_strings(const string& prefix, const string& value) {
561     string out = prefix;
562     out.push_back('_');
563     out.append(value);
564     return out;
565   }
566
567   string combine_strings(const string& prefix, const version_t ver) {
568     ostringstream os;
569     os << ver;
570     return combine_strings(prefix, os.str());
571   }
572
573   void clear(set<string>& prefixes) {
574     set<string>::iterator iter;
575     KeyValueDB::Transaction dbt = db->get_transaction();
576
577     for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
578       dbt->rmkeys_by_prefix((*iter));
579     }
580     int r = db->submit_transaction_sync(dbt);
581     assert(r >= 0);
582   }
583
584   void _open(string kv_type) {
585     string::const_reverse_iterator rit;
586     int pos = 0;
587     for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
588       if (*rit != '/')
589         break;
590     }
591     ostringstream os;
592     os << path.substr(0, path.size() - pos) << "/store.db";
593     string full_path = os.str();
594
595     KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context,
596                                             kv_type,
597                                             full_path);
598     if (!db_ptr) {
599       derr << __func__ << " error initializing "
600            << kv_type << " db back storage in "
601            << full_path << dendl;
602       assert(0 == "MonitorDBStore: error initializing keyvaluedb back storage");
603     }
604     db.reset(db_ptr);
605
606     if (g_conf->mon_debug_dump_transactions) {
607       if (!g_conf->mon_debug_dump_json) {
608         dump_fd_binary = ::open(
609           g_conf->mon_debug_dump_location.c_str(),
610           O_CREAT|O_APPEND|O_WRONLY, 0644);
611         if (dump_fd_binary < 0) {
612           dump_fd_binary = -errno;
613           derr << "Could not open log file, got "
614                << cpp_strerror(dump_fd_binary) << dendl;
615         }
616       } else {
617         dump_fmt.reset();
618         dump_fmt.open_array_section("dump");
619         dump_fd_json.open(g_conf->mon_debug_dump_location.c_str());
620       }
621       do_dump = true;
622     }
623     if (kv_type == "rocksdb")
624       db->init(g_conf->mon_rocksdb_options);
625     else
626       db->init();
627
628
629   }
630
631   int open(ostream &out) {
632     string kv_type;
633     int r = read_meta("kv_backend", &kv_type);
634     if (r < 0 || kv_type.empty()) {
635       // assume old monitors that did not mark the type were leveldb.
636       kv_type = "leveldb";
637       r = write_meta("kv_backend", kv_type);
638       if (r < 0)
639         return r;
640     }
641     _open(kv_type);
642     r = db->open(out);
643     if (r < 0)
644       return r;
645
646     // Monitors are few in number, so the resource cost of exposing 
647     // very detailed stats is low: ramp up the priority of all the
648     // KV store's perf counters.  Do this after open, because backend may
649     // not have constructed PerfCounters earlier.
650     if (db->get_perf_counters()) {
651       db->get_perf_counters()->set_prio_adjust(
652           PerfCountersBuilder::PRIO_USEFUL - PerfCountersBuilder::PRIO_DEBUGONLY);
653     }
654
655     io_work.start();
656     is_open = true;
657     return 0;
658   }
659
660   int create_and_open(ostream &out) {
661     // record the type before open
662     string kv_type;
663     int r = read_meta("kv_backend", &kv_type);
664     if (r < 0) {
665       kv_type = g_conf->mon_keyvaluedb;
666       r = write_meta("kv_backend", kv_type);
667       if (r < 0)
668         return r;
669     }
670     _open(kv_type);
671     r = db->create_and_open(out);
672     if (r < 0)
673       return r;
674     io_work.start();
675     is_open = true;
676     return 0;
677   }
678
679   void close() {
680     // there should be no work queued!
681     io_work.stop();
682     is_open = false;
683     db.reset(NULL);
684   }
685
686   void compact() {
687     db->compact();
688   }
689
690   void compact_prefix(const string& prefix) {
691     db->compact_prefix(prefix);
692   }
693
694   uint64_t get_estimated_size(map<string, uint64_t> &extras) {
695     return db->get_estimated_size(extras);
696   }
697
698   /**
699    * write_meta - write a simple configuration key out-of-band
700    *
701    * Write a simple key/value pair for basic store configuration
702    * (e.g., a uuid or magic number) to an unopened/unmounted store.
703    * The default implementation writes this to a plaintext file in the
704    * path.
705    *
706    * A newline is appended.
707    *
708    * @param key key name (e.g., "fsid")
709    * @param value value (e.g., a uuid rendered as a string)
710    * @returns 0 for success, or an error code
711    */
712   int write_meta(const std::string& key,
713                  const std::string& value) const {
714     string v = value;
715     v += "\n";
716     int r = safe_write_file(path.c_str(), key.c_str(),
717                             v.c_str(), v.length());
718     if (r < 0)
719       return r;
720     return 0;
721   }
722
723   /**
724    * read_meta - read a simple configuration key out-of-band
725    *
726    * Read a simple key value to an unopened/mounted store.
727    *
728    * Trailing whitespace is stripped off.
729    *
730    * @param key key name
731    * @param value pointer to value string
732    * @returns 0 for success, or an error code
733    */
734   int read_meta(const std::string& key,
735                 std::string *value) const {
736     char buf[4096];
737     int r = safe_read_file(path.c_str(), key.c_str(),
738                            buf, sizeof(buf));
739     if (r <= 0)
740       return r;
741     // drop trailing newlines
742     while (r && isspace(buf[r-1])) {
743       --r;
744     }
745     *value = string(buf, r);
746     return 0;
747   }
748
749   explicit MonitorDBStore(const string& path)
750     : path(path),
751       db(0),
752       do_dump(false),
753       dump_fd_binary(-1),
754       dump_fmt(true),
755       io_work(g_ceph_context, "monstore", "fn_monstore"),
756       is_open(false) {
757   }
758   ~MonitorDBStore() {
759     assert(!is_open);
760     if (do_dump) {
761       if (!g_conf->mon_debug_dump_json) {
762         ::close(dump_fd_binary);
763       } else {
764         dump_fmt.close_section();
765         dump_fmt.flush(dump_fd_json);
766         dump_fd_json.flush();
767         dump_fd_json.close();
768       }
769     }
770   }
771
772 };
773
774 WRITE_CLASS_ENCODER(MonitorDBStore::Op)
775 WRITE_CLASS_ENCODER(MonitorDBStore::Transaction)
776
777 #endif /* CEPH_MONITOR_DB_STORE_H */