Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / kv / LevelDBStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "LevelDBStore.h"
4
5 #include <set>
6 #include <map>
7 #include <string>
8 #include <cerrno>
9
10 using std::string;
11
12 #include "include/memory.h"
13
14 #include "common/debug.h"
15 #include "common/perf_counters.h"
16
17 // re-include our assert to clobber the system one; fix dout:
18 #include "include/assert.h"
19
20 #define dout_context cct
21 #define dout_subsys ceph_subsys_leveldb
22 #undef dout_prefix
23 #define dout_prefix *_dout << "leveldb: "
24
25 class CephLevelDBLogger : public leveldb::Logger {
26   CephContext *cct;
27 public:
28   explicit CephLevelDBLogger(CephContext *c) : cct(c) {
29     cct->get();
30   }
31   ~CephLevelDBLogger() override {
32     cct->put();
33   }
34
35   // Write an entry to the log file with the specified format.
36   void Logv(const char* format, va_list ap) override {
37     dout(1);
38     char buf[65536];
39     vsnprintf(buf, sizeof(buf), format, ap);
40     *_dout << buf << dendl;
41   }
42 };
43
44 leveldb::Logger *create_leveldb_ceph_logger()
45 {
46   return new CephLevelDBLogger(g_ceph_context);
47 }
48
49 int LevelDBStore::init(string option_str)
50 {
51   // init defaults.  caller can override these if they want
52   // prior to calling open.
53   options.write_buffer_size = g_conf->leveldb_write_buffer_size;
54   options.cache_size = g_conf->leveldb_cache_size;
55   options.block_size = g_conf->leveldb_block_size;
56   options.bloom_size = g_conf->leveldb_bloom_size;
57   options.compression_enabled = g_conf->leveldb_compression;
58   options.paranoid_checks = g_conf->leveldb_paranoid;
59   options.max_open_files = g_conf->leveldb_max_open_files;
60   options.log_file = g_conf->leveldb_log;
61   return 0;
62 }
63
64 int LevelDBStore::do_open(ostream &out, bool create_if_missing)
65 {
66   leveldb::Options ldoptions;
67
68   if (options.write_buffer_size)
69     ldoptions.write_buffer_size = options.write_buffer_size;
70   if (options.max_open_files)
71     ldoptions.max_open_files = options.max_open_files;
72   if (options.cache_size) {
73     leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size);
74     db_cache.reset(_db_cache);
75     ldoptions.block_cache = db_cache.get();
76   }
77   if (options.block_size)
78     ldoptions.block_size = options.block_size;
79   if (options.bloom_size) {
80 #ifdef HAVE_LEVELDB_FILTER_POLICY
81     const leveldb::FilterPolicy *_filterpolicy =
82         leveldb::NewBloomFilterPolicy(options.bloom_size);
83     filterpolicy.reset(_filterpolicy);
84     ldoptions.filter_policy = filterpolicy.get();
85 #else
86     assert(0 == "bloom size set but installed leveldb doesn't support bloom filters");
87 #endif
88   }
89   if (options.compression_enabled)
90     ldoptions.compression = leveldb::kSnappyCompression;
91   else
92     ldoptions.compression = leveldb::kNoCompression;
93   if (options.block_restart_interval)
94     ldoptions.block_restart_interval = options.block_restart_interval;
95
96   ldoptions.error_if_exists = options.error_if_exists;
97   ldoptions.paranoid_checks = options.paranoid_checks;
98   ldoptions.create_if_missing = create_if_missing;
99
100   if (g_conf->leveldb_log_to_ceph_log) {
101     ceph_logger = new CephLevelDBLogger(g_ceph_context);
102     ldoptions.info_log = ceph_logger;
103   }
104   
105   if (options.log_file.length()) {
106     leveldb::Env *env = leveldb::Env::Default();
107     env->NewLogger(options.log_file, &ldoptions.info_log);
108   }
109
110   leveldb::DB *_db;
111   leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db);
112   db.reset(_db);
113   if (!status.ok()) {
114     out << status.ToString() << std::endl;
115     return -EINVAL;
116   }
117
118   PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last);
119   plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets");
120   plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions");
121   plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency");
122   plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency");
123   plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency");
124   plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions");
125   plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range");
126   plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue");
127   plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue");
128   logger = plb.create_perf_counters();
129   cct->get_perfcounters_collection()->add(logger);
130
131   if (g_conf->leveldb_compact_on_mount) {
132     derr << "Compacting leveldb store..." << dendl;
133     compact();
134     derr << "Finished compacting leveldb store" << dendl;
135   }
136   return 0;
137 }
138
139 int LevelDBStore::_test_init(const string& dir)
140 {
141   leveldb::Options options;
142   options.create_if_missing = true;
143   leveldb::DB *db;
144   leveldb::Status status = leveldb::DB::Open(options, dir, &db);
145   delete db;
146   return status.ok() ? 0 : -EIO;
147 }
148
149 LevelDBStore::~LevelDBStore()
150 {
151   close();
152   delete logger;
153
154   // Ensure db is destroyed before dependent db_cache and filterpolicy
155   db.reset();
156   delete ceph_logger;
157 }
158
159 void LevelDBStore::close()
160 {
161   // stop compaction thread
162   compact_queue_lock.Lock();
163   if (compact_thread.is_started()) {
164     compact_queue_stop = true;
165     compact_queue_cond.Signal();
166     compact_queue_lock.Unlock();
167     compact_thread.join();
168   } else {
169     compact_queue_lock.Unlock();
170   }
171
172   if (logger)
173     cct->get_perfcounters_collection()->remove(logger);
174 }
175
176 int LevelDBStore::submit_transaction(KeyValueDB::Transaction t)
177 {
178   utime_t start = ceph_clock_now();
179   LevelDBTransactionImpl * _t =
180     static_cast<LevelDBTransactionImpl *>(t.get());
181   leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat));
182   utime_t lat = ceph_clock_now() - start;
183   logger->inc(l_leveldb_txns);
184   logger->tinc(l_leveldb_submit_latency, lat);
185   return s.ok() ? 0 : -1;
186 }
187
188 int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
189 {
190   utime_t start = ceph_clock_now();
191   LevelDBTransactionImpl * _t =
192     static_cast<LevelDBTransactionImpl *>(t.get());
193   leveldb::WriteOptions options;
194   options.sync = true;
195   leveldb::Status s = db->Write(options, &(_t->bat));
196   utime_t lat = ceph_clock_now() - start;
197   logger->inc(l_leveldb_txns);
198   logger->tinc(l_leveldb_submit_sync_latency, lat);
199   return s.ok() ? 0 : -1;
200 }
201
202 void LevelDBStore::LevelDBTransactionImpl::set(
203   const string &prefix,
204   const string &k,
205   const bufferlist &to_set_bl)
206 {
207   string key = combine_strings(prefix, k);
208   size_t bllen = to_set_bl.length();
209   // bufferlist::c_str() is non-constant, so we can't call c_str()
210   if (to_set_bl.is_contiguous() && bllen > 0) {
211     // bufferlist contains just one ptr or they're contiguous
212     bat.Put(leveldb::Slice(key), leveldb::Slice(to_set_bl.buffers().front().c_str(), bllen));
213   } else if ((bllen <= 32 * 1024) && (bllen > 0)) {
214     // 2+ bufferptrs that are not contiguopus
215     // allocate buffer on stack and copy bl contents to that buffer
216     // make sure the buffer isn't too large or we might crash here...    
217     char* slicebuf = (char*) alloca(bllen);
218     leveldb::Slice newslice(slicebuf, bllen);
219     std::list<buffer::ptr>::const_iterator pb;
220     for (pb = to_set_bl.buffers().begin(); pb != to_set_bl.buffers().end(); ++pb) {
221       size_t ptrlen = (*pb).length();
222       memcpy((void*)slicebuf, (*pb).c_str(), ptrlen);
223       slicebuf += ptrlen;
224     } 
225     bat.Put(leveldb::Slice(key), newslice);
226   } else {
227     // 2+ bufferptrs that are not contiguous, and enormous in size
228     bufferlist val = to_set_bl;
229     bat.Put(leveldb::Slice(key), leveldb::Slice(val.c_str(), val.length()));
230   }
231 }
232
233 void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix,
234                                                  const string &k)
235 {
236   string key = combine_strings(prefix, k);
237   bat.Delete(leveldb::Slice(key));
238 }
239
240 void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
241 {
242   KeyValueDB::Iterator it = db->get_iterator(prefix);
243   for (it->seek_to_first();
244        it->valid();
245        it->next()) {
246     bat.Delete(leveldb::Slice(combine_strings(prefix, it->key())));
247   }
248 }
249
250 void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end)
251 {
252   KeyValueDB::Iterator it = db->get_iterator(prefix);
253   it->lower_bound(start);
254   while (it->valid()) {
255     if (it->key() >= end) {
256       break;
257     }
258     bat.Delete(combine_strings(prefix, it->key()));
259     it->next();
260   }
261 }
262
263 int LevelDBStore::get(
264     const string &prefix,
265     const std::set<string> &keys,
266     std::map<string, bufferlist> *out)
267 {
268   utime_t start = ceph_clock_now();
269   for (std::set<string>::const_iterator i = keys.begin();
270        i != keys.end(); ++i) {
271     std::string value;
272     std::string bound = combine_strings(prefix, *i);
273     auto status = db->Get(leveldb::ReadOptions(), leveldb::Slice(bound), &value);
274     if (status.ok())
275       (*out)[*i].append(value);
276   }
277   utime_t lat = ceph_clock_now() - start;
278   logger->inc(l_leveldb_gets);
279   logger->tinc(l_leveldb_get_latency, lat);
280   return 0;
281 }
282
283 int LevelDBStore::get(const string &prefix, 
284       const string &key,
285       bufferlist *out)
286 {
287   assert(out && (out->length() == 0));
288   utime_t start = ceph_clock_now();
289   int r = 0;
290   string value, k;
291   leveldb::Status s;
292   k = combine_strings(prefix, key);
293   s = db->Get(leveldb::ReadOptions(), leveldb::Slice(k), &value);
294   if (s.ok()) {
295     out->append(value);
296   } else {
297     r = -ENOENT;
298   }
299   utime_t lat = ceph_clock_now() - start;
300   logger->inc(l_leveldb_gets);
301   logger->tinc(l_leveldb_get_latency, lat);
302   return r;
303 }
304
305 string LevelDBStore::combine_strings(const string &prefix, const string &value)
306 {
307   string out = prefix;
308   out.push_back(0);
309   out.append(value);
310   return out;
311 }
312
313 bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in)
314 {
315   bufferlist bl;
316   bl.append(bufferptr(in.data(), in.size()));
317   return bl;
318 }
319
320 int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key)
321 {
322   size_t prefix_len = 0;
323   
324   // Find separator inside Slice
325   char* separator = (char*) memchr(in.data(), 0, in.size());
326   if (separator == NULL)
327      return -EINVAL;
328   prefix_len = size_t(separator - in.data());
329   if (prefix_len >= in.size())
330     return -EINVAL;
331
332   if (prefix)
333     *prefix = string(in.data(), prefix_len);
334   if (key)
335     *key = string(separator+1, in.size() - prefix_len - 1);
336    return 0;
337 }
338
339 void LevelDBStore::compact()
340 {
341   logger->inc(l_leveldb_compact);
342   db->CompactRange(NULL, NULL);
343 }
344
345
346 void LevelDBStore::compact_thread_entry()
347 {
348   compact_queue_lock.Lock();
349   while (!compact_queue_stop) {
350     while (!compact_queue.empty()) {
351       pair<string,string> range = compact_queue.front();
352       compact_queue.pop_front();
353       logger->set(l_leveldb_compact_queue_len, compact_queue.size());
354       compact_queue_lock.Unlock();
355       logger->inc(l_leveldb_compact_range);
356       compact_range(range.first, range.second);
357       compact_queue_lock.Lock();
358       continue;
359     }
360     compact_queue_cond.Wait(compact_queue_lock);
361   }
362   compact_queue_lock.Unlock();
363 }
364
365 void LevelDBStore::compact_range_async(const string& start, const string& end)
366 {
367   Mutex::Locker l(compact_queue_lock);
368
369   // try to merge adjacent ranges.  this is O(n), but the queue should
370   // be short.  note that we do not cover all overlap cases and merge
371   // opportunities here, but we capture the ones we currently need.
372   list< pair<string,string> >::iterator p = compact_queue.begin();
373   while (p != compact_queue.end()) {
374     if (p->first == start && p->second == end) {
375       // dup; no-op
376       return;
377     }
378     if (p->first <= end && p->first > start) {
379       // merge with existing range to the right
380       compact_queue.push_back(make_pair(start, p->second));
381       compact_queue.erase(p);
382       logger->inc(l_leveldb_compact_queue_merge);
383       break;
384     }
385     if (p->second >= start && p->second < end) {
386       // merge with existing range to the left
387       compact_queue.push_back(make_pair(p->first, end));
388       compact_queue.erase(p);
389       logger->inc(l_leveldb_compact_queue_merge);
390       break;
391     }
392     ++p;
393   }
394   if (p == compact_queue.end()) {
395     // no merge, new entry.
396     compact_queue.push_back(make_pair(start, end));
397     logger->set(l_leveldb_compact_queue_len, compact_queue.size());
398   }
399   compact_queue_cond.Signal();
400   if (!compact_thread.is_started()) {
401     compact_thread.create("levdbst_compact");
402   }
403 }