1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef ROCKS_DB_STORE_H
4 #define ROCKS_DB_STORE_H
6 #include "include/types.h"
7 #include "include/buffer_fwd.h"
8 #include "KeyValueDB.h"
13 #include <boost/scoped_ptr.hpp>
14 #include "rocksdb/write_batch.h"
15 #include "rocksdb/perf_context.h"
16 #include "rocksdb/iostats_context.h"
17 #include "rocksdb/statistics.h"
18 #include "rocksdb/table.h"
20 #include "common/errno.h"
21 #include "common/dout.h"
22 #include "include/assert.h"
23 #include "common/Formatter.h"
24 #include "common/Cond.h"
26 #include "common/ceph_context.h"
// Counter index enumerators (fragment: the enum header and some enumerators
// fall on lines not visible in this view). Numbering starts at 34300.
// NOTE(review): presumably these index the PerfCounters object returned by
// get_perf_counters() -- confirm against the implementation file.
30 l_rocksdb_first = 34300,
34 l_rocksdb_get_latency,
35 l_rocksdb_submit_latency,
36 l_rocksdb_submit_sync_latency,
38 l_rocksdb_compact_range,
39 l_rocksdb_compact_queue_merge,
40 l_rocksdb_compact_queue_len,
41 l_rocksdb_write_wal_time,
42 l_rocksdb_write_memtable_time,
43 l_rocksdb_write_delay_time,
44 l_rocksdb_write_pre_and_post_process_time,
59 struct BlockBasedTableOptions;
62 extern rocksdb::Logger *create_rocksdb_ceph_logger();
65 * Uses RocksDB to implement the KeyValueDB interface
67 class RocksDBStore : public KeyValueDB {
74 std::shared_ptr<rocksdb::Statistics> dbstats;
75 rocksdb::BlockBasedTableOptions bbt_opts;
78 uint64_t cache_size = 0;
79 bool set_cache_flag = false;
81 int do_open(ostream &out, bool create_if_missing);
83 // manage async compactions
// The lock is constructed in the RocksDBStore constructor with the name
// "RocksDBStore::compact_thread_lock".
84 Mutex compact_queue_lock;
85 Cond compact_queue_cond;
// Pending work items, each a pair of strings.
// NOTE(review): presumably (start, end) keys queued by compact_range_async()
// and drained by compact_thread_entry(), guarded by compact_queue_lock --
// confirm in the implementation file.
86 list< pair<string,string> > compact_queue;
// Initialised to false by the constructor.
87 bool compact_queue_stop;
/// Thread subclass bound to one RocksDBStore: entry() forwards to
/// RocksDBStore::compact_thread_entry() on the store given at construction.
/// (Member declarations and closing lines of this class are not visible in
/// this view.)
88 class CompactThread : public Thread {
// Stores the owning store pointer in `db`.
91 explicit CompactThread(RocksDBStore *d) : db(d) {}
92 void *entry() override {
93 db->compact_thread_entry();
// RocksDBStore is granted access to this class's non-public members.
96 friend class RocksDBStore;
99 void compact_thread_entry();
101 void compact_range(const string& start, const string& end);
102 void compact_range_async(const string& start, const string& end);
105 /// compact the underlying rocksdb store
106 bool compact_on_mount;
109 void compact() override;
111 int tryInterpret(const string& key, const string& val, rocksdb::Options &opt);
112 int ParseOptionsFromString(const string& opt_str, rocksdb::Options &opt);
113 static int _test_init(const string& dir);
114 int init(string options_str) override;
115 /// compact rocksdb for all keys with a given prefix
// Delegates to the two-argument compact_range() with the bounds
// `prefix` and `past_prefix(prefix)`.
116 void compact_prefix(const string& prefix) override {
117 compact_range(prefix, past_prefix(prefix));
// Same bounds as compact_prefix(), but handed to compact_range_async()
// instead of compact_range().
119 void compact_prefix_async(const string& prefix) override {
120 compact_range_async(prefix, past_prefix(prefix));
// Prefix-scoped overload: joins `prefix` with `start` and with `end` via
// combine_strings() and forwards the two combined keys to the two-argument
// compact_range().
123 void compact_range(const string& prefix, const string& start, const string& end) override {
124 compact_range(combine_strings(prefix, start), combine_strings(prefix, end));
// Prefix-scoped overload: builds both bounds with combine_strings() and
// forwards them to the two-argument compact_range_async().
126 void compact_range_async(const string& prefix, const string& start, const string& end) override {
127 compact_range_async(combine_strings(prefix, start), combine_strings(prefix, end));
/// Constructor. Several initialisers and the body fall on lines not visible
/// in this view.
/// @param c    Ceph context (its use is on lines not visible here)
/// @param path store path (its use is on lines not visible here)
/// @param p    opaque pointer that is cast to rocksdb::Env* and stored in `env`
130 RocksDBStore(CephContext *c, const string &path, void *p) :
136 env(static_cast<rocksdb::Env*>(p)),
// NOTE(review): the lock is named "...compact_thread_lock" although the
// member is compact_queue_lock -- likely a leftover name; harmless, but
// confirm before renaming since the string may appear in lock diagnostics.
138 compact_queue_lock("RocksDBStore::compact_thread_lock"),
139 compact_queue_stop(false),
// The compaction thread object is constructed with a pointer to this store.
140 compact_thread(this),
141 compact_on_mount(false),
// Read from the rocksdb_enable_rmrange configuration value.
143 enable_rmrange(cct->_conf->rocksdb_enable_rmrange)
146 ~RocksDBStore() override;
148 static bool check_omap_dir(string &omap_dir);
149 /// Opens underlying db
// Calls do_open() with create_if_missing = false and returns its result.
150 int open(ostream &out) override {
151 return do_open(out, false);
153 /// Creates underlying db if missing and opens it
154 int create_and_open(ostream &out) override;
156 void close() override;
158 void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
159 void get_statistics(Formatter *f) override;
161 PerfCounters *get_perf_counters() override
166 struct RocksWBHandler: public rocksdb::WriteBatch::Handler {
/// Builds a printable rendering of `in`: runs of printable bytes (32..126)
/// are copied verbatim via substr(), other bytes are formatted as hex with
/// snprintf(). Declarations of `out` and `buf`, the mode transitions and the
/// return statement fall on lines not visible in this view.
169 static string pretty_binary_string(const string& in) {
// Worst-case style reservation: three output chars per input byte.
172 out.reserve(in.length() * 3);
// `mode` tracks which kind of run is currently being emitted; `from` marks
// the start of the current verbatim run.
173 enum { NONE, HEX, STRING } mode = NONE;
174 unsigned from = 0, i;
175 for (i=0; i < in.length(); ++i) {
// Take the hex branch when the current byte is non-printable, OR when we are
// already in HEX mode, at least 4 bytes remain, and any of those 4 bytes is
// non-printable. The `in.length() - i >= 4` guard is what makes the
// in[i+1..i+3] reads safe.
// NOTE(review): the in[i] test on the third line of this condition repeats
// the first clause and can never change the result.
176 if ((in[i] < 32 || (unsigned char)in[i] > 126) ||
177 (mode == HEX && in.length() - i >= 4 &&
178 ((in[i] < 32 || (unsigned char)in[i] > 126) ||
179 (in[i+1] < 32 || (unsigned char)in[i+1] > 126) ||
180 (in[i+2] < 32 || (unsigned char)in[i+2] > 126) ||
181 (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) {
// Flush the pending verbatim run before emitting hex.
183 if (mode == STRING) {
184 out.append(in.substr(from, i - from));
191 if (in.length() - i >= 4) {
192 // print a whole u32 at once
// Bytes are combined most-significant first (in[i] ends up in bits 24..31).
// NOTE(review): each unsigned char is promoted to int before the shift, so
// `<< 24` of a byte >= 0x80 shifts into the sign bit of a signed int --
// consider casting to uint32_t before shifting.
193 snprintf(buf, sizeof(buf), "%08x",
194 (uint32_t)(((unsigned char)in[i] << 24) |
195 ((unsigned char)in[i+1] << 16) |
196 ((unsigned char)in[i+2] << 8) |
197 ((unsigned char)in[i+3] << 0)));
// Fewer than 4 bytes left: format a single byte as two hex digits.
200 snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
204 if (mode != STRING) {
// After the loop: flush a trailing verbatim run, if any.
211 if (mode == STRING) {
212 out.append(in.substr(from, i - from));
/// Appends a human-readable "Put(...)" record for this key/value to `seen`:
/// the first key byte as the prefix, the key from offset 2 onward rendered
/// through pretty_binary_string(), and the value's size.
/// NOTE(review): presumably keys are laid out as <1-char prefix><separator>
/// <key>, which is why offset 1 is skipped -- confirm against combine_strings.
217 void Put(const rocksdb::Slice& key,
218 const rocksdb::Slice& value) override {
219 string prefix ((key.ToString()).substr(0,1));
// NOTE(review): std::string::substr(pos) throws std::out_of_range when
// pos > size(), so a key shorter than 2 bytes would throw here. Also,
// key.ToString() is called twice, each producing a fresh copy.
220 string key_to_decode ((key.ToString()).substr(2,string::npos));
// NOTE(review): this copies the whole value into a std::string only to read
// its length; rocksdb::Slice appears to expose size() directly -- confirm
// and prefer value.size().
221 uint64_t size = (value.ToString()).size();
222 seen += "\nPut( Prefix = " + prefix + " key = "
223 + pretty_binary_string(key_to_decode)
224 + " Value size = " + std::to_string(size) + ")";
/// Appends a human-readable "SingleDelete(...)" record to `seen`: first key
/// byte as the prefix, key from offset 2 onward via pretty_binary_string().
227 void SingleDelete(const rocksdb::Slice& key) override {
228 string prefix ((key.ToString()).substr(0,1));
// NOTE(review): substr(2, npos) throws std::out_of_range for keys shorter
// than 2 bytes; key.ToString() is also evaluated twice.
229 string key_to_decode ((key.ToString()).substr(2,string::npos));
230 seen += "\nSingleDelete(Prefix = "+ prefix + " Key = "
231 + pretty_binary_string(key_to_decode) + ")";
/// Appends a human-readable "Delete(...)" record to `seen`: first key byte as
/// the prefix, key from offset 2 onward via pretty_binary_string().
234 void Delete(const rocksdb::Slice& key) override {
235 string prefix ((key.ToString()).substr(0,1));
// NOTE(review): substr(2, npos) throws std::out_of_range for keys shorter
// than 2 bytes; key.ToString() is also evaluated twice.
236 string key_to_decode ((key.ToString()).substr(2,string::npos));
237 seen += "\nDelete( Prefix = " + prefix + " key = "
238 + pretty_binary_string(key_to_decode) + ")";
/// Appends a human-readable "Merge(...)" record to `seen`: first key byte as
/// the prefix, key from offset 2 onward via pretty_binary_string(), and the
/// value's size.
242 void Merge(const rocksdb::Slice& key,
243 const rocksdb::Slice& value) override {
244 string prefix ((key.ToString()).substr(0,1));
// NOTE(review): substr(2, npos) throws std::out_of_range for keys shorter
// than 2 bytes; key.ToString() is also evaluated twice.
245 string key_to_decode ((key.ToString()).substr(2,string::npos));
// NOTE(review): copies the whole value just to obtain its length; confirm
// that rocksdb::Slice::size() can be used instead.
246 uint64_t size = (value.ToString()).size();
247 seen += "\nMerge( Prefix = " + prefix + " key = "
248 + pretty_binary_string(key_to_decode) + " Value size = "
249 + std::to_string(size) + ")";
// Returns true only while fewer than 50 operations have been counted in
// num_seen.
// NOTE(review): presumably this is what makes the batch iteration stop after
// 50 records -- confirm against rocksdb::WriteBatch::Handler::Continue().
253 bool Continue() override { return num_seen < 50; }
/// KeyValueDB::TransactionImpl subclass holding a rocksdb::WriteBatch (`bat`).
/// Only fragments of the member declarations are visible in this view: most
/// method names, some parameters and the closing of the class fall on lines
/// not shown, so the individual overrides are not described here.
258 class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
260 rocksdb::WriteBatch bat;
// Constructed from a pointer to the owning store.
263 explicit RocksDBTransactionImpl(RocksDBStore *_db);
265 const string &prefix,
267 const bufferlist &bl) override;
269 const string &prefix,
272 const bufferlist &bl) override;
274 const string &prefix,
275 const string &k) override;
277 const string &prefix,
279 size_t keylen) override;
281 const string &prefix,
282 const string &k) override;
283 void rmkeys_by_prefix(
287 const string &prefix,
289 const string &end) override;
291 const string& prefix,
293 const bufferlist &bl) override;
// Returns a new shared RocksDBTransactionImpl constructed with this store.
296 KeyValueDB::Transaction get_transaction() override {
297 return std::make_shared<RocksDBTransactionImpl>(this);
300 int submit_transaction(KeyValueDB::Transaction t) override;
301 int submit_transaction_sync(KeyValueDB::Transaction t) override;
303 const string &prefix,
304 const std::set<string> &key,
305 std::map<string, bufferlist> *out
308 const string &prefix,
313 const string &prefix,
316 bufferlist *out) override;
/// KeyValueDB::WholeSpaceIteratorImpl subclass built around a raw
/// rocksdb::Iterator pointer (`dbiter`). All members below are declarations
/// only; their definitions are outside this view, as are a few lines of the
/// class itself.
319 class RocksDBWholeSpaceIteratorImpl :
320 public KeyValueDB::WholeSpaceIteratorImpl {
// NOTE(review): raw pointer member -- ownership is not visible here;
// presumably released by the out-of-line destructor. Confirm.
322 rocksdb::Iterator *dbiter;
324 explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) :
326 //virtual ~RocksDBWholeSpaceIteratorImpl() { }
327 ~RocksDBWholeSpaceIteratorImpl() override;
// Positioning operations, each returning an int.
329 int seek_to_first() override;
330 int seek_to_first(const string &prefix) override;
331 int seek_to_last() override;
332 int seek_to_last(const string &prefix) override;
333 int upper_bound(const string &prefix, const string &after) override;
334 int lower_bound(const string &prefix, const string &to) override;
335 bool valid() override;
// Accessors for the entry at the current position.
338 string key() override;
339 pair<string,string> raw_key() override;
340 bool raw_key_is_prefixed(const string &prefix) override;
341 bufferlist value() override;
342 bufferptr value_as_ptr() override;
343 int status() override;
344 size_t key_size() override;
345 size_t value_size() override;
349 static string combine_strings(const string &prefix, const string &value) {
/// Buffer-based overload that writes the combined key into `*out`.
/// The output parameter declaration and the statements that add the prefix
/// fall on lines not visible in this view.
355 static void combine_strings(const string &prefix,
356 const char *key, size_t keylen,
// Reserve room for the prefix, one extra byte, and the key.
// NOTE(review): the extra byte is presumably a separator between prefix and
// key -- confirm against the elided lines.
358 out->reserve(prefix.size() + 1 + keylen);
// The raw key bytes are appended with an explicit length.
361 out->append(key, keylen);
364 static int split_key(rocksdb::Slice in, string *prefix, string *key);
// Appends a bufferptr built from the slice's data pointer and size to a
// bufferlist `bl` (its declaration and the return are on lines not visible
// in this view).
366 static bufferlist to_bufferlist(rocksdb::Slice in) {
368 bl.append(bufferptr(in.data(), in.size()));
372 static string past_prefix(const string &prefix);
374 class MergeOperatorRouter;
375 friend class MergeOperatorRouter;
376 int set_merge_operator(const std::string& prefix,
377 std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
378 string assoc_name; ///< Name of associative operator
/// Walks the directory at `path`, stat()s every entry except "." and "..",
/// and accumulates st_size into sst/log/misc buckets. The buckets and their
/// sum are stored in `extra` under the keys "sst", "log", "misc" and "total".
/// Error-path control flow and the return statement fall on lines not visible
/// in this view.
380 uint64_t get_estimated_size(map<string,uint64_t> &extra) override {
// NOTE(review): no closedir() is visible in this view -- confirm the handle
// is closed on every exit path.
381 DIR *store_dir = opendir(path.c_str());
// Logged with the errno left by the failed opendir().
383 lderr(cct) << __func__ << " something happened opening the store: "
384 << cpp_strerror(errno) << dendl;
388 uint64_t total_size = 0;
389 uint64_t sst_size = 0;
390 uint64_t log_size = 0;
391 uint64_t misc_size = 0;
393 struct dirent *entry = NULL;
394 while ((entry = readdir(store_dir)) != NULL) {
395 string n(entry->d_name);
// Skip the self and parent directory entries.
397 if (n == "." || n == "..")
400 string fpath = path + '/' + n;
402 int err = stat(fpath.c_str(), &s);
405 // we may race against rocksdb while reading files; this should only
406 // happen when those files are being updated, data is being shuffled
407 // and files get removed, in which case there's not much of a problem
408 // as we'll get to them next time around.
// NOTE(review): stat() itself returns -1 on failure, never -ENOENT, so this
// comparison relies on `err` being replaced by -errno on lines not visible
// here -- confirm.
409 if (err == -ENOENT) {
413 lderr(cct) << __func__ << " error obtaining stats for " << fpath
414 << ": " << cpp_strerror(err) << dendl;
// Classify by the text after the last '.'; names without any '.' count as
// misc.
418 size_t pos = n.find_last_of('.');
419 if (pos == string::npos) {
420 misc_size += s.st_size;
424 string ext = n.substr(pos+1);
// The condition selecting the sst bucket is on a line not visible here.
426 sst_size += s.st_size;
427 } else if (ext == "log") {
428 log_size += s.st_size;
430 misc_size += s.st_size;
434 total_size = sst_size + log_size + misc_size;
436 extra["sst"] = sst_size;
437 extra["log"] = log_size;
438 extra["misc"] = misc_size;
439 extra["total"] = total_size;
// Marks that a cache size has been explicitly requested by setting
// set_cache_flag. What is done with `s` and the return value are on lines
// not visible in this view (presumably `s` is stored in cache_size --
// confirm).
446 int set_cache_size(uint64_t s) override {
448 set_cache_flag = true;
453 WholeSpaceIterator _get_iterator() override;