Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / kv / RocksDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef ROCKS_DB_STORE_H
4 #define ROCKS_DB_STORE_H
5
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
9 #include <set>
10 #include <map>
11 #include <string>
12 #include <memory>
13 #include <boost/scoped_ptr.hpp>
14 #include "rocksdb/write_batch.h"
15 #include "rocksdb/perf_context.h"
16 #include "rocksdb/iostats_context.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/table.h"
19 #include <errno.h>
20 #include "common/errno.h"
21 #include "common/dout.h"
22 #include "include/assert.h"
23 #include "common/Formatter.h"
24 #include "common/Cond.h"
25
26 #include "common/ceph_context.h"
27 class PerfCounters;
28
29 enum {
30   l_rocksdb_first = 34300,
31   l_rocksdb_gets,
32   l_rocksdb_txns,
33   l_rocksdb_txns_sync,
34   l_rocksdb_get_latency,
35   l_rocksdb_submit_latency,
36   l_rocksdb_submit_sync_latency,
37   l_rocksdb_compact,
38   l_rocksdb_compact_range,
39   l_rocksdb_compact_queue_merge,
40   l_rocksdb_compact_queue_len,
41   l_rocksdb_write_wal_time,
42   l_rocksdb_write_memtable_time,
43   l_rocksdb_write_delay_time,
44   l_rocksdb_write_pre_and_post_process_time,
45   l_rocksdb_last,
46 };
47
48 namespace rocksdb{
49   class DB;
50   class Env;
51   class Cache;
52   class FilterPolicy;
53   class Snapshot;
54   class Slice;
55   class WriteBatch;
56   class Iterator;
57   class Logger;
58   struct Options;
59   struct BlockBasedTableOptions;
60 }
61
62 extern rocksdb::Logger *create_rocksdb_ceph_logger();
63
64 /**
65  * Uses RocksDB to implement the KeyValueDB interface
66  */
67 class RocksDBStore : public KeyValueDB {
68   CephContext *cct;
69   PerfCounters *logger;
70   string path;
71   void *priv;
72   rocksdb::DB *db;
73   rocksdb::Env *env;
74   std::shared_ptr<rocksdb::Statistics> dbstats;
75   rocksdb::BlockBasedTableOptions bbt_opts;
76   string options_str;
77
78   uint64_t cache_size = 0;
79   bool set_cache_flag = false;
80
81   int do_open(ostream &out, bool create_if_missing);
82
83   // manage async compactions
84   Mutex compact_queue_lock;
85   Cond compact_queue_cond;
86   list< pair<string,string> > compact_queue;
87   bool compact_queue_stop;
88   class CompactThread : public Thread {
89     RocksDBStore *db;
90   public:
91     explicit CompactThread(RocksDBStore *d) : db(d) {}
92     void *entry() override {
93       db->compact_thread_entry();
94       return NULL;
95     }
96     friend class RocksDBStore;
97   } compact_thread;
98
99   void compact_thread_entry();
100
101   void compact_range(const string& start, const string& end);
102   void compact_range_async(const string& start, const string& end);
103
104 public:
105   /// compact the underlying rocksdb store
106   bool compact_on_mount;
107   bool disableWAL;
108   bool enable_rmrange;
109   void compact() override;
110
111   int tryInterpret(const string& key, const string& val, rocksdb::Options &opt);
112   int ParseOptionsFromString(const string& opt_str, rocksdb::Options &opt);
113   static int _test_init(const string& dir);
114   int init(string options_str) override;
115   /// compact rocksdb for all keys with a given prefix
116   void compact_prefix(const string& prefix) override {
117     compact_range(prefix, past_prefix(prefix));
118   }
119   void compact_prefix_async(const string& prefix) override {
120     compact_range_async(prefix, past_prefix(prefix));
121   }
122
123   void compact_range(const string& prefix, const string& start, const string& end) override {
124     compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
125   }
126   void compact_range_async(const string& prefix, const string& start, const string& end) override {
127     compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
128   }
129
130   RocksDBStore(CephContext *c, const string &path, void *p) :
131     cct(c),
132     logger(NULL),
133     path(path),
134     priv(p),
135     db(NULL),
136     env(static_cast<rocksdb::Env*>(p)),
137     dbstats(NULL),
138     compact_queue_lock("RocksDBStore::compact_thread_lock"),
139     compact_queue_stop(false),
140     compact_thread(this),
141     compact_on_mount(false),
142     disableWAL(false),
143     enable_rmrange(cct->_conf->rocksdb_enable_rmrange)
144   {}
145
146   ~RocksDBStore() override;
147
148   static bool check_omap_dir(string &omap_dir);
149   /// Opens underlying db
150   int open(ostream &out) override {
151     return do_open(out, false);
152   }
153   /// Creates underlying db if missing and opens it
154   int create_and_open(ostream &out) override;
155
156   void close() override;
157
158   void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
159   void get_statistics(Formatter *f) override;
160
161   PerfCounters *get_perf_counters() override
162   {
163     return logger;
164   }
165
166   struct  RocksWBHandler: public rocksdb::WriteBatch::Handler {
167     std::string seen ;
168     int num_seen = 0;
169     static string pretty_binary_string(const string& in) {
170       char buf[10];
171       string out;
172       out.reserve(in.length() * 3);
173       enum { NONE, HEX, STRING } mode = NONE;
174       unsigned from = 0, i;
175       for (i=0; i < in.length(); ++i) {
176         if ((in[i] < 32 || (unsigned char)in[i] > 126) ||
177           (mode == HEX && in.length() - i >= 4 &&
178           ((in[i] < 32 || (unsigned char)in[i] > 126) ||
179           (in[i+1] < 32 || (unsigned char)in[i+1] > 126) ||
180           (in[i+2] < 32 || (unsigned char)in[i+2] > 126) ||
181           (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) {
182
183           if (mode == STRING) {
184             out.append(in.substr(from, i - from));
185             out.push_back('\'');
186           }
187           if (mode != HEX) {
188             out.append("0x");
189             mode = HEX;
190           }
191           if (in.length() - i >= 4) {
192             // print a whole u32 at once
193             snprintf(buf, sizeof(buf), "%08x",
194                   (uint32_t)(((unsigned char)in[i] << 24) |
195                             ((unsigned char)in[i+1] << 16) |
196                             ((unsigned char)in[i+2] << 8) |
197                             ((unsigned char)in[i+3] << 0)));
198             i += 3;
199           } else {
200             snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
201           }
202           out.append(buf);
203         } else {
204           if (mode != STRING) {
205             out.push_back('\'');
206             mode = STRING;
207             from = i;
208           }
209         }
210       }
211       if (mode == STRING) {
212         out.append(in.substr(from, i - from));
213         out.push_back('\'');
214       }
215       return out;
216     }
217     void Put(const rocksdb::Slice& key,
218                     const rocksdb::Slice& value) override {
219       string prefix ((key.ToString()).substr(0,1));
220       string key_to_decode ((key.ToString()).substr(2,string::npos));
221       uint64_t size = (value.ToString()).size();
222       seen += "\nPut( Prefix = " + prefix + " key = " 
223             + pretty_binary_string(key_to_decode) 
224             + " Value size = " + std::to_string(size) + ")";
225       num_seen++;
226     }
227     void SingleDelete(const rocksdb::Slice& key) override {
228       string prefix ((key.ToString()).substr(0,1));
229       string key_to_decode ((key.ToString()).substr(2,string::npos));
230       seen += "\nSingleDelete(Prefix = "+ prefix + " Key = " 
231             + pretty_binary_string(key_to_decode) + ")";
232       num_seen++;
233     }
234     void Delete(const rocksdb::Slice& key) override {
235       string prefix ((key.ToString()).substr(0,1));
236       string key_to_decode ((key.ToString()).substr(2,string::npos));
237       seen += "\nDelete( Prefix = " + prefix + " key = " 
238             + pretty_binary_string(key_to_decode) + ")";
239
240       num_seen++;
241     }
242     void Merge(const rocksdb::Slice& key,
243                       const rocksdb::Slice& value) override {
244       string prefix ((key.ToString()).substr(0,1));
245       string key_to_decode ((key.ToString()).substr(2,string::npos));
246       uint64_t size = (value.ToString()).size();
247       seen += "\nMerge( Prefix = " + prefix + " key = " 
248             + pretty_binary_string(key_to_decode) + " Value size = " 
249             + std::to_string(size) + ")";
250
251       num_seen++;
252     }
253     bool Continue() override { return num_seen < 50; }
254
255   };
256
257
258   class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
259   public:
260     rocksdb::WriteBatch bat;
261     RocksDBStore *db;
262
263     explicit RocksDBTransactionImpl(RocksDBStore *_db);
264     void set(
265       const string &prefix,
266       const string &k,
267       const bufferlist &bl) override;
268     void set(
269       const string &prefix,
270       const char *k,
271       size_t keylen,
272       const bufferlist &bl) override;
273     void rmkey(
274       const string &prefix,
275       const string &k) override;
276     void rmkey(
277       const string &prefix,
278       const char *k,
279       size_t keylen) override;
280     void rm_single_key(
281       const string &prefix,
282       const string &k) override;
283     void rmkeys_by_prefix(
284       const string &prefix
285       ) override;
286     void rm_range_keys(
287       const string &prefix,
288       const string &start,
289       const string &end) override;
290     void merge(
291       const string& prefix,
292       const string& k,
293       const bufferlist &bl) override;
294   };
295
296   KeyValueDB::Transaction get_transaction() override {
297     return std::make_shared<RocksDBTransactionImpl>(this);
298   }
299
300   int submit_transaction(KeyValueDB::Transaction t) override;
301   int submit_transaction_sync(KeyValueDB::Transaction t) override;
302   int get(
303     const string &prefix,
304     const std::set<string> &key,
305     std::map<string, bufferlist> *out
306     ) override;
307   int get(
308     const string &prefix,
309     const string &key,
310     bufferlist *out
311     ) override;
312   int get(
313     const string &prefix,
314     const char *key,
315     size_t keylen,
316     bufferlist *out) override;
317
318
319   class RocksDBWholeSpaceIteratorImpl :
320     public KeyValueDB::WholeSpaceIteratorImpl {
321   protected:
322     rocksdb::Iterator *dbiter;
323   public:
324     explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) :
325       dbiter(iter) { }
326     //virtual ~RocksDBWholeSpaceIteratorImpl() { }
327     ~RocksDBWholeSpaceIteratorImpl() override;
328
329     int seek_to_first() override;
330     int seek_to_first(const string &prefix) override;
331     int seek_to_last() override;
332     int seek_to_last(const string &prefix) override;
333     int upper_bound(const string &prefix, const string &after) override;
334     int lower_bound(const string &prefix, const string &to) override;
335     bool valid() override;
336     int next() override;
337     int prev() override;
338     string key() override;
339     pair<string,string> raw_key() override;
340     bool raw_key_is_prefixed(const string &prefix) override;
341     bufferlist value() override;
342     bufferptr value_as_ptr() override;
343     int status() override;
344     size_t key_size() override;
345     size_t value_size() override;
346   };
347
348   /// Utility
349   static string combine_strings(const string &prefix, const string &value) {
350     string out = prefix;
351     out.push_back(0);
352     out.append(value);
353     return out;
354   }
355   static void combine_strings(const string &prefix,
356                               const char *key, size_t keylen,
357                               string *out) {
358     out->reserve(prefix.size() + 1 + keylen);
359     *out = prefix;
360     out->push_back(0);
361     out->append(key, keylen);
362   }
363
364   static int split_key(rocksdb::Slice in, string *prefix, string *key);
365
366   static bufferlist to_bufferlist(rocksdb::Slice in) {
367     bufferlist bl;
368     bl.append(bufferptr(in.data(), in.size()));
369     return bl;
370   }
371
372   static string past_prefix(const string &prefix);
373
374   class MergeOperatorRouter;
375   friend class MergeOperatorRouter;
376   int set_merge_operator(const std::string& prefix,
377                                  std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
378   string assoc_name; ///< Name of associative operator
379
380   uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
381     DIR *store_dir = opendir(path.c_str());
382     if (!store_dir) {
383       lderr(cct) << __func__ << " something happened opening the store: "
384                  << cpp_strerror(errno) << dendl;
385       return 0;
386     }
387
388     uint64_t total_size = 0;
389     uint64_t sst_size = 0;
390     uint64_t log_size = 0;
391     uint64_t misc_size = 0;
392
393     struct dirent *entry = NULL;
394     while ((entry = readdir(store_dir)) != NULL) {
395       string n(entry->d_name);
396
397       if (n == "." || n == "..")
398         continue;
399
400       string fpath = path + '/' + n;
401       struct stat s;
402       int err = stat(fpath.c_str(), &s);
403       if (err < 0)
404         err = -errno;
405       // we may race against rocksdb while reading files; this should only
406       // happen when those files are being updated, data is being shuffled
407       // and files get removed, in which case there's not much of a problem
408       // as we'll get to them next time around.
409       if (err == -ENOENT) {
410         continue;
411       }
412       if (err < 0) {
413         lderr(cct) << __func__ << " error obtaining stats for " << fpath
414                    << ": " << cpp_strerror(err) << dendl;
415         goto err;
416       }
417
418       size_t pos = n.find_last_of('.');
419       if (pos == string::npos) {
420         misc_size += s.st_size;
421         continue;
422       }
423
424       string ext = n.substr(pos+1);
425       if (ext == "sst") {
426         sst_size += s.st_size;
427       } else if (ext == "log") {
428         log_size += s.st_size;
429       } else {
430         misc_size += s.st_size;
431       }
432     }
433
434     total_size = sst_size + log_size + misc_size;
435
436     extra["sst"] = sst_size;
437     extra["log"] = log_size;
438     extra["misc"] = misc_size;
439     extra["total"] = total_size;
440
441 err:
442     closedir(store_dir);
443     return total_size;
444   }
445
446   int set_cache_size(uint64_t s) override {
447     cache_size = s;
448     set_cache_flag = true;
449     return 0;
450   }
451
452 protected:
453   WholeSpaceIterator _get_iterator() override;
454 };
455
456
457
458 #endif