1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
10 #include <sys/types.h>
13 #include "rocksdb/db.h"
14 #include "rocksdb/table.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/slice.h"
17 #include "rocksdb/cache.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/utilities/convenience.h"
20 #include "rocksdb/merge_operator.h"
22 #include "common/perf_counters.h"
23 #include "common/debug.h"
24 #include "include/str_list.h"
25 #include "include/stringify.h"
26 #include "include/str_map.h"
27 #include "KeyValueDB.h"
28 #include "RocksDBStore.h"
30 #include "common/debug.h"
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_rocksdb
35 #define dout_prefix *_dout << "rocksdb: "
37 static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
38 vector<rocksdb::Slice> *slices)
41 for (auto& buf : bl.buffers()) {
42 (*slices)[n].data_ = buf.c_str();
43 (*slices)[n].size_ = buf.length();
46 return rocksdb::SliceParts(slices->data(), slices->size());
50 // One of these per rocksdb instance, implements the merge operator prefix stuff
52 class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
55 const char *Name() const override {
56 // Construct a name that rocksDB will validate against. We want to
57 // do this in a way that doesn't constrain the ordering of calls
58 // to set_merge_operator, so sort the merge operators and then
59 // construct a name from all of those parts.
60 store.assoc_name.clear();
61 map<std::string,std::string> names;
62 for (auto& p : store.merge_ops) names[p.first] = p.second->name();
63 for (auto& p : names) {
64 store.assoc_name += '.';
65 store.assoc_name += p.first;
66 store.assoc_name += ':';
67 store.assoc_name += p.second;
69 return store.assoc_name.c_str();
72 MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
74 bool Merge(const rocksdb::Slice& key,
75 const rocksdb::Slice* existing_value,
76 const rocksdb::Slice& value,
77 std::string* new_value,
78 rocksdb::Logger* logger) const override {
80 for (auto& p : store.merge_ops) {
81 if (p.first.compare(0, p.first.length(),
82 key.data(), p.first.length()) == 0 &&
83 key.data()[p.first.length()] == 0) {
85 p.second->merge(existing_value->data(), existing_value->size(),
86 value.data(), value.size(),
89 p.second->merge_nonexistent(value.data(), value.size(), new_value);
99 int RocksDBStore::set_merge_operator(
100 const string& prefix,
101 std::shared_ptr<KeyValueDB::MergeOperator> mop)
103 // If you fail here, it's because you can't do this on an open database
104 assert(db == nullptr);
105 merge_ops.push_back(std::make_pair(prefix,mop));
109 class CephRocksdbLogger : public rocksdb::Logger {
112 explicit CephRocksdbLogger(CephContext *c) : cct(c) {
115 ~CephRocksdbLogger() override {
119 // Write an entry to the log file with the specified format.
120 void Logv(const char* format, va_list ap) override {
121 Logv(rocksdb::INFO_LEVEL, format, ap);
124 // Write an entry to the log file with the specified log level
125 // and format. Any log with level under the internal log level
126 // of *this (see @SetInfoLogLevel and @GetInfoLogLevel) will not be
128 void Logv(const rocksdb::InfoLogLevel log_level, const char* format,
129 va_list ap) override {
130 int v = rocksdb::NUM_INFO_LOG_LEVELS - log_level - 1;
133 vsnprintf(buf, sizeof(buf), format, ap);
134 *_dout << buf << dendl;
138 rocksdb::Logger *create_rocksdb_ceph_logger()
140 return new CephRocksdbLogger(g_ceph_context);
143 static int string2bool(const string &val, bool &b_val)
145 if (strcasecmp(val.c_str(), "false") == 0) {
148 } else if (strcasecmp(val.c_str(), "true") == 0) {
153 int b = strict_strtol(val.c_str(), 10, &err);
161 int RocksDBStore::tryInterpret(const string &key, const string &val, rocksdb::Options &opt)
163 if (key == "compaction_threads") {
165 int f = strict_sistrtoll(val.c_str(), &err);
168 //Low priority threadpool is used for compaction
169 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::LOW);
170 } else if (key == "flusher_threads") {
172 int f = strict_sistrtoll(val.c_str(), &err);
175 //High priority threadpool is used for flusher
176 opt.env->SetBackgroundThreads(f, rocksdb::Env::Priority::HIGH);
177 } else if (key == "compact_on_mount") {
178 int ret = string2bool(val, compact_on_mount);
181 } else if (key == "disableWAL") {
182 int ret = string2bool(val, disableWAL);
186 //unrecognize config options.
192 int RocksDBStore::ParseOptionsFromString(const string &opt_str, rocksdb::Options &opt)
194 map<string, string> str_map;
195 int r = get_str_map(opt_str, &str_map, ",\n;");
198 map<string, string>::iterator it;
199 for(it = str_map.begin(); it != str_map.end(); ++it) {
200 string this_opt = it->first + "=" + it->second;
201 rocksdb::Status status = rocksdb::GetOptionsFromString(opt, this_opt , &opt);
203 //unrecognized by rocksdb, try to interpret by ourselves.
204 r = tryInterpret(it->first, it->second, opt);
206 derr << status.ToString() << dendl;
210 lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
211 << " = " << it->second << dendl;
216 int RocksDBStore::init(string _options_str)
218 options_str = _options_str;
219 rocksdb::Options opt;
221 if (options_str.length()) {
222 int r = ParseOptionsFromString(options_str, opt);
230 int RocksDBStore::create_and_open(ostream &out)
233 unique_ptr<rocksdb::Directory> dir;
234 env->NewDirectory(path, &dir);
236 int r = ::mkdir(path.c_str(), 0755);
239 if (r < 0 && r != -EEXIST) {
240 derr << __func__ << " failed to create " << path << ": " << cpp_strerror(r)
245 return do_open(out, true);
248 int RocksDBStore::do_open(ostream &out, bool create_if_missing)
250 rocksdb::Options opt;
251 rocksdb::Status status;
253 if (options_str.length()) {
254 int r = ParseOptionsFromString(options_str, opt);
260 if (g_conf->rocksdb_perf) {
261 dbstats = rocksdb::CreateDBStatistics();
262 opt.statistics = dbstats;
265 opt.create_if_missing = create_if_missing;
266 if (g_conf->rocksdb_separate_wal_dir) {
267 opt.wal_dir = path + ".wal";
269 if (g_conf->get_val<std::string>("rocksdb_db_paths").length()) {
271 get_str_list(g_conf->get_val<std::string>("rocksdb_db_paths"), "; \t", paths);
272 for (auto& p : paths) {
273 size_t pos = p.find(',');
274 if (pos == std::string::npos) {
275 derr << __func__ << " invalid db path item " << p << " in "
276 << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
279 string path = p.substr(0, pos);
280 string size_str = p.substr(pos + 1);
281 uint64_t size = atoll(size_str.c_str());
283 derr << __func__ << " invalid db path item " << p << " in "
284 << g_conf->get_val<std::string>("rocksdb_db_paths") << dendl;
287 opt.db_paths.push_back(rocksdb::DbPath(path, size));
288 dout(10) << __func__ << " db_path " << path << " size " << size << dendl;
292 if (g_conf->rocksdb_log_to_ceph_log) {
293 opt.info_log.reset(new CephRocksdbLogger(g_ceph_context));
297 dout(10) << __func__ << " using custom Env " << priv << dendl;
298 opt.env = static_cast<rocksdb::Env*>(priv);
302 if (!set_cache_flag) {
303 cache_size = g_conf->rocksdb_cache_size;
305 uint64_t row_cache_size = cache_size * g_conf->rocksdb_cache_row_ratio;
306 uint64_t block_cache_size = cache_size - row_cache_size;
308 if (block_cache_size == 0) {
309 // disable block cache
310 dout(10) << __func__ << " block_cache_size " << block_cache_size
311 << ", setting no_block_cache " << dendl;
312 bbt_opts.no_block_cache = true;
314 if (g_conf->rocksdb_cache_type == "lru") {
315 bbt_opts.block_cache = rocksdb::NewLRUCache(
317 g_conf->rocksdb_cache_shard_bits);
318 } else if (g_conf->rocksdb_cache_type == "clock") {
319 bbt_opts.block_cache = rocksdb::NewClockCache(
321 g_conf->rocksdb_cache_shard_bits);
323 derr << "unrecognized rocksdb_cache_type '" << g_conf->rocksdb_cache_type
328 bbt_opts.block_size = g_conf->rocksdb_block_size;
330 if (row_cache_size > 0)
331 opt.row_cache = rocksdb::NewLRUCache(row_cache_size,
332 g_conf->rocksdb_cache_shard_bits);
333 uint64_t bloom_bits = g_conf->get_val<uint64_t>("rocksdb_bloom_bits_per_key");
334 if (bloom_bits > 0) {
335 dout(10) << __func__ << " set bloom filter bits per key to "
336 << bloom_bits << dendl;
337 bbt_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
339 if (g_conf->get_val<std::string>("rocksdb_index_type") == "binary_search")
340 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kBinarySearch;
341 if (g_conf->get_val<std::string>("rocksdb_index_type") == "hash_search")
342 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kHashSearch;
343 if (g_conf->get_val<std::string>("rocksdb_index_type") == "two_level")
344 bbt_opts.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
345 bbt_opts.cache_index_and_filter_blocks =
346 g_conf->get_val<bool>("rocksdb_cache_index_and_filter_blocks");
347 bbt_opts.cache_index_and_filter_blocks_with_high_priority =
348 g_conf->get_val<bool>("rocksdb_cache_index_and_filter_blocks_with_high_priority");
349 bbt_opts.partition_filters = g_conf->get_val<bool>("rocksdb_partition_filters");
350 if (g_conf->get_val<uint64_t>("rocksdb_metadata_block_size") > 0)
351 bbt_opts.metadata_block_size = g_conf->get_val<uint64_t>("rocksdb_metadata_block_size");
352 bbt_opts.pin_l0_filter_and_index_blocks_in_cache =
353 g_conf->get_val<bool>("rocksdb_pin_l0_filter_and_index_blocks_in_cache");
355 opt.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt_opts));
356 dout(10) << __func__ << " block size " << g_conf->rocksdb_block_size
357 << ", block_cache size " << prettybyte_t(block_cache_size)
358 << ", row_cache size " << prettybyte_t(row_cache_size)
360 << (1 << g_conf->rocksdb_cache_shard_bits)
361 << ", type " << g_conf->rocksdb_cache_type
364 opt.merge_operator.reset(new MergeOperatorRouter(*this));
365 status = rocksdb::DB::Open(opt, path, &db);
367 derr << status.ToString() << dendl;
371 PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
372 plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
373 plb.add_u64_counter(l_rocksdb_txns, "submit_transaction", "Submit transactions");
374 plb.add_u64_counter(l_rocksdb_txns_sync, "submit_transaction_sync", "Submit transactions sync");
375 plb.add_time_avg(l_rocksdb_get_latency, "get_latency", "Get latency");
376 plb.add_time_avg(l_rocksdb_submit_latency, "submit_latency", "Submit Latency");
377 plb.add_time_avg(l_rocksdb_submit_sync_latency, "submit_sync_latency", "Submit Sync Latency");
378 plb.add_u64_counter(l_rocksdb_compact, "compact", "Compactions");
379 plb.add_u64_counter(l_rocksdb_compact_range, "compact_range", "Compactions by range");
380 plb.add_u64_counter(l_rocksdb_compact_queue_merge, "compact_queue_merge", "Mergings of ranges in compaction queue");
381 plb.add_u64(l_rocksdb_compact_queue_len, "compact_queue_len", "Length of compaction queue");
382 plb.add_time_avg(l_rocksdb_write_wal_time, "rocksdb_write_wal_time", "Rocksdb write wal time");
383 plb.add_time_avg(l_rocksdb_write_memtable_time, "rocksdb_write_memtable_time", "Rocksdb write memtable time");
384 plb.add_time_avg(l_rocksdb_write_delay_time, "rocksdb_write_delay_time", "Rocksdb write delay time");
385 plb.add_time_avg(l_rocksdb_write_pre_and_post_process_time,
386 "rocksdb_write_pre_and_post_time", "total time spent on writing a record, excluding write process");
387 logger = plb.create_perf_counters();
388 cct->get_perfcounters_collection()->add(logger);
390 if (compact_on_mount) {
391 derr << "Compacting rocksdb store..." << dendl;
393 derr << "Finished compacting rocksdb store" << dendl;
398 int RocksDBStore::_test_init(const string& dir)
400 rocksdb::Options options;
401 options.create_if_missing = true;
403 rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
406 return status.ok() ? 0 : -EIO;
409 RocksDBStore::~RocksDBStore()
414 // Ensure db is destroyed before dependent db_cache and filterpolicy
419 delete static_cast<rocksdb::Env*>(priv);
423 void RocksDBStore::close()
425 // stop compaction thread
426 compact_queue_lock.Lock();
427 if (compact_thread.is_started()) {
428 compact_queue_stop = true;
429 compact_queue_cond.Signal();
430 compact_queue_lock.Unlock();
431 compact_thread.join();
433 compact_queue_lock.Unlock();
437 cct->get_perfcounters_collection()->remove(logger);
440 void RocksDBStore::split_stats(const std::string &s, char delim, std::vector<std::string> &elems) {
441 std::stringstream ss;
444 while (std::getline(ss, item, delim)) {
445 elems.push_back(item);
449 void RocksDBStore::get_statistics(Formatter *f)
451 if (!g_conf->rocksdb_perf) {
452 dout(20) << __func__ << "RocksDB perf is disabled, can't probe for stats"
457 if (g_conf->rocksdb_collect_compaction_stats) {
458 std::string stat_str;
459 bool status = db->GetProperty("rocksdb.stats", &stat_str);
461 f->open_object_section("rocksdb_statistics");
462 f->dump_string("rocksdb_compaction_statistics", "");
463 vector<string> stats;
464 split_stats(stat_str, '\n', stats);
465 for (auto st :stats) {
466 f->dump_string("", st);
471 if (g_conf->rocksdb_collect_extended_stats) {
473 f->open_object_section("rocksdb_extended_statistics");
474 string stat_str = dbstats->ToString();
475 vector<string> stats;
476 split_stats(stat_str, '\n', stats);
477 f->dump_string("rocksdb_extended_statistics", "");
478 for (auto st :stats) {
479 f->dump_string(".", st);
483 f->open_object_section("rocksdbstore_perf_counters");
484 logger->dump_formatted(f,0);
487 if (g_conf->rocksdb_collect_memory_stats) {
488 f->open_object_section("rocksdb_memtable_statistics");
489 std::string str(stringify(bbt_opts.block_cache->GetUsage()));
490 f->dump_string("block_cache_usage", str.data());
492 str.append(stringify(bbt_opts.block_cache->GetPinnedUsage()));
493 f->dump_string("block_cache_pinned_blocks_usage", str);
495 db->GetProperty("rocksdb.cur-size-all-mem-tables", &str);
496 f->dump_string("rocksdb_memtable_usage", str);
501 int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
503 utime_t start = ceph_clock_now();
504 // enable rocksdb breakdown
505 // considering performance overhead, default is disabled
506 if (g_conf->rocksdb_perf) {
507 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
508 rocksdb::perf_context.Reset();
511 RocksDBTransactionImpl * _t =
512 static_cast<RocksDBTransactionImpl *>(t.get());
513 rocksdb::WriteOptions woptions;
514 woptions.disableWAL = disableWAL;
515 lgeneric_subdout(cct, rocksdb, 30) << __func__;
516 RocksWBHandler bat_txc;
517 _t->bat.Iterate(&bat_txc);
518 *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
520 rocksdb::Status s = db->Write(woptions, &_t->bat);
522 RocksWBHandler rocks_txc;
523 _t->bat.Iterate(&rocks_txc);
524 derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
525 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
527 utime_t lat = ceph_clock_now() - start;
529 if (g_conf->rocksdb_perf) {
530 utime_t write_memtable_time;
531 utime_t write_delay_time;
532 utime_t write_wal_time;
533 utime_t write_pre_and_post_process_time;
534 write_wal_time.set_from_double(
535 static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
536 write_memtable_time.set_from_double(
537 static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
538 write_delay_time.set_from_double(
539 static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
540 write_pre_and_post_process_time.set_from_double(
541 static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
542 logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
543 logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
544 logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
545 logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
548 logger->inc(l_rocksdb_txns);
549 logger->tinc(l_rocksdb_submit_latency, lat);
551 return s.ok() ? 0 : -1;
554 int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
556 utime_t start = ceph_clock_now();
557 // enable rocksdb breakdown
558 // considering performance overhead, default is disabled
559 if (g_conf->rocksdb_perf) {
560 rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
561 rocksdb::perf_context.Reset();
564 RocksDBTransactionImpl * _t =
565 static_cast<RocksDBTransactionImpl *>(t.get());
566 rocksdb::WriteOptions woptions;
567 woptions.sync = true;
568 woptions.disableWAL = disableWAL;
569 lgeneric_subdout(cct, rocksdb, 30) << __func__;
570 RocksWBHandler bat_txc;
571 _t->bat.Iterate(&bat_txc);
572 *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
574 rocksdb::Status s = db->Write(woptions, &_t->bat);
576 RocksWBHandler rocks_txc;
577 _t->bat.Iterate(&rocks_txc);
578 derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
579 << " Rocksdb transaction: " << rocks_txc.seen << dendl;
581 utime_t lat = ceph_clock_now() - start;
583 if (g_conf->rocksdb_perf) {
584 utime_t write_memtable_time;
585 utime_t write_delay_time;
586 utime_t write_wal_time;
587 utime_t write_pre_and_post_process_time;
588 write_wal_time.set_from_double(
589 static_cast<double>(rocksdb::perf_context.write_wal_time)/1000000000);
590 write_memtable_time.set_from_double(
591 static_cast<double>(rocksdb::perf_context.write_memtable_time)/1000000000);
592 write_delay_time.set_from_double(
593 static_cast<double>(rocksdb::perf_context.write_delay_time)/1000000000);
594 write_pre_and_post_process_time.set_from_double(
595 static_cast<double>(rocksdb::perf_context.write_pre_and_post_process_time)/1000000000);
596 logger->tinc(l_rocksdb_write_memtable_time, write_memtable_time);
597 logger->tinc(l_rocksdb_write_delay_time, write_delay_time);
598 logger->tinc(l_rocksdb_write_wal_time, write_wal_time);
599 logger->tinc(l_rocksdb_write_pre_and_post_process_time, write_pre_and_post_process_time);
602 logger->inc(l_rocksdb_txns_sync);
603 logger->tinc(l_rocksdb_submit_sync_latency, lat);
605 return s.ok() ? 0 : -1;
608 RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
613 void RocksDBStore::RocksDBTransactionImpl::set(
614 const string &prefix,
616 const bufferlist &to_set_bl)
618 string key = combine_strings(prefix, k);
620 // bufferlist::c_str() is non-constant, so we can't call c_str()
621 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
622 bat.Put(rocksdb::Slice(key),
623 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
624 to_set_bl.length()));
626 rocksdb::Slice key_slice(key);
627 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
628 bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
629 prepare_sliceparts(to_set_bl, &value_slices));
633 void RocksDBStore::RocksDBTransactionImpl::set(
634 const string &prefix,
635 const char *k, size_t keylen,
636 const bufferlist &to_set_bl)
639 combine_strings(prefix, k, keylen, &key);
641 // bufferlist::c_str() is non-constant, so we can't call c_str()
642 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
643 bat.Put(rocksdb::Slice(key),
644 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
645 to_set_bl.length()));
647 rocksdb::Slice key_slice(key);
648 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
649 bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
650 prepare_sliceparts(to_set_bl, &value_slices));
654 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
657 bat.Delete(combine_strings(prefix, k));
660 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
665 combine_strings(prefix, k, keylen, &key);
669 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
672 bat.SingleDelete(combine_strings(prefix, k));
675 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
677 if (db->enable_rmrange) {
678 string endprefix = prefix;
679 endprefix.push_back('\x01');
680 bat.DeleteRange(combine_strings(prefix, string()),
681 combine_strings(endprefix, string()));
683 KeyValueDB::Iterator it = db->get_iterator(prefix);
684 for (it->seek_to_first();
687 bat.Delete(combine_strings(prefix, it->key()));
692 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
696 if (db->enable_rmrange) {
697 bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end));
699 auto it = db->get_iterator(prefix);
700 it->lower_bound(start);
701 while (it->valid()) {
702 if (it->key() >= end) {
705 bat.Delete(combine_strings(prefix, it->key()));
711 void RocksDBStore::RocksDBTransactionImpl::merge(
712 const string &prefix,
714 const bufferlist &to_set_bl)
716 string key = combine_strings(prefix, k);
718 // bufferlist::c_str() is non-constant, so we can't call c_str()
719 if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
720 bat.Merge(rocksdb::Slice(key),
721 rocksdb::Slice(to_set_bl.buffers().front().c_str(),
722 to_set_bl.length()));
725 rocksdb::Slice key_slice(key);
726 vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
727 bat.Merge(nullptr, rocksdb::SliceParts(&key_slice, 1),
728 prepare_sliceparts(to_set_bl, &value_slices));
732 //gets will bypass RocksDB row cache, since it uses iterator
733 int RocksDBStore::get(
734 const string &prefix,
735 const std::set<string> &keys,
736 std::map<string, bufferlist> *out)
738 utime_t start = ceph_clock_now();
739 for (std::set<string>::const_iterator i = keys.begin();
740 i != keys.end(); ++i) {
742 std::string bound = combine_strings(prefix, *i);
743 auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value);
745 (*out)[*i].append(value);
746 } else if (status.IsIOError()) {
747 ceph_abort_msg(cct, status.ToString());
751 utime_t lat = ceph_clock_now() - start;
752 logger->inc(l_rocksdb_gets);
753 logger->tinc(l_rocksdb_get_latency, lat);
757 int RocksDBStore::get(
758 const string &prefix,
762 assert(out && (out->length() == 0));
763 utime_t start = ceph_clock_now();
767 k = combine_strings(prefix, key);
768 s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
771 } else if (s.IsNotFound()) {
774 ceph_abort_msg(cct, s.ToString());
776 utime_t lat = ceph_clock_now() - start;
777 logger->inc(l_rocksdb_gets);
778 logger->tinc(l_rocksdb_get_latency, lat);
782 int RocksDBStore::get(
783 const string& prefix,
788 assert(out && (out->length() == 0));
789 utime_t start = ceph_clock_now();
792 combine_strings(prefix, key, keylen, &k);
794 s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
797 } else if (s.IsNotFound()) {
800 ceph_abort_msg(cct, s.ToString());
802 utime_t lat = ceph_clock_now() - start;
803 logger->inc(l_rocksdb_gets);
804 logger->tinc(l_rocksdb_get_latency, lat);
808 int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
810 size_t prefix_len = 0;
812 // Find separator inside Slice
813 char* separator = (char*) memchr(in.data(), 0, in.size());
814 if (separator == NULL)
816 prefix_len = size_t(separator - in.data());
817 if (prefix_len >= in.size())
820 // Fetch prefix and/or key directly from Slice
822 *prefix = string(in.data(), prefix_len);
824 *key = string(separator+1, in.size()-prefix_len-1);
828 void RocksDBStore::compact()
830 logger->inc(l_rocksdb_compact);
831 rocksdb::CompactRangeOptions options;
832 db->CompactRange(options, nullptr, nullptr);
836 void RocksDBStore::compact_thread_entry()
838 compact_queue_lock.Lock();
839 while (!compact_queue_stop) {
840 while (!compact_queue.empty()) {
841 pair<string,string> range = compact_queue.front();
842 compact_queue.pop_front();
843 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
844 compact_queue_lock.Unlock();
845 logger->inc(l_rocksdb_compact_range);
846 compact_range(range.first, range.second);
847 compact_queue_lock.Lock();
850 compact_queue_cond.Wait(compact_queue_lock);
852 compact_queue_lock.Unlock();
855 void RocksDBStore::compact_range_async(const string& start, const string& end)
857 Mutex::Locker l(compact_queue_lock);
859 // try to merge adjacent ranges. this is O(n), but the queue should
860 // be short. note that we do not cover all overlap cases and merge
861 // opportunities here, but we capture the ones we currently need.
862 list< pair<string,string> >::iterator p = compact_queue.begin();
863 while (p != compact_queue.end()) {
864 if (p->first == start && p->second == end) {
868 if (p->first <= end && p->first > start) {
869 // merge with existing range to the right
870 compact_queue.push_back(make_pair(start, p->second));
871 compact_queue.erase(p);
872 logger->inc(l_rocksdb_compact_queue_merge);
875 if (p->second >= start && p->second < end) {
876 // merge with existing range to the left
877 compact_queue.push_back(make_pair(p->first, end));
878 compact_queue.erase(p);
879 logger->inc(l_rocksdb_compact_queue_merge);
884 if (p == compact_queue.end()) {
885 // no merge, new entry.
886 compact_queue.push_back(make_pair(start, end));
887 logger->set(l_rocksdb_compact_queue_len, compact_queue.size());
889 compact_queue_cond.Signal();
890 if (!compact_thread.is_started()) {
891 compact_thread.create("rstore_compact");
894 bool RocksDBStore::check_omap_dir(string &omap_dir)
896 rocksdb::Options options;
897 options.create_if_missing = true;
899 rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
904 void RocksDBStore::compact_range(const string& start, const string& end)
906 rocksdb::CompactRangeOptions options;
907 rocksdb::Slice cstart(start);
908 rocksdb::Slice cend(end);
909 db->CompactRange(options, &cstart, &cend);
911 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
915 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first()
917 dbiter->SeekToFirst();
918 assert(!dbiter->status().IsIOError());
919 return dbiter->status().ok() ? 0 : -1;
921 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
923 rocksdb::Slice slice_prefix(prefix);
924 dbiter->Seek(slice_prefix);
925 assert(!dbiter->status().IsIOError());
926 return dbiter->status().ok() ? 0 : -1;
928 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last()
930 dbiter->SeekToLast();
931 assert(!dbiter->status().IsIOError());
932 return dbiter->status().ok() ? 0 : -1;
934 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
936 string limit = past_prefix(prefix);
937 rocksdb::Slice slice_limit(limit);
938 dbiter->Seek(slice_limit);
940 if (!dbiter->Valid()) {
941 dbiter->SeekToLast();
945 return dbiter->status().ok() ? 0 : -1;
947 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after)
949 lower_bound(prefix, after);
951 pair<string,string> key = raw_key();
952 if (key.first == prefix && key.second == after)
955 return dbiter->status().ok() ? 0 : -1;
957 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to)
959 string bound = combine_strings(prefix, to);
960 rocksdb::Slice slice_bound(bound);
961 dbiter->Seek(slice_bound);
962 return dbiter->status().ok() ? 0 : -1;
964 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::valid()
966 return dbiter->Valid();
968 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::next()
973 assert(!dbiter->status().IsIOError());
974 return dbiter->status().ok() ? 0 : -1;
976 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::prev()
981 assert(!dbiter->status().IsIOError());
982 return dbiter->status().ok() ? 0 : -1;
984 string RocksDBStore::RocksDBWholeSpaceIteratorImpl::key()
987 split_key(dbiter->key(), 0, &out_key);
990 pair<string,string> RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key()
993 split_key(dbiter->key(), &prefix, &key);
994 return make_pair(prefix, key);
997 bool RocksDBStore::RocksDBWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
998 // Look for "prefix\0" right in rocksb::Slice
999 rocksdb::Slice key = dbiter->key();
1000 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\0')) {
1001 return memcmp(key.data(), prefix.c_str(), prefix.length()) == 0;
1007 bufferlist RocksDBStore::RocksDBWholeSpaceIteratorImpl::value()
1009 return to_bufferlist(dbiter->value());
1012 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::key_size()
1014 return dbiter->key().size();
1017 size_t RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_size()
1019 return dbiter->value().size();
1022 bufferptr RocksDBStore::RocksDBWholeSpaceIteratorImpl::value_as_ptr()
1024 rocksdb::Slice val = dbiter->value();
1025 return bufferptr(val.data(), val.size());
1028 int RocksDBStore::RocksDBWholeSpaceIteratorImpl::status()
1030 return dbiter->status().ok() ? 0 : -1;
1033 string RocksDBStore::past_prefix(const string &prefix)
1035 string limit = prefix;
1040 RocksDBStore::WholeSpaceIterator RocksDBStore::_get_iterator()
1042 return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
1043 db->NewIterator(rocksdb::ReadOptions()));