Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / kv / RocksDBStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <set>
5 #include <map>
6 #include <string>
7 #include <memory>
8 #include <errno.h>
9 #include <unistd.h>
10 #include <sys/types.h>
11 #include <sys/stat.h>
12
13 #include "rocksdb/db.h"
14 #include "rocksdb/table.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/cache.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/utilities/convenience.h"
20 #include "rocksdb/merge_operator.h"
21 using std::string;
22 #include "common/perf_counters.h"
23 #include "common/debug.h"
24 #include "include/str_list.h"
25 #include "include/stringify.h"
26 #include "include/str_map.h"
27 #include "KeyValueDB.h"
28 #include "RocksDBStore.h"
29
30 #include "common/debug.h"
31
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_rocksdb
34 #undef dout_prefix
35 #define dout_prefix *_dout << "rocksdb: "
36
37 static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
38                                               vector<rocksdb::Slice> *slices)
39 {
40   unsigned n = 0;
41   for (auto& buf : bl.buffers()) {
42     (*slices)[n].data_ = buf.c_str();
43     (*slices)[n].size_ = buf.length();
44     n++;
45   }
46   return rocksdb::SliceParts(slices->data(), slices->size());
47 }
48
49 //
50 // One of these per rocksdb instance, implements the merge operator prefix stuff
51 //
52 class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
53   RocksDBStore& store;
54   public:
55   const char *Name() const override {
56     // Construct a name that rocksDB will validate against. We want to
57     // do this in a way that doesn't constrain the ordering of calls
58     // to set_merge_operator, so sort the merge operators and then
59     // construct a name from all of those parts.
60     store.assoc_name.clear();
61     map<std::string,std::string> names;
62     for (auto& p : store.merge_ops) names[p.first] = p.second->name();
63     for (auto& p : names) {
64       store.assoc_name += '.';
65       store.assoc_name += p.first;
66       store.assoc_name += ':';
67       store.assoc_name += p.second;
68     }
69     return store.assoc_name.c_str();
70   }
71
72   MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
73
74   bool Merge(const rocksdb::Slice& key,
75                      const rocksdb::Slice* existing_value,
76                      const rocksdb::Slice& value,
77                      std::string* new_value,
78                      rocksdb::Logger* logger) const override {
79     // Check each prefix
80     for (auto& p : store.merge_ops) {
81       if (p.first.compare(0, p.first.length(),
82                           key.data(), p.first.length()) == 0 &&
83           key.data()[p.first.length()] == 0) {
84         if (existing_value) {
85           p.second->merge(existing_value->data(), existing_value->size(),
86                           value.data(), value.size(),
87                           new_value);
88         } else {
89           p.second->merge_nonexistent(value.data(), value.size(), new_value);
90         }
91         break;
92       }
93     }
94     return true; // OK :)
95   }
96
97 };
98
99 int RocksDBStore::set_merge_operator(
100   const string& prefix,
101   std::shared_ptr<KeyValueDB::MergeOperator> mop)
102 {
103   // If you fail here, it's because you can't do this on an open database
104   assert(db == nullptr);
105   merge_ops.push_back(std::make_pair(prefix,mop));
106   return 0;
107 }
108
109 class CephRocksdbLogger : public rocksdb::Logger {
110   CephContext *cct;
111 public:
112   explicit CephRocksdbLogger(CephContext *c) : cct(c) {
113     cct->get();
114   }
115   ~CephRocksdbLogger() override {
116     cct->put();
117   }
118
119   // Write an entry to the log file with the specified format.
120   void Logv(const char* format, va_list ap) override {
121     Logv(rocksdb::INFO_LEVEL, format, ap);
122   }
123
124   // Write an entry to the log file with the specified log level
125   // and format.  Any log with level under the internal log level
126   // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
127   // printed.
128   void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
129             va_list ap) override {
130     int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
131     dout(v);
132     char buf[65536];
133     vsnprintf(buf, sizeof(buf), format, ap);
134     *_dout << buf << dendl;
135   }
136 };
137
138 rocksdb::Logger *create_rocksdb_ceph_logger()
139 {
140   return new CephRocksdbLogger(g_ceph_context);
141 }
142
143 static int string2bool(const string &val, bool &b_val)
144 {
145   if (strcasecmp(val.c_str(), "false") == 0) {
146     b_val = false;
147     return 0;
148   } else if (strcasecmp(val.c_str(), "true") == 0) {
149     b_val = true;
150     return 0;
151   } else {
152     std::string err;
153     int b = strict_strtol(val.c_str(), 10, &err);
154     if (!err.empty())
155       return -EINVAL;
156     b_val = !!b;
157     return 0;
158   }
159 }
160   
161 int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
162 {
163   if (key == "compaction_threads") {
164     std::string err;
165     int f = strict_sistrtoll(val.c_str(), &err);
166     if (!err.empty())
167       return -EINVAL;
168     //Low priority threadpool is used for compaction
169     opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
170   } else if (key == "flusher_threads") {
171     std::string err;
172     int f = strict_sistrtoll(val.c_str(), &err);
173     if (!err.empty())
174       return -EINVAL;
175     //High priority threadpool is used for flusher
176     opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
177   } else if (key == "compact_on_mount") {
178     int ret = string2bool(val, compact_on_mount);
179     if (ret != 0)
180       return ret;
181   } else if (key == "disableWAL") {
182     int ret = string2bool(val, disableWAL);
183     if (ret != 0)
184       return ret;
185   } else {
186     //unrecognize config options.
187     return -EINVAL;
188   }
189   return 0;
190 }
191
192 int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt)
193 {
194   map<string, string> str_map;
195   int r = get_str_map(opt_str, &str_map, ",\n;");
196   if (r < 0)
197     return r;
198   map<string, string>::iterator it;
199   for(it = str_map.begin(); it != str_map.end(); ++it) {
200     string this_opt = it->first + "=" + it->second;
201     rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt); 
202     if (!status.ok()) {
203       //unrecognized by rocksdb, try to interpret by ourselves.
204       r = tryInterpret(it->first, it->second, opt);
205       if (r < 0) {
206         derr << status.ToString() << dendl;
207         return -EINVAL;
208       }
209     }
210     lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
211                           << " = " << it->second << dendl;
212   }
213   return 0;
214 }
215
216 int RocksDBStore::init(string _options_str)
217 {
218   options_str = _options_str;
219   rocksdb::Options opt;
220   //try parse options
221   if (options_str.length()) {
222     int r = ParseOptionsFromString(options_str, opt);
223     if (r != 0) {
224       return -EINVAL;
225     }
226   }
227   return 0;
228 }
229
230 int RocksDBStore::create_and_open(ostream &out)
231 {
232   if (env) {
233     unique_ptr<rocksdb::Directory> dir;
234     env->NewDirectory(path, &dir);
235   } else {
236     int r = ::mkdir(path.c_str(), 0755);
237     if (r < 0)
238       r = -errno;
239     if (r < 0 && r != -EEXIST) {
240       derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r)
241            << dendl;
242       return r;
243     }
244   }
245   return do_open(out, true);
246 }
247
248 int RocksDBStore::do_open(ostream &out, bool create_if_missing)
249 {
250   rocksdb::Options opt;
251   rocksdb::Status status;
252
253   if (options_str.length()) {
254     int r = ParseOptionsFromString(options_str, opt);
255     if (r != 0) {
256       return -EINVAL;
257     }
258   }
259
260   if (g_conf->rocksdb_perf)  {
261     dbstats = rocksdb::CreateDBStatistics();
262     opt.statistics = dbstats;
263   }
264
265   opt.create_if_missing = create_if_missing;
266   if (g_conf->rocksdb_separate_wal_dir) {
267     opt.wal_dir = path + ".wal";
268   }
269   if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) {
270     list<string> paths;
271     get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths);
272     for (auto& p : paths) {
273       size_t pos = p.find(',');
274       if (pos == std::string::npos) {
275         derr << __func__ << " invalid db path item " << p << " in "
276              << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
277         return -EINVAL;
278       }
279       string path = p.substr(0, pos);
280       string size_str = p.substr(pos + 1);
281       uint64_t size = atoll(size_str.c_str());
282       if (!size) {
283         derr << __func__ << " invalid db path item " << p << " in "
284              << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
285         return -EINVAL;
286       }
287       opt.db_paths.push_back(rocksdb::DbPath(path, size));
288       dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
289     }
290   }
291
292   if (g_conf->rocksdb_log_to_ceph_log) {
293     opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
294   }
295
296   if (priv) {
297     dout(10) << __func__ << " using custom Env " << priv << dendl;
298     opt.env = static_cast<rocksdb::Env*>(priv);
299   }
300
301   // caches
302   if (!set_cache_flag) {
303     cache_size = g_conf->rocksdb_cache_size;
304   }
305   uint64_t row_cache_size = cache_size * g_conf->rocksdb_cache_row_ratio;
306   uint64_t block_cache_size = cache_size - row_cache_size;
307
308   if (block_cache_size == 0) {
309     // disable block cache
310     dout(10) << __func__ << " block_cache_size " << block_cache_size
311              << ", setting no_block_cache " << dendl;
312     bbt_opts.no_block_cache = true;
313   } else {
314     if (g_conf->rocksdb_cache_type == "lru") {
315       bbt_opts.block_cache = rocksdb::NewLRUCache(
316         block_cache_size,
317         g_conf->rocksdb_cache_shard_bits);
318     } else if (g_conf->rocksdb_cache_type == "clock") {
319       bbt_opts.block_cache = rocksdb::NewClockCache(
320         block_cache_size,
321         g_conf->rocksdb_cache_shard_bits);
322     } else {
323       derr << "unrecognized rocksdb_cache_type '" << g_conf->rocksdb_cache_type
324         << "'" << dendl;
325       return -EINVAL;
326     }
327   }
328   bbt_opts.block_size = g_conf->rocksdb_block_size;
329
330   if (row_cache_size > 0)
331     opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
332                                      g_conf->rocksdb_cache_shard_bits);
333   uint64_t bloom_bits = g_conf->get_val<uint64_t>("rocksdb_bloom_bits_per_key");
334   if (bloom_bits > 0) {
335     dout(10) << __func__ << " set bloom filter bits per key to "
336              << bloom_bits << dendl;
337     bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
338   }
339   if (g_conf->get_val<std::string>("rocksdb_index_type") == "binary_search")
340     bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
341   if (g_conf->get_val<std::string>("rocksdb_index_type") == "hash_search")
342     bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
343   if (g_conf->get_val<std::string>("rocksdb_index_type") == "two_level")
344     bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
345   bbt_opts.cache_index_and_filter_blocks = 
346       g_conf->get_val<bool>("rocksdb_cache_index_and_filter_blocks");
347   bbt_opts.cache_index_and_filter_blocks_with_high_priority = 
348       g_conf->get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
349   bbt_opts.partition_filters = g_conf->get_val<bool>("rocksdb_partition_filters");
350   if (g_conf->get_val<uint64_t>("rocksdb_metadata_block_size") > 0)
351     bbt_opts.metadata_block_size = g_conf->get_val<uint64_t>("rocksdb_metadata_block_size");
352   bbt_opts.pin_l0_filter_and_index_blocks_in_cache = 
353       g_conf->get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
354
355   opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
356   dout(10) << __func__ << " block size " << g_conf->rocksdb_block_size
357            << ", block_cache size " << prettybyte_t(block_cache_size)
358            << ", row_cache size " << prettybyte_t(row_cache_size)
359            << "; shards "
360            << (1 << g_conf->rocksdb_cache_shard_bits)
361            << ", type " << g_conf->rocksdb_cache_type
362            << dendl;
363
364   opt.merge_operator.reset(new MergeOperatorRouter(*this));
365   status = rocksdb::DB::Open(opt, path, &db);
366   if (!status.ok()) {
367     derr << status.ToString() << dendl;
368     return -EINVAL;
369   }
370   
371   PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
372   plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
373   plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
374   plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
375   plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
376   plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
377   plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
378   plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
379   plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
380   plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
381   plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
382   plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
383   plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
384   plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
385   plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time, 
386       "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
387   logger = plb.create_perf_counters();
388   cct->get_perfcounters_collection()->add(logger);
389
390   if (compact_on_mount) {
391     derr << "Compacting rocksdb store..." << dendl;
392     compact();
393     derr << "Finished compacting rocksdb store" << dendl;
394   }
395   return 0;
396 }
397
398 int RocksDBStore::_test_init(const string& dir)
399 {
400   rocksdb::Options options;
401   options.create_if_missing = true;
402   rocksdb::DB *db;
403   rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
404   delete db;
405   db = nullptr;
406   return status.ok() ? 0 : -EIO;
407 }
408
409 RocksDBStore::~RocksDBStore()
410 {
411   close();
412   delete logger;
413
414   // Ensure db is destroyed before dependent db_cache and filterpolicy
415   delete db;
416   db = nullptr;
417
418   if (priv) {
419     delete static_cast<rocksdb::Env*>(priv);
420   }
421 }
422
423 void RocksDBStore::close()
424 {
425   // stop compaction thread
426   compact_queue_lock.Lock();
427   if (compact_thread.is_started()) {
428     compact_queue_stop = true;
429     compact_queue_cond.Signal();
430     compact_queue_lock.Unlock();
431     compact_thread.join();
432   } else {
433     compact_queue_lock.Unlock();
434   }
435
436   if (logger)
437     cct->get_perfcounters_collection()->remove(logger);
438 }
439
440 void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
441     std::stringstream ss;
442     ss.str(s);
443     std::string item;
444     while (std::getline(ss, item, delim)) {
445         elems.push_back(item);
446     }
447 }
448
449 void RocksDBStore::get_statistics(Formatter *f)
450 {
451   if (!g_conf->rocksdb_perf)  {
452     dout(20) << __func__ << "RocksDB perf is disabled, can't probe for stats"
453              << dendl;
454     return;
455   }
456
457   if (g_conf->rocksdb_collect_compaction_stats) {
458     std::string stat_str;
459     bool status = db->GetProperty("rocksdb.stats", &stat_str);
460     if (status) {
461       f->open_object_section("rocksdb_statistics");
462       f->dump_string("rocksdb_compaction_statistics", "");
463       vector<string> stats;
464       split_stats(stat_str, '\n', stats);
465       for (auto st :stats) {
466         f->dump_string("", st);
467       }
468       f->close_section();
469     }
470   }
471   if (g_conf->rocksdb_collect_extended_stats) {
472     if (dbstats) {
473       f->open_object_section("rocksdb_extended_statistics");
474       string stat_str = dbstats->ToString();
475       vector<string> stats;
476       split_stats(stat_str, '\n', stats);
477       f->dump_string("rocksdb_extended_statistics", "");
478       for (auto st :stats) {
479         f->dump_string(".", st);
480       }
481       f->close_section();
482     }
483     f->open_object_section("rocksdbstore_perf_counters");
484     logger->dump_formatted(f,0);
485     f->close_section();
486   }
487   if (g_conf->rocksdb_collect_memory_stats) {
488     f->open_object_section("rocksdb_memtable_statistics");
489     std::string str(stringify(bbt_opts.block_cache->GetUsage()));
490     f->dump_string("block_cache_usage", str.data());
491     str.clear();
492     str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
493     f->dump_string("block_cache_pinned_blocks_usage", str);
494     str.clear();
495     db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
496     f->dump_string("rocksdb_memtable_usage", str);
497     f->close_section();
498   }
499 }
500
501 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
502 {
503   utime_t start = ceph_clock_now();
504   // enable rocksdb breakdown
505   // considering performance overhead, default is disabled
506   if (g_conf->rocksdb_perf) {
507     rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
508     rocksdb::perf_context.Reset();
509   }
510
511   RocksDBTransactionImpl * _t =
512     static_cast<RocksDBTransactionImpl *>(t.get());
513   rocksdb::WriteOptions woptions;
514   woptions.disableWAL = disableWAL;
515   lgeneric_subdout(cct, rocksdb, 30) << __func__;
516   RocksWBHandler bat_txc;
517   _t->bat.Iterate(&bat_txc);
518   *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
519   
520   rocksdb::Status s = db->Write(woptions, &_t->bat);
521   if (!s.ok()) {
522     RocksWBHandler rocks_txc;
523     _t->bat.Iterate(&rocks_txc);
524     derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
525          << " Rocksdb transaction: " << rocks_txc.seen << dendl;
526   }
527   utime_t lat = ceph_clock_now() - start;
528
529   if (g_conf->rocksdb_perf) {
530     utime_t write_memtable_time;
531     utime_t write_delay_time;
532     utime_t write_wal_time;
533     utime_t write_pre_and_post_process_time;
534     write_wal_time.set_from_double(
535         static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
536     write_memtable_time.set_from_double(
537         static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
538     write_delay_time.set_from_double(
539         static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
540     write_pre_and_post_process_time.set_from_double(
541         static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
542     logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
543     logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
544     logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
545     logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
546   }
547
548   logger->inc(l_rocksdb_txns);
549   logger->tinc(l_rocksdb_submit_latency, lat);
550
551   return s.ok() ? 0 : -1;
552 }
553
554 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
555 {
556   utime_t start = ceph_clock_now();
557   // enable rocksdb breakdown
558   // considering performance overhead, default is disabled
559   if (g_conf->rocksdb_perf) {
560     rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
561     rocksdb::perf_context.Reset();
562   }
563
564   RocksDBTransactionImpl * _t =
565     static_cast<RocksDBTransactionImpl *>(t.get());
566   rocksdb::WriteOptions woptions;
567   woptions.sync = true;
568   woptions.disableWAL = disableWAL;
569   lgeneric_subdout(cct, rocksdb, 30) << __func__;
570   RocksWBHandler bat_txc;
571   _t->bat.Iterate(&bat_txc);
572   *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
573
574   rocksdb::Status s = db->Write(woptions, &_t->bat);
575   if (!s.ok()) {
576     RocksWBHandler rocks_txc;
577     _t->bat.Iterate(&rocks_txc);
578     derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
579          << " Rocksdb transaction: " << rocks_txc.seen << dendl;
580   }
581   utime_t lat = ceph_clock_now() - start;
582
583   if (g_conf->rocksdb_perf) {
584     utime_t write_memtable_time;
585     utime_t write_delay_time;
586     utime_t write_wal_time;
587     utime_t write_pre_and_post_process_time;
588     write_wal_time.set_from_double(
589         static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
590     write_memtable_time.set_from_double(
591         static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
592     write_delay_time.set_from_double(
593         static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
594     write_pre_and_post_process_time.set_from_double(
595         static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
596     logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
597     logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
598     logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
599     logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
600   }
601
602   logger->inc(l_rocksdb_txns_sync);
603   logger->tinc(l_rocksdb_submit_sync_latency, lat);
604
605   return s.ok() ? 0 : -1;
606 }
607
608 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
609 {
610   db = _db;
611 }
612
613 void RocksDBStore::RocksDBTransactionImpl::set(
614   const string &prefix,
615   const string &k,
616   const bufferlist &to_set_bl)
617 {
618   string key = combine_strings(prefix, k);
619
620   // bufferlist::c_str() is non-constant, so we can't call c_str()
621   if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
622     bat.Put(rocksdb::Slice(key),
623              rocksdb::Slice(to_set_bl.buffers().front().c_str(),
624                             to_set_bl.length()));
625   } else {
626     rocksdb::Slice key_slice(key);
627     vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
628     bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
629             prepare_sliceparts(to_set_bl, &value_slices));
630   }
631 }
632
633 void RocksDBStore::RocksDBTransactionImpl::set(
634   const string &prefix,
635   const char *k, size_t keylen,
636   const bufferlist &to_set_bl)
637 {
638   string key;
639   combine_strings(prefix, k, keylen, &key);
640
641   // bufferlist::c_str() is non-constant, so we can't call c_str()
642   if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
643     bat.Put(rocksdb::Slice(key),
644              rocksdb::Slice(to_set_bl.buffers().front().c_str(),
645                             to_set_bl.length()));
646   } else {
647     rocksdb::Slice key_slice(key);
648     vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
649     bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
650             prepare_sliceparts(to_set_bl, &value_slices));
651   }
652 }
653
654 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
655                                                  const string &k)
656 {
657   bat.Delete(combine_strings(prefix, k));
658 }
659
660 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
661                                                  const char *k,
662                                                  size_t keylen)
663 {
664   string key;
665   combine_strings(prefix, k, keylen, &key);
666   bat.Delete(key);
667 }
668
669 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
670                                                          const string &k)
671 {
672   bat.SingleDelete(combine_strings(prefix, k));
673 }
674
675 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
676 {
677   if (db->enable_rmrange) {
678     string endprefix = prefix;
679     endprefix.push_back('\x01');
680     bat.DeleteRange(combine_strings(prefix, string()),
681                     combine_strings(endprefix, string()));
682   } else {
683     KeyValueDB::Iterator it = db->get_iterator(prefix);
684     for (it->seek_to_first();
685          it->valid();
686          it->next()) {
687       bat.Delete(combine_strings(prefix, it->key()));
688     }
689   }
690 }
691
692 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
693                                                          const string &start,
694                                                          const string &end)
695 {
696   if (db->enable_rmrange) {
697     bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end));
698   } else {
699     auto it = db->get_iterator(prefix);
700     it->lower_bound(start);
701     while (it->valid()) {
702       if (it->key() >= end) {
703         break;
704       }
705       bat.Delete(combine_strings(prefix, it->key()));
706       it->next();
707     }
708   }
709 }
710
711 void RocksDBStore::RocksDBTransactionImpl::merge(
712   const string &prefix,
713   const string &k,
714   const bufferlist &to_set_bl)
715 {
716   string key = combine_strings(prefix, k);
717
718   // bufferlist::c_str() is non-constant, so we can't call c_str()
719   if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
720     bat.Merge(rocksdb::Slice(key),
721                rocksdb::Slice(to_set_bl.buffers().front().c_str(),
722                             to_set_bl.length()));
723   } else {
724     // make a copy
725     rocksdb::Slice key_slice(key);
726     vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
727     bat.Merge(nullptr, rocksdb::SliceParts(&key_slice, 1),
728               prepare_sliceparts(to_set_bl, &value_slices));
729   }
730 }
731
732 //gets will bypass RocksDB row cache, since it uses iterator
733 int RocksDBStore::get(
734     const string &prefix,
735     const std::set<string> &keys,
736     std::map<string, bufferlist> *out)
737 {
738   utime_t start = ceph_clock_now();
739   for (std::set<string>::const_iterator i = keys.begin();
740        i != keys.end(); ++i) {
741     std::string value;
742     std::string bound = combine_strings(prefix, *i);
743     auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value);
744     if (status.ok()) {
745       (*out)[*i].append(value);
746     } else if (status.IsIOError()) {
747       ceph_abort_msg(cct, status.ToString());
748     }
749
750   }
751   utime_t lat = ceph_clock_now() - start;
752   logger->inc(l_rocksdb_gets);
753   logger->tinc(l_rocksdb_get_latency, lat);
754   return 0;
755 }
756
757 int RocksDBStore::get(
758     const string &prefix,
759     const string &key,
760     bufferlist *out)
761 {
762   assert(out && (out->length() == 0));
763   utime_t start = ceph_clock_now();
764   int r = 0;
765   string value, k;
766   rocksdb::Status s;
767   k = combine_strings(prefix, key);
768   s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
769   if (s.ok()) {
770     out->append(value);
771   } else if (s.IsNotFound()) {
772     r = -ENOENT;
773   } else {
774     ceph_abort_msg(cct, s.ToString());
775   }
776   utime_t lat = ceph_clock_now() - start;
777   logger->inc(l_rocksdb_gets);
778   logger->tinc(l_rocksdb_get_latency, lat);
779   return r;
780 }
781
782 int RocksDBStore::get(
783   const string& prefix,
784   const char *key,
785   size_t keylen,
786   bufferlist *out)
787 {
788   assert(out && (out->length() == 0));
789   utime_t start = ceph_clock_now();
790   int r = 0;
791   string value, k;
792   combine_strings(prefix, key, keylen, &k);
793   rocksdb::Status s;
794   s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
795   if (s.ok()) {
796     out->append(value);
797   } else if (s.IsNotFound()) {
798     r = -ENOENT;
799   } else {
800     ceph_abort_msg(cct, s.ToString());
801   }
802   utime_t lat = ceph_clock_now() - start;
803   logger->inc(l_rocksdb_gets);
804   logger->tinc(l_rocksdb_get_latency, lat);
805   return r;
806 }
807
808 int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
809 {
810   size_t prefix_len = 0;
811   
812   // Find separator inside Slice
813   char* separator = (char*) memchr(in.data(), 0, in.size());
814   if (separator == NULL)
815      return -EINVAL;
816   prefix_len = size_t(separator - in.data());
817   if (prefix_len >= in.size())
818     return -EINVAL;
819
820   // Fetch prefix and/or key directly from Slice
821   if (prefix)
822     *prefix = string(in.data(), prefix_len);
823   if (key)
824     *key = string(separator+1, in.size()-prefix_len-1);
825   return 0;
826 }
827
828 void RocksDBStore::compact()
829 {
830   logger->inc(l_rocksdb_compact);
831   rocksdb::CompactRangeOptions options;
832   db->CompactRange(options, nullptr, nullptr);
833 }
834
835
836 void RocksDBStore::compact_thread_entry()
837 {
838   compact_queue_lock.Lock();
839   while (!compact_queue_stop) {
840     while (!compact_queue.empty()) {
841       pair<string,string> range = compact_queue.front();
842       compact_queue.pop_front();
843       logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
844       compact_queue_lock.Unlock();
845       logger->inc(l_rocksdb_compact_range);
846       compact_range(range.first, range.second);
847       compact_queue_lock.Lock();
848       continue;
849     }
850     compact_queue_cond.Wait(compact_queue_lock);
851   }
852   compact_queue_lock.Unlock();
853 }
854
855 void RocksDBStore::compact_range_async(const string& start, const string& end)
856 {
857   Mutex::Locker l(compact_queue_lock);
858
859   // try to merge adjacent ranges.  this is O(n), but the queue should
860   // be short.  note that we do not cover all overlap cases and merge
861   // opportunities here, but we capture the ones we currently need.
862   list< pair<string,string> >::iterator p = compact_queue.begin();
863   while (p != compact_queue.end()) {
864     if (p->first == start && p->second == end) {
865       // dup; no-op
866       return;
867     }
868     if (p->first <= end && p->first > start) {
869       // merge with existing range to the right
870       compact_queue.push_back(make_pair(start, p->second));
871       compact_queue.erase(p);
872       logger->inc(l_rocksdb_compact_queue_merge);
873       break;
874     }
875     if (p->second >= start && p->second < end) {
876       // merge with existing range to the left
877       compact_queue.push_back(make_pair(p->first, end));
878       compact_queue.erase(p);
879       logger->inc(l_rocksdb_compact_queue_merge);
880       break;
881     }
882     ++p;
883   }
884   if (p == compact_queue.end()) {
885     // no merge, new entry.
886     compact_queue.push_back(make_pair(start, end));
887     logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
888   }
889   compact_queue_cond.Signal();
890   if (!compact_thread.is_started()) {
891     compact_thread.create("rstore_compact");
892   }
893 }
894 bool RocksDBStore::check_omap_dir(string &omap_dir)
895 {
896   rocksdb::Options options;
897   options.create_if_missing = true;
898   rocksdb::DB *db;
899   rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
900   delete db;
901   db = nullptr;
902   return status.ok();
903 }
904 void RocksDBStore::compact_range(const string& start, const string& end)
905 {
906   rocksdb::CompactRangeOptions options;
907   rocksdb::Slice cstart(start);
908   rocksdb::Slice cend(end);
909   db->CompactRange(options, &cstart, &cend);
910 }
911 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
912 {
913   delete dbiter;
914 }
915 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
916 {
917   dbiter->SeekToFirst();
918   assert(!dbiter->status().IsIOError());
919   return dbiter->status().ok() ? 0 : -1;
920 }
921 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
922 {
923   rocksdb::Slice slice_prefix(prefix);
924   dbiter->Seek(slice_prefix);
925   assert(!dbiter->status().IsIOError());
926   return dbiter->status().ok() ? 0 : -1;
927 }
928 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
929 {
930   dbiter->SeekToLast();
931   assert(!dbiter->status().IsIOError());
932   return dbiter->status().ok() ? 0 : -1;
933 }
934 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
935 {
936   string limit = past_prefix(prefix);
937   rocksdb::Slice slice_limit(limit);
938   dbiter->Seek(slice_limit);
939
940   if (!dbiter->Valid()) {
941     dbiter->SeekToLast();
942   } else {
943     dbiter->Prev();
944   }
945   return dbiter->status().ok() ? 0 : -1;
946 }
947 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
948 {
949   lower_bound(prefix, after);
950   if (valid()) {
951   pair<string,string> key = raw_key();
952     if (key.first == prefix && key.second == after)
953       next();
954   }
955   return dbiter->status().ok() ? 0 : -1;
956 }
957 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
958 {
959   string bound = combine_strings(prefix, to);
960   rocksdb::Slice slice_bound(bound);
961   dbiter->Seek(slice_bound);
962   return dbiter->status().ok() ? 0 : -1;
963 }
964 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
965 {
966   return dbiter->Valid();
967 }
968 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
969 {
970   if (valid()) {
971     dbiter->Next();
972   }
973   assert(!dbiter->status().IsIOError());
974   return dbiter->status().ok() ? 0 : -1;
975 }
976 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
977 {
978   if (valid()) {
979     dbiter->Prev();
980   }
981   assert(!dbiter->status().IsIOError());
982   return dbiter->status().ok() ? 0 : -1;
983 }
984 string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
985 {
986   string out_key;
987   split_key(dbiter->key(), 0, &out_key);
988   return out_key;
989 }
990 pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
991 {
992   string prefix, key;
993   split_key(dbiter->key(), &prefix, &key);
994   return make_pair(prefix, key);
995 }
996
997 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
998   // Look for "prefix\0" right in rocksb::Slice
999   rocksdb::Slice key = dbiter->key();
1000   if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
1001     return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
1002   } else {
1003     return false;
1004   }
1005 }
1006
1007 bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
1008 {
1009   return to_bufferlist(dbiter->value());
1010 }
1011
1012 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
1013 {
1014   return dbiter->key().size();
1015 }
1016
1017 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
1018 {
1019   return dbiter->value().size();
1020 }
1021
1022 bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
1023 {
1024   rocksdb::Slice val = dbiter->value();
1025   return bufferptr(val.data(), val.size());
1026 }
1027
1028 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
1029 {
1030   return dbiter->status().ok() ? 0 : -1;
1031 }
1032
1033 string RocksDBStore::past_prefix(const string &prefix)
1034 {
1035   string limit = prefix;
1036   limit.push_back(1);
1037   return limit;
1038 }
1039
1040 RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator()
1041 {
1042   return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
1043         db->NewIterator(rocksdb::ReadOptions()));
1044 }
1045