Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / kv / LevelDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef LEVEL_DB_STORE_H
4 #define LEVEL_DB_STORE_H
5
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
9 #include <set>
10 #include <map>
11 #include <string>
12 #include "include/memory.h"
13 #include <boost/scoped_ptr.hpp>
14 #include "leveldb/db.h"
15 #include "leveldb/env.h"
16 #include "leveldb/write_batch.h"
17 #include "leveldb/slice.h"
18 #include "leveldb/cache.h"
19 #ifdef HAVE_LEVELDB_FILTER_POLICY
20 #include "leveldb/filter_policy.h"
21 #endif
22
23 #include <errno.h>
24 #include "common/errno.h"
25 #include "common/dout.h"
26 #include "include/assert.h"
27 #include "common/Formatter.h"
28 #include "common/Cond.h"
29
30 #include "common/ceph_context.h"
31
32 // reinclude our assert to clobber the system one
33 # include "include/assert.h"
34
35 class PerfCounters;
36
37 enum {
38   l_leveldb_first = 34300,
39   l_leveldb_gets,
40   l_leveldb_txns,
41   l_leveldb_get_latency,
42   l_leveldb_submit_latency,
43   l_leveldb_submit_sync_latency,
44   l_leveldb_compact,
45   l_leveldb_compact_range,
46   l_leveldb_compact_queue_merge,
47   l_leveldb_compact_queue_len,
48   l_leveldb_last,
49 };
50
51 extern leveldb::Logger *create_leveldb_ceph_logger();
52
53 class CephLevelDBLogger;
54
55 /**
56  * Uses LevelDB to implement the KeyValueDB interface
57  */
58 class LevelDBStore : public KeyValueDB {
59   CephContext *cct;
60   PerfCounters *logger;
61   CephLevelDBLogger *ceph_logger;
62   string path;
63   boost::scoped_ptr<leveldb::Cache> db_cache;
64 #ifdef HAVE_LEVELDB_FILTER_POLICY
65   boost::scoped_ptr<const leveldb::FilterPolicy> filterpolicy;
66 #endif
67   boost::scoped_ptr<leveldb::DB> db;
68
69   int do_open(ostream &out, bool create_if_missing);
70
71   // manage async compactions
72   Mutex compact_queue_lock;
73   Cond compact_queue_cond;
74   list< pair<string,string> > compact_queue;
75   bool compact_queue_stop;
76   class CompactThread : public Thread {
77     LevelDBStore *db;
78   public:
79     explicit CompactThread(LevelDBStore *d) : db(d) {}
80     void *entry() override {
81       db->compact_thread_entry();
82       return NULL;
83     }
84     friend class LevelDBStore;
85   } compact_thread;
86
87   void compact_thread_entry();
88
89   void compact_range(const string& start, const string& end) {
90     leveldb::Slice cstart(start);
91     leveldb::Slice cend(end);
92     db->CompactRange(&cstart, &cend);
93   }
94   void compact_range_async(const string& start, const string& end);
95
96 public:
97   /// compact the underlying leveldb store
98   void compact() override;
99
100   /// compact db for all keys with a given prefix
101   void compact_prefix(const string& prefix) override {
102     compact_range(prefix, past_prefix(prefix));
103   }
104   void compact_prefix_async(const string& prefix) override {
105     compact_range_async(prefix, past_prefix(prefix));
106   }
107   void compact_range(const string& prefix,
108                      const string& start, const string& end) override {
109     compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
110   }
111   void compact_range_async(const string& prefix,
112                            const string& start, const string& end) override {
113     compact_range_async(combine_strings(prefix, start),
114                         combine_strings(prefix, end));
115   }
116
117
118   /**
119    * options_t: Holds options which are minimally interpreted
120    * on initialization and then passed through to LevelDB.
121    * We transform a couple of these into actual LevelDB
122    * structures, but the rest are simply passed through unchanged. See
123    * leveldb/options.h for more precise details on each.
124    *
125    * Set them after constructing the LevelDBStore, but before calling
126    * open() or create_and_open().
127    */
128   struct options_t {
129     uint64_t write_buffer_size; /// in-memory write buffer size
130     int max_open_files; /// maximum number of files LevelDB can open at once
131     uint64_t cache_size; /// size of extra decompressed cache to use
132     uint64_t block_size; /// user data per block
133     int bloom_size; /// number of bits per entry to put in a bloom filter
134     bool compression_enabled; /// whether to use libsnappy compression or not
135
136     // don't change these ones. No, seriously
137     int block_restart_interval;
138     bool error_if_exists;
139     bool paranoid_checks;
140
141     string log_file;
142
143     options_t() :
144       write_buffer_size(0), //< 0 means default
145       max_open_files(0), //< 0 means default
146       cache_size(0), //< 0 means no cache (default)
147       block_size(0), //< 0 means default
148       bloom_size(0), //< 0 means no bloom filter (default)
149       compression_enabled(true), //< set to false for no compression
150       block_restart_interval(0), //< 0 means default
151       error_if_exists(false), //< set to true if you want to check nonexistence
152       paranoid_checks(false) //< set to true if you want paranoid checks
153     {}
154   } options;
155
156   LevelDBStore(CephContext *c, const string &path) :
157     cct(c),
158     logger(NULL),
159     ceph_logger(NULL),
160     path(path),
161     db_cache(NULL),
162 #ifdef HAVE_LEVELDB_FILTER_POLICY
163     filterpolicy(NULL),
164 #endif
165     compact_queue_lock("LevelDBStore::compact_thread_lock"),
166     compact_queue_stop(false),
167     compact_thread(this),
168     options()
169   {}
170
171   ~LevelDBStore() override;
172
173   static int _test_init(const string& dir);
174   int init(string option_str="") override;
175
176   /// Opens underlying db
177   int open(ostream &out) override {
178     return do_open(out, false);
179   }
180   /// Creates underlying db if missing and opens it
181   int create_and_open(ostream &out) override {
182     return do_open(out, true);
183   }
184
185   void close() override;
186
187   PerfCounters *get_perf_counters() override
188   {
189     return logger;
190   }
191
192   class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl {
193   public:
194     leveldb::WriteBatch bat;
195     LevelDBStore *db;
196     explicit LevelDBTransactionImpl(LevelDBStore *db) : db(db) {}
197     void set(
198       const string &prefix,
199       const string &k,
200       const bufferlist &bl) override;
201     using KeyValueDB::TransactionImpl::set;
202     void rmkey(
203       const string &prefix,
204       const string &k) override;
205     void rmkeys_by_prefix(
206       const string &prefix
207       ) override;
208     virtual void rm_range_keys(
209         const string &prefix,
210         const string &start,
211         const string &end) override;
212
213     using KeyValueDB::TransactionImpl::rmkey;
214   };
215
216   KeyValueDB::Transaction get_transaction() override {
217     return std::make_shared<LevelDBTransactionImpl>(this);
218   }
219
220   int submit_transaction(KeyValueDB::Transaction t) override;
221   int submit_transaction_sync(KeyValueDB::Transaction t) override;
222   int get(
223     const string &prefix,
224     const std::set<string> &key,
225     std::map<string, bufferlist> *out
226     ) override;
227
228   int get(const string &prefix, 
229     const string &key,   
230     bufferlist *value) override;
231
232   using KeyValueDB::get;
233
234   class LevelDBWholeSpaceIteratorImpl :
235     public KeyValueDB::WholeSpaceIteratorImpl {
236   protected:
237     boost::scoped_ptr<leveldb::Iterator> dbiter;
238   public:
239     explicit LevelDBWholeSpaceIteratorImpl(leveldb::Iterator *iter) :
240       dbiter(iter) { }
241     ~LevelDBWholeSpaceIteratorImpl() override { }
242
243     int seek_to_first() override {
244       dbiter->SeekToFirst();
245       return dbiter->status().ok() ? 0 : -1;
246     }
247     int seek_to_first(const string &prefix) override {
248       leveldb::Slice slice_prefix(prefix);
249       dbiter->Seek(slice_prefix);
250       return dbiter->status().ok() ? 0 : -1;
251     }
252     int seek_to_last() override {
253       dbiter->SeekToLast();
254       return dbiter->status().ok() ? 0 : -1;
255     }
256     int seek_to_last(const string &prefix) override {
257       string limit = past_prefix(prefix);
258       leveldb::Slice slice_limit(limit);
259       dbiter->Seek(slice_limit);
260
261       if (!dbiter->Valid()) {
262         dbiter->SeekToLast();
263       } else {
264         dbiter->Prev();
265       }
266       return dbiter->status().ok() ? 0 : -1;
267     }
268     int upper_bound(const string &prefix, const string &after) override {
269       lower_bound(prefix, after);
270       if (valid()) {
271         pair<string,string> key = raw_key();
272         if (key.first == prefix && key.second == after)
273           next();
274       }
275       return dbiter->status().ok() ? 0 : -1;
276     }
277     int lower_bound(const string &prefix, const string &to) override {
278       string bound = combine_strings(prefix, to);
279       leveldb::Slice slice_bound(bound);
280       dbiter->Seek(slice_bound);
281       return dbiter->status().ok() ? 0 : -1;
282     }
283     bool valid() override {
284       return dbiter->Valid();
285     }
286     int next() override {
287       if (valid())
288         dbiter->Next();
289       return dbiter->status().ok() ? 0 : -1;
290     }
291     int prev() override {
292       if (valid())
293         dbiter->Prev();
294       return dbiter->status().ok() ? 0 : -1;
295     }
296     string key() override {
297       string out_key;
298       split_key(dbiter->key(), 0, &out_key);
299       return out_key;
300     }
301     pair<string,string> raw_key() override {
302       string prefix, key;
303       split_key(dbiter->key(), &prefix, &key);
304       return make_pair(prefix, key);
305     }
306     bool raw_key_is_prefixed(const string &prefix) override {
307       leveldb::Slice key = dbiter->key();
308       if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
309         return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
310       } else {
311         return false;
312       }
313     }
314     bufferlist value() override {
315       return to_bufferlist(dbiter->value());
316     }
317
318     bufferptr value_as_ptr() override {
319       leveldb::Slice data = dbiter->value();
320       return bufferptr(data.data(), data.size());
321     }
322
323     int status() override {
324       return dbiter->status().ok() ? 0 : -1;
325     }
326   };
327
328   /// Utility
329   static string combine_strings(const string &prefix, const string &value);
330   static int split_key(leveldb::Slice in, string *prefix, string *key);
331   static bufferlist to_bufferlist(leveldb::Slice in);
332   static string past_prefix(const string &prefix) {
333     string limit = prefix;
334     limit.push_back(1);
335     return limit;
336   }
337
338   uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
339     DIR *store_dir = opendir(path.c_str());
340     if (!store_dir) {
341       lderr(cct) << __func__ << " something happened opening the store: "
342                  << cpp_strerror(errno) << dendl;
343       return 0;
344     }
345
346     uint64_t total_size = 0;
347     uint64_t sst_size = 0;
348     uint64_t log_size = 0;
349     uint64_t misc_size = 0;
350
351     struct dirent *entry = NULL;
352     while ((entry = readdir(store_dir)) != NULL) {
353       string n(entry->d_name);
354
355       if (n == "." || n == "..")
356         continue;
357
358       string fpath = path + '/' + n;
359       struct stat s;
360       int err = stat(fpath.c_str(), &s);
361       if (err < 0)
362         err = -errno;
363       // we may race against leveldb while reading files; this should only
364       // happen when those files are being updated, data is being shuffled
365       // and files get removed, in which case there's not much of a problem
366       // as we'll get to them next time around.
367       if (err == -ENOENT) {
368         continue;
369       }
370       if (err < 0) {
371         lderr(cct) << __func__ << " error obtaining stats for " << fpath
372                    << ": " << cpp_strerror(err) << dendl;
373         goto err;
374       }
375
376       size_t pos = n.find_last_of('.');
377       if (pos == string::npos) {
378         misc_size += s.st_size;
379         continue;
380       }
381
382       string ext = n.substr(pos+1);
383       if (ext == "sst") {
384         sst_size += s.st_size;
385       } else if (ext == "log") {
386         log_size += s.st_size;
387       } else {
388         misc_size += s.st_size;
389       }
390     }
391
392     total_size = sst_size + log_size + misc_size;
393
394     extra["sst"] = sst_size;
395     extra["log"] = log_size;
396     extra["misc"] = misc_size;
397     extra["total"] = total_size;
398
399 err:
400     closedir(store_dir);
401     return total_size;
402   }
403
404
405 protected:
406   WholeSpaceIterator _get_iterator() override {
407     return std::make_shared<LevelDBWholeSpaceIteratorImpl>(
408         db->NewIterator(leveldb::ReadOptions()));
409   }
410
411 };
412
413 #endif