Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / kstore / KStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 Red Hat
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <unistd.h>
16 #include <stdlib.h>
17 #include <sys/types.h>
18 #include <sys/stat.h>
19 #include <fcntl.h>
20 #include <unistd.h>
21
22 #include "KStore.h"
23 #include "osd/osd_types.h"
24 #include "os/kv.h"
25 #include "include/compat.h"
26 #include "include/stringify.h"
27 #include "common/errno.h"
28 #include "common/safe_io.h"
29 #include "common/Formatter.h"
30
31
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_kstore
34
35 /*
36
37   TODO:
38
39   * superblock, features
40   * refcounted extents (for efficient clone)
41
42  */
43
44 const string PREFIX_SUPER = "S"; // field -> value
45 const string PREFIX_COLL = "C"; // collection name -> (nothing)
46 const string PREFIX_OBJ = "O";  // object name -> onode
47 const string PREFIX_DATA = "D"; // nid + offset -> data
48 const string PREFIX_OMAP = "M"; // u64 + keyname -> value
49
50 /*
51  * object name key structure
52  *
53  * 2 chars: shard (-- for none, or hex digit, so that we sort properly)
54  * encoded u64: poolid + 2^63 (so that it sorts properly)
55  * encoded u32: hash (bit reversed)
56  *
57  * 1 char: '.'
58  *
59  * escaped string: namespace
60  *
61  * 1 char: '<', '=', or '>'.  if =, then object key == object name, and
62  *         we are followed just by the key.  otherwise, we are followed by
63  *         the key and then the object name.
64  * escaped string: key
65  * escaped string: object name (unless '=' above)
66  *
67  * encoded u64: snap
68  * encoded u64: generation
69  */
70
71 /*
72  * string encoding in the key
73  *
74  * The key string needs to lexicographically sort the same way that
75  * ghobject_t does.  We do this by escaping anything <= to '#' with #
76  * plus a 2 digit hex string, and anything >= '~' with ~ plus the two
77  * hex digits.
78  *
79  * We use ! as a terminator for strings; this works because it is < #
80  * and will get escaped if it is present in the string.
81  *
82  */
83
84 static void append_escaped(const string &in, string *out)
85 {
86   char hexbyte[8];
87   for (string::const_iterator i = in.begin(); i != in.end(); ++i) {
88     if (*i <= '#') {
89       snprintf(hexbyte, sizeof(hexbyte), "#%02x", (uint8_t)*i);
90       out->append(hexbyte);
91     } else if (*i >= '~') {
92       snprintf(hexbyte, sizeof(hexbyte), "~%02x", (uint8_t)*i);
93       out->append(hexbyte);
94     } else {
95       out->push_back(*i);
96     }
97   }
98   out->push_back('!');
99 }
100
101 static int decode_escaped(const char *p, string *out)
102 {
103   const char *orig_p = p;
104   while (*p && *p != '!') {
105     if (*p == '#' || *p == '~') {
106       unsigned hex;
107       int r = sscanf(++p, "%2x", &hex);
108       if (r < 1)
109         return -EINVAL;
110       out->push_back((char)hex);
111       p += 2;
112     } else {
113       out->push_back(*p++);
114     }
115   }
116   return p - orig_p;
117 }
118
119 // some things we encode in binary (as le32 or le64); print the
120 // resulting key strings nicely
121 static string pretty_binary_string(const string& in)
122 {
123   char buf[10];
124   string out;
125   out.reserve(in.length() * 3);
126   enum { NONE, HEX, STRING } mode = NONE;
127   unsigned from = 0, i;
128   for (i=0; i < in.length(); ++i) {
129     if ((in[i] < 32 || (unsigned char)in[i] > 126) ||
130         (mode == HEX && in.length() - i >= 4 &&
131          ((in[i] < 32 || (unsigned char)in[i] > 126) ||
132           (in[i+1] < 32 || (unsigned char)in[i+1] > 126) ||
133           (in[i+2] < 32 || (unsigned char)in[i+2] > 126) ||
134           (in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) {
135       if (mode == STRING) {
136         out.append(in.substr(from, i - from));
137         out.push_back('\'');
138       }
139       if (mode != HEX) {
140         out.append("0x");
141         mode = HEX;
142       }
143       if (in.length() - i >= 4) {
144         // print a whole u32 at once
145         snprintf(buf, sizeof(buf), "%08x",
146                  (uint32_t)(((unsigned char)in[i] << 24) |
147                             ((unsigned char)in[i+1] << 16) |
148                             ((unsigned char)in[i+2] << 8) |
149                             ((unsigned char)in[i+3] << 0)));
150         i += 3;
151       } else {
152         snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
153       }
154       out.append(buf);
155     } else {
156       if (mode != STRING) {
157         out.push_back('\'');
158         mode = STRING;
159         from = i;
160       }
161     }
162   }
163   if (mode == STRING) {
164     out.append(in.substr(from, i - from));
165     out.push_back('\'');
166   }
167   return out;
168 }
169
170 static void _key_encode_shard(shard_id_t shard, string *key)
171 {
172   // make field ordering match with ghobject_t compare operations
173   if (shard == shard_id_t::NO_SHARD) {
174     // otherwise ff will sort *after* 0, not before.
175     key->append("--");
176   } else {
177     char buf[32];
178     snprintf(buf, sizeof(buf), "%02x", (int)shard);
179     key->append(buf);
180   }
181 }
182 static const char *_key_decode_shard(const char *key, shard_id_t *pshard)
183 {
184   if (key[0] == '-') {
185     *pshard = shard_id_t::NO_SHARD;
186   } else {
187     unsigned shard;
188     int r = sscanf(key, "%x", &shard);
189     if (r < 1)
190       return NULL;
191     *pshard = shard_id_t(shard);
192   }
193   return key + 2;
194 }
195
196 static void get_coll_key_range(const coll_t& cid, int bits,
197                                string *temp_start, string *temp_end,
198                                string *start, string *end)
199 {
200   temp_start->clear();
201   temp_end->clear();
202   start->clear();
203   end->clear();
204
205   spg_t pgid;
206   if (cid.is_pg(&pgid)) {
207     _key_encode_shard(pgid.shard, start);
208     *end = *start;
209     *temp_start = *start;
210     *temp_end = *start;
211
212     _key_encode_u64(pgid.pool() + 0x8000000000000000ull, start);
213     _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_start);
214     _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), start);
215     _key_encode_u32(hobject_t::_reverse_bits(pgid.ps()), temp_start);
216     start->append(".");
217     temp_start->append(".");
218
219     _key_encode_u64(pgid.pool() + 0x8000000000000000ull, end);
220     _key_encode_u64((-2ll - pgid.pool()) + 0x8000000000000000ull, temp_end);
221
222     uint64_t end_hash =
223       hobject_t::_reverse_bits(pgid.ps()) + (1ull << (32-bits));
224     if (end_hash <= 0xffffffffull) {
225       _key_encode_u32(end_hash, end);
226       _key_encode_u32(end_hash, temp_end);
227       end->append(".");
228       temp_end->append(".");
229     } else {
230       _key_encode_u32(0xffffffff, end);
231       _key_encode_u32(0xffffffff, temp_end);
232       end->append(":");
233       temp_end->append(":");
234     }
235   } else {
236     _key_encode_shard(shard_id_t::NO_SHARD, start);
237     _key_encode_u64(-1ull + 0x8000000000000000ull, start);
238     *end = *start;
239     _key_encode_u32(0, start);
240     start->append(".");
241     _key_encode_u32(0xffffffff, end);
242     end->append(":");
243
244     // no separate temp section
245     *temp_start = *end;
246     *temp_end = *end;
247   }
248 }
249
250 static int get_key_object(const string& key, ghobject_t *oid);
251
252 static void get_object_key(CephContext* cct, const ghobject_t& oid,
253                            string *key)
254 {
255   key->clear();
256
257   _key_encode_shard(oid.shard_id, key);
258   _key_encode_u64(oid.hobj.pool + 0x8000000000000000ull, key);
259   _key_encode_u32(oid.hobj.get_bitwise_key_u32(), key);
260   key->append(".");
261
262   append_escaped(oid.hobj.nspace, key);
263
264   if (oid.hobj.get_key().length()) {
265     // is a key... could be < = or >.
266     // (ASCII chars < = and > sort in that order, yay)
267     if (oid.hobj.get_key() < oid.hobj.oid.name) {
268       key->append("<");
269       append_escaped(oid.hobj.get_key(), key);
270       append_escaped(oid.hobj.oid.name, key);
271     } else if (oid.hobj.get_key() > oid.hobj.oid.name) {
272       key->append(">");
273       append_escaped(oid.hobj.get_key(), key);
274       append_escaped(oid.hobj.oid.name, key);
275     } else {
276       // same as no key
277       key->append("=");
278       append_escaped(oid.hobj.oid.name, key);
279     }
280   } else {
281     // no key
282     key->append("=");
283     append_escaped(oid.hobj.oid.name, key);
284   }
285
286   _key_encode_u64(oid.hobj.snap, key);
287   _key_encode_u64(oid.generation, key);
288
289   // sanity check
290   if (true) {
291     ghobject_t t;
292     int r = get_key_object(*key, &t);
293     if (r || t != oid) {
294       derr << "  r " << r << dendl;
295       derr << "key " << pretty_binary_string(*key) << dendl;
296       derr << "oid " << oid << dendl;
297       derr << "  t " << t << dendl;
298       assert(t == oid);
299     }
300   }
301 }
302
303 static int get_key_object(const string& key, ghobject_t *oid)
304 {
305   int r;
306   const char *p = key.c_str();
307
308   p = _key_decode_shard(p, &oid->shard_id);
309
310   uint64_t pool;
311   p = _key_decode_u64(p, &pool);
312   oid->hobj.pool = pool - 0x8000000000000000ull;
313
314   unsigned hash;
315   p = _key_decode_u32(p, &hash);
316   oid->hobj.set_bitwise_key_u32(hash);
317   if (*p != '.')
318     return -5;
319   ++p;
320
321   r = decode_escaped(p, &oid->hobj.nspace);
322   if (r < 0)
323     return -6;
324   p += r + 1;
325
326   if (*p == '=') {
327     // no key
328     ++p;
329     r = decode_escaped(p, &oid->hobj.oid.name);
330     if (r < 0)
331       return -7;
332     p += r + 1;
333   } else if (*p == '<' || *p == '>') {
334     // key + name
335     ++p;
336     string okey;
337     r = decode_escaped(p, &okey);
338     if (r < 0)
339       return -8;
340     p += r + 1;
341     r = decode_escaped(p, &oid->hobj.oid.name);
342     if (r < 0)
343       return -9;
344     p += r + 1;
345     oid->hobj.set_key(okey);
346   } else {
347     // malformed
348     return -10;
349   }
350
351   p = _key_decode_u64(p, &oid->hobj.snap.val);
352   p = _key_decode_u64(p, &oid->generation);
353   if (*p) {
354     // if we get something other than a null terminator here, 
355     // something goes wrong.
356     return -12;
357   }
358
359   return 0;
360 }
361
362
363 static void get_data_key(uint64_t nid, uint64_t offset, string *out)
364 {
365   _key_encode_u64(nid, out);
366   _key_encode_u64(offset, out);
367 }
368
369 // '-' < '.' < '~'
370 static void get_omap_header(uint64_t id, string *out)
371 {
372   _key_encode_u64(id, out);
373   out->push_back('-');
374 }
375
376 // hmm, I don't think there's any need to escape the user key since we
377 // have a clean prefix.
378 static void get_omap_key(uint64_t id, const string& key, string *out)
379 {
380   _key_encode_u64(id, out);
381   out->push_back('.');
382   out->append(key);
383 }
384
385 static void rewrite_omap_key(uint64_t id, string old, string *out)
386 {
387   _key_encode_u64(id, out);
388   out->append(old.substr(out->length()));
389 }
390
391 static void decode_omap_key(const string& key, string *user_key)
392 {
393   *user_key = key.substr(sizeof(uint64_t) + 1);
394 }
395
396 static void get_omap_tail(uint64_t id, string *out)
397 {
398   _key_encode_u64(id, out);
399   out->push_back('~');
400 }
401
402
403
404 // Onode
405
406 #undef dout_prefix
407 #define dout_prefix *_dout << "kstore.onode(" << this << ") "
408
409 void KStore::Onode::flush()
410 {
411   std::unique_lock<std::mutex> l(flush_lock);
412   dout(20) << __func__ << " " << flush_txns << dendl;
413   while (!flush_txns.empty())
414     flush_cond.wait(l);
415   dout(20) << __func__ << " done" << dendl;
416 }
417
418 // OnodeHashLRU
419
420 #undef dout_prefix
421 #define dout_prefix *_dout << "kstore.lru(" << this << ") "
422
423 void KStore::OnodeHashLRU::_touch(OnodeRef o)
424 {
425   lru_list_t::iterator p = lru.iterator_to(*o);
426   lru.erase(p);
427   lru.push_front(*o);
428 }
429
430 void KStore::OnodeHashLRU::add(const ghobject_t& oid, OnodeRef o)
431 {
432   std::lock_guard<std::mutex> l(lock);
433   dout(30) << __func__ << " " << oid << " " << o << dendl;
434   assert(onode_map.count(oid) == 0);
435   onode_map[oid] = o;
436   lru.push_front(*o);
437 }
438
439 KStore::OnodeRef KStore::OnodeHashLRU::lookup(const ghobject_t& oid)
440 {
441   std::lock_guard<std::mutex> l(lock);
442   dout(30) << __func__ << dendl;
443   ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
444   if (p == onode_map.end()) {
445     dout(30) << __func__ << " " << oid << " miss" << dendl;
446     return OnodeRef();
447   }
448   dout(30) << __func__ << " " << oid << " hit " << p->second << dendl;
449   _touch(p->second);
450   return p->second;
451 }
452
453 void KStore::OnodeHashLRU::clear()
454 {
455   std::lock_guard<std::mutex> l(lock);
456   dout(10) << __func__ << dendl;
457   lru.clear();
458   onode_map.clear();
459 }
460
461 void KStore::OnodeHashLRU::rename(const ghobject_t& old_oid,
462                                   const ghobject_t& new_oid)
463 {
464   std::lock_guard<std::mutex> l(lock);
465   dout(30) << __func__ << " " << old_oid << " -> " << new_oid << dendl;
466   ceph::unordered_map<ghobject_t,OnodeRef>::iterator po, pn;
467   po = onode_map.find(old_oid);
468   pn = onode_map.find(new_oid);
469
470   assert(po != onode_map.end());
471   if (pn != onode_map.end()) {
472     lru_list_t::iterator p = lru.iterator_to(*pn->second);
473     lru.erase(p);
474     onode_map.erase(pn);
475   }
476   OnodeRef o = po->second;
477
478   // install a non-existent onode it its place
479   po->second.reset(new Onode(cct, old_oid, o->key));
480   lru.push_back(*po->second);
481
482   // fix oid, key
483   onode_map.insert(make_pair(new_oid, o));
484   _touch(o);
485   o->oid = new_oid;
486   get_object_key(cct, new_oid, &o->key);
487 }
488
489 bool KStore::OnodeHashLRU::get_next(
490   const ghobject_t& after,
491   pair<ghobject_t,OnodeRef> *next)
492 {
493   std::lock_guard<std::mutex> l(lock);
494   dout(20) << __func__ << " after " << after << dendl;
495
496   if (after == ghobject_t()) {
497     if (lru.empty()) {
498       return false;
499     }
500     ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.begin();
501     assert(p != onode_map.end());
502     next->first = p->first;
503     next->second = p->second;
504     return true;
505   }
506
507   ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(after);
508   assert(p != onode_map.end()); // for now
509   lru_list_t::iterator pi = lru.iterator_to(*p->second);
510   ++pi;
511   if (pi == lru.end()) {
512     return false;
513   }
514   next->first = pi->oid;
515   next->second = onode_map[pi->oid];
516   return true;
517 }
518
519 int KStore::OnodeHashLRU::trim(int max)
520 {
521   std::lock_guard<std::mutex> l(lock);
522   dout(20) << __func__ << " max " << max
523            << " size " << onode_map.size() << dendl;
524   int trimmed = 0;
525   int num = onode_map.size() - max;
526   if (onode_map.size() == 0 || num <= 0)
527     return 0; // don't even try
528
529   lru_list_t::iterator p = lru.end();
530   if (num)
531     --p;
532   while (num > 0) {
533     Onode *o = &*p;
534     int refs = o->nref.load();
535     if (refs > 1) {
536       dout(20) << __func__ << "  " << o->oid << " has " << refs
537                << " refs; stopping with " << num << " left to trim" << dendl;
538       break;
539     }
540     dout(30) << __func__ << "  trim " << o->oid << dendl;
541     if (p != lru.begin()) {
542       lru.erase(p--);
543     } else {
544       lru.erase(p);
545       assert(num == 1);
546     }
547     o->get();  // paranoia
548     onode_map.erase(o->oid);
549     o->put();
550     --num;
551     ++trimmed;
552   }
553   return trimmed;
554 }
555
556 // =======================================================
557
558 // Collection
559
560 #undef dout_prefix
561 #define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") "
562
563 KStore::Collection::Collection(KStore *ns, coll_t c)
564   : store(ns),
565     cid(c),
566     lock("KStore::Collection::lock", true, false),
567     onode_map(store->cct)
568 {
569 }
570
571 KStore::OnodeRef KStore::Collection::get_onode(
572   const ghobject_t& oid,
573   bool create)
574 {
575   assert(create ? lock.is_wlocked() : lock.is_locked());
576
577   spg_t pgid;
578   if (cid.is_pg(&pgid)) {
579     if (!oid.match(cnode.bits, pgid.ps())) {
580       lderr(store->cct) << __func__ << " oid " << oid << " not part of "
581                         << pgid << " bits " << cnode.bits << dendl;
582       ceph_abort();
583     }
584   }
585
586   OnodeRef o = onode_map.lookup(oid);
587   if (o)
588     return o;
589
590   string key;
591   get_object_key(store->cct, oid, &key);
592
593   ldout(store->cct, 20) << __func__ << " oid " << oid << " key "
594                         << pretty_binary_string(key) << dendl;
595
596   bufferlist v;
597   int r = store->db->get(PREFIX_OBJ, key, &v);
598   ldout(store->cct, 20) << " r " << r << " v.len " << v.length() << dendl;
599   Onode *on;
600   if (v.length() == 0) {
601     assert(r == -ENOENT);
602     if (!create)
603       return OnodeRef();
604
605     // new
606     on = new Onode(store->cct, oid, key);
607     on->dirty = true;
608   } else {
609     // loaded
610     assert(r >=0);
611     on = new Onode(store->cct, oid, key);
612     on->exists = true;
613     bufferlist::iterator p = v.begin();
614     ::decode(on->onode, p);
615   }
616   o.reset(on);
617   onode_map.add(oid, o);
618   return o;
619 }
620
621
622
623 // =======================================================
624
625 #undef dout_prefix
626 #define dout_prefix *_dout << "kstore(" << path << ") "
627
628 KStore::KStore(CephContext *cct, const string& path)
629   : ObjectStore(cct, path),
630     db(NULL),
631     path_fd(-1),
632     fsid_fd(-1),
633     mounted(false),
634     coll_lock("KStore::coll_lock"),
635     nid_last(0),
636     nid_max(0),
637     throttle_ops(cct, "kstore_max_ops", cct->_conf->kstore_max_ops),
638     throttle_bytes(cct, "kstore_max_bytes", cct->_conf->kstore_max_bytes),
639     finisher(cct),
640     kv_sync_thread(this),
641     kv_stop(false),
642     logger(nullptr)
643 {
644   _init_logger();
645 }
646
647 KStore::~KStore()
648 {
649   _shutdown_logger();
650   assert(!mounted);
651   assert(db == NULL);
652   assert(fsid_fd < 0);
653 }
654
655 void KStore::_init_logger()
656 {
657   // XXX
658   PerfCountersBuilder b(cct, "KStore",
659                         l_kstore_first, l_kstore_last);
660   b.add_time_avg(l_kstore_state_prepare_lat, "state_prepare_lat", "Average prepare state latency");
661   b.add_time_avg(l_kstore_state_kv_queued_lat, "state_kv_queued_lat", "Average kv_queued state latency");
662   b.add_time_avg(l_kstore_state_kv_done_lat, "state_kv_done_lat", "Average kv_done state latency");
663   b.add_time_avg(l_kstore_state_finishing_lat, "state_finishing_lat", "Average finishing state latency");
664   b.add_time_avg(l_kstore_state_done_lat, "state_done_lat", "Average done state latency");
665   logger = b.create_perf_counters();
666   cct->get_perfcounters_collection()->add(logger);
667 }
668
669 void KStore::_shutdown_logger()
670 {
671   // XXX
672   cct->get_perfcounters_collection()->remove(logger);
673   delete logger;
674 }
675
676 int KStore::_open_path()
677 {
678   assert(path_fd < 0);
679   path_fd = ::open(path.c_str(), O_DIRECTORY);
680   if (path_fd < 0) {
681     int r = -errno;
682     derr << __func__ << " unable to open " << path << ": " << cpp_strerror(r)
683          << dendl;
684     return r;
685   }
686   return 0;
687 }
688
689 void KStore::_close_path()
690 {
691   VOID_TEMP_FAILURE_RETRY(::close(path_fd));
692   path_fd = -1;
693 }
694
695 int KStore::_open_fsid(bool create)
696 {
697   assert(fsid_fd < 0);
698   int flags = O_RDWR;
699   if (create)
700     flags |= O_CREAT;
701   fsid_fd = ::openat(path_fd, "fsid", flags, 0644);
702   if (fsid_fd < 0) {
703     int err = -errno;
704     derr << __func__ << " " << cpp_strerror(err) << dendl;
705     return err;
706   }
707   return 0;
708 }
709
710 int KStore::_read_fsid(uuid_d *uuid)
711 {
712   char fsid_str[40];
713   memset(fsid_str, 0, sizeof(fsid_str));
714   int ret = safe_read(fsid_fd, fsid_str, sizeof(fsid_str));
715   if (ret < 0) {
716     derr << __func__ << " failed: " << cpp_strerror(ret) << dendl;
717     return ret;
718   }
719   if (ret > 36)
720     fsid_str[36] = 0;
721   else
722     fsid_str[ret] = 0;
723   if (!uuid->parse(fsid_str)) {
724     derr << __func__ << " unparsable uuid " << fsid_str << dendl;
725     return -EINVAL;
726   }
727   return 0;
728 }
729
730 int KStore::_write_fsid()
731 {
732   int r = ::ftruncate(fsid_fd, 0);
733   if (r < 0) {
734     r = -errno;
735     derr << __func__ << " fsid truncate failed: " << cpp_strerror(r) << dendl;
736     return r;
737   }
738   string str = stringify(fsid) + "\n";
739   r = safe_write(fsid_fd, str.c_str(), str.length());
740   if (r < 0) {
741     derr << __func__ << " fsid write failed: " << cpp_strerror(r) << dendl;
742     return r;
743   }
744   r = ::fsync(fsid_fd);
745   if (r < 0) {
746     r = -errno;
747     derr << __func__ << " fsid fsync failed: " << cpp_strerror(r) << dendl;
748     return r;
749   }
750   return 0;
751 }
752
753 void KStore::_close_fsid()
754 {
755   VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
756   fsid_fd = -1;
757 }
758
759 int KStore::_lock_fsid()
760 {
761   struct flock l;
762   memset(&l, 0, sizeof(l));
763   l.l_type = F_WRLCK;
764   l.l_whence = SEEK_SET;
765   l.l_start = 0;
766   l.l_len = 0;
767   int r = ::fcntl(fsid_fd, F_SETLK, &l);
768   if (r < 0) {
769     int err = errno;
770     derr << __func__ << " failed to lock " << path << "/fsid"
771          << " (is another ceph-osd still running?)"
772          << cpp_strerror(err) << dendl;
773     return -err;
774   }
775   return 0;
776 }
777
778 bool KStore::test_mount_in_use()
779 {
780   // most error conditions mean the mount is not in use (e.g., because
781   // it doesn't exist).  only if we fail to lock do we conclude it is
782   // in use.
783   bool ret = false;
784   int r = _open_path();
785   if (r < 0)
786     return false;
787   r = _open_fsid(false);
788   if (r < 0)
789     goto out_path;
790   r = _lock_fsid();
791   if (r < 0)
792     ret = true; // if we can't lock, it is in use
793   _close_fsid();
794  out_path:
795   _close_path();
796   return ret;
797 }
798
799 int KStore::_open_db(bool create)
800 {
801   int r;
802   assert(!db);
803   char fn[PATH_MAX];
804   snprintf(fn, sizeof(fn), "%s/db", path.c_str());
805
806   string kv_backend;
807   if (create) {
808     kv_backend = cct->_conf->kstore_backend;
809   } else {
810     r = read_meta("kv_backend", &kv_backend);
811     if (r < 0) {
812       derr << __func__ << " uanble to read 'kv_backend' meta" << dendl;
813       return -EIO;
814     }
815   }
816   dout(10) << __func__ << " kv_backend = " << kv_backend << dendl;
817
818   if (create) {
819     int r = ::mkdir(fn, 0755);
820     if (r < 0)
821       r = -errno;
822     if (r < 0 && r != -EEXIST) {
823       derr << __func__ << " failed to create " << fn << ": " << cpp_strerror(r)
824            << dendl;
825       return r;
826     }
827
828     // wal_dir, too!
829     char walfn[PATH_MAX];
830     snprintf(walfn, sizeof(walfn), "%s/db.wal", path.c_str());
831     r = ::mkdir(walfn, 0755);
832     if (r < 0)
833       r = -errno;
834     if (r < 0 && r != -EEXIST) {
835       derr << __func__ << " failed to create " << walfn
836            << ": " << cpp_strerror(r)
837            << dendl;
838       return r;
839     }
840   }
841
842   db = KeyValueDB::create(cct, kv_backend, fn);
843   if (!db) {
844     derr << __func__ << " error creating db" << dendl;
845     return -EIO;
846   }
847   string options;
848   if (kv_backend == "rocksdb")
849     options = cct->_conf->kstore_rocksdb_options;
850   db->init(options);
851   stringstream err;
852   if (create)
853     r = db->create_and_open(err);
854   else
855     r = db->open(err);
856   if (r) {
857     derr << __func__ << " erroring opening db: " << err.str() << dendl;
858     delete db;
859     db = NULL;
860     return -EIO;
861   }
862   dout(1) << __func__ << " opened " << kv_backend
863           << " path " << fn << " options " << options << dendl;
864   return 0;
865 }
866
867 void KStore::_close_db()
868 {
869   assert(db);
870   delete db;
871   db = NULL;
872 }
873
874 int KStore::_open_collections(int *errors)
875 {
876   assert(coll_map.empty());
877   KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
878   for (it->upper_bound(string());
879        it->valid();
880        it->next()) {
881     coll_t cid;
882     if (cid.parse(it->key())) {
883       CollectionRef c(new Collection(this, cid));
884       bufferlist bl = it->value();
885       bufferlist::iterator p = bl.begin();
886       try {
887         ::decode(c->cnode, p);
888       } catch (buffer::error& e) {
889         derr << __func__ << " failed to decode cnode, key:"
890              << pretty_binary_string(it->key()) << dendl;
891         return -EIO;
892       } 
893       dout(20) << __func__ << " opened " << cid << dendl;
894       coll_map[cid] = c;
895     } else {
896       derr << __func__ << " unrecognized collection " << it->key() << dendl;
897       if (errors)
898         (*errors)++;
899     }
900   }
901   return 0;
902 }
903
904 int KStore::mkfs()
905 {
906   dout(1) << __func__ << " path " << path << dendl;
907   int r;
908   uuid_d old_fsid;
909
910   r = _open_path();
911   if (r < 0)
912     return r;
913
914   r = _open_fsid(true);
915   if (r < 0)
916     goto out_path_fd;
917
918   r = _lock_fsid();
919   if (r < 0)
920     goto out_close_fsid;
921
922   r = _read_fsid(&old_fsid);
923   if (r < 0 || old_fsid.is_zero()) {
924     if (fsid.is_zero()) {
925       fsid.generate_random();
926       dout(1) << __func__ << " generated fsid " << fsid << dendl;
927     } else {
928       dout(1) << __func__ << " using provided fsid " << fsid << dendl;
929     }
930     // we'll write it last.
931   } else {
932     if (!fsid.is_zero() && fsid != old_fsid) {
933       derr << __func__ << " on-disk fsid " << old_fsid
934            << " != provided " << fsid << dendl;
935       r = -EINVAL;
936       goto out_close_fsid;
937     }
938     fsid = old_fsid;
939     dout(1) << __func__ << " already created, fsid is " << fsid << dendl;
940     goto out_close_fsid;
941   }
942
943   r = _open_db(true);
944   if (r < 0)
945     goto out_close_fsid;
946
947   r = write_meta("kv_backend", cct->_conf->kstore_backend);
948   if (r < 0)
949     goto out_close_db;
950
951   r = write_meta("type", "kstore");
952   if (r < 0)
953     goto out_close_db;
954
955   // indicate mkfs completion/success by writing the fsid file
956   r = _write_fsid();
957   if (r == 0)
958     dout(10) << __func__ << " success" << dendl;
959   else
960     derr << __func__ << " error writing fsid: " << cpp_strerror(r) << dendl;
961
962  out_close_db:
963   _close_db();
964  out_close_fsid:
965   _close_fsid();
966  out_path_fd:
967   _close_path();
968   return r;
969 }
970
971 int KStore::mount()
972 {
973   dout(1) << __func__ << " path " << path << dendl;
974
975   if (cct->_conf->kstore_fsck_on_mount) {
976     int rc = fsck(cct->_conf->kstore_fsck_on_mount_deep);
977     if (rc < 0)
978       return rc;
979   }
980
981   int r = _open_path();
982   if (r < 0)
983     return r;
984   r = _open_fsid(false);
985   if (r < 0)
986     goto out_path;
987
988   r = _read_fsid(&fsid);
989   if (r < 0)
990     goto out_fsid;
991
992   r = _lock_fsid();
993   if (r < 0)
994     goto out_fsid;
995
996   r = _open_db(false);
997   if (r < 0)
998     goto out_fsid;
999
1000   r = _open_super_meta();
1001   if (r < 0)
1002     goto out_db;
1003
1004   r = _open_collections();
1005   if (r < 0)
1006     goto out_db;
1007
1008   finisher.start();
1009   kv_sync_thread.create("kstore_kv_sync");
1010
1011   mounted = true;
1012   return 0;
1013
1014  out_db:
1015   _close_db();
1016  out_fsid:
1017   _close_fsid();
1018  out_path:
1019   _close_path();
1020   return r;
1021 }
1022
1023 int KStore::umount()
1024 {
1025   assert(mounted);
1026   dout(1) << __func__ << dendl;
1027
1028   _sync();
1029   _reap_collections();
1030   coll_map.clear();
1031
1032   dout(20) << __func__ << " stopping kv thread" << dendl;
1033   _kv_stop();
1034   dout(20) << __func__ << " draining finisher" << dendl;
1035   finisher.wait_for_empty();
1036   dout(20) << __func__ << " stopping finisher" << dendl;
1037   finisher.stop();
1038   dout(20) << __func__ << " closing" << dendl;
1039
1040   mounted = false;
1041   _close_db();
1042   _close_fsid();
1043   _close_path();
1044   return 0;
1045 }
1046
1047 int KStore::fsck(bool deep)
1048 {
1049   dout(1) << __func__ << dendl;
1050   int errors = 0;
1051   dout(1) << __func__ << " finish with " << errors << " errors" << dendl;
1052   return errors;
1053 }
1054
1055 void KStore::_sync()
1056 {
1057   dout(10) << __func__ << dendl;
1058
1059   std::unique_lock<std::mutex> l(kv_lock);
1060   while (!kv_committing.empty() ||
1061          !kv_queue.empty()) {
1062     dout(20) << " waiting for kv to commit" << dendl;
1063     kv_sync_cond.wait(l);
1064   }
1065
1066   dout(10) << __func__ << " done" << dendl;
1067 }
1068
1069 int KStore::statfs(struct store_statfs_t* buf)
1070 {
1071   return db->get_statfs(buf);
1072 }
1073
1074 // ---------------
1075 // cache
1076
1077 KStore::CollectionRef KStore::_get_collection(coll_t cid)
1078 {
1079   RWLock::RLocker l(coll_lock);
1080   ceph::unordered_map<coll_t,CollectionRef>::iterator cp = coll_map.find(cid);
1081   if (cp == coll_map.end())
1082     return CollectionRef();
1083   return cp->second;
1084 }
1085
1086 void KStore::_queue_reap_collection(CollectionRef& c)
1087 {
1088   dout(10) << __func__ << " " << c->cid << dendl;
1089   std::lock_guard<std::mutex> l(reap_lock);
1090   removed_collections.push_back(c);
1091 }
1092
1093 void KStore::_reap_collections()
1094 {
1095   list<CollectionRef> removed_colls;
1096   std::lock_guard<std::mutex> l(reap_lock);
1097   removed_colls.swap(removed_collections);
1098
1099   for (list<CollectionRef>::iterator p = removed_colls.begin();
1100        p != removed_colls.end();
1101        ++p) {
1102     CollectionRef c = *p;
1103     dout(10) << __func__ << " " << c->cid << dendl;
1104     {
1105       pair<ghobject_t,OnodeRef> next;
1106       while (c->onode_map.get_next(next.first, &next)) {
1107         assert(!next.second->exists);
1108         if (!next.second->flush_txns.empty()) {
1109           dout(10) << __func__ << " " << c->cid << " " << next.second->oid
1110                    << " flush_txns " << next.second->flush_txns << dendl;
1111           return;
1112         }
1113       }
1114     }
1115     c->onode_map.clear();
1116     dout(10) << __func__ << " " << c->cid << " done" << dendl;
1117   }
1118
1119   dout(10) << __func__ << " all reaped" << dendl;
1120 }
1121
1122 // ---------------
1123 // read operations
1124
1125 bool KStore::exists(const coll_t& cid, const ghobject_t& oid)
1126 {
1127   dout(10) << __func__ << " " << cid << " " << oid << dendl;
1128   CollectionRef c = _get_collection(cid);
1129   if (!c)
1130     return false;
1131   RWLock::RLocker l(c->lock);
1132   OnodeRef o = c->get_onode(oid, false);
1133   if (!o || !o->exists)
1134     return false;
1135   return true;
1136 }
1137
1138 int KStore::stat(
1139     const coll_t& cid,
1140     const ghobject_t& oid,
1141     struct stat *st,
1142     bool allow_eio)
1143 {
1144   dout(10) << __func__ << " " << cid << " " << oid << dendl;
1145   CollectionRef c = _get_collection(cid);
1146   if (!c)
1147     return -ENOENT;
1148   RWLock::RLocker l(c->lock);
1149   OnodeRef o = c->get_onode(oid, false);
1150   if (!o || !o->exists)
1151     return -ENOENT;
1152   st->st_size = o->onode.size;
1153   st->st_blksize = 4096;
1154   st->st_blocks = (st->st_size + st->st_blksize - 1) / st->st_blksize;
1155   st->st_nlink = 1;
1156   return 0;
1157 }
1158
1159 int KStore::set_collection_opts(
1160   const coll_t& cid,
1161   const pool_opts_t& opts)
1162 {
1163   return -EOPNOTSUPP;
1164 }
1165
1166 int KStore::read(
1167   const coll_t& cid,
1168   const ghobject_t& oid,
1169   uint64_t offset,
1170   size_t length,
1171   bufferlist& bl,
1172   uint32_t op_flags)
1173 {
1174   dout(15) << __func__ << " " << cid << " " << oid
1175            << " " << offset << "~" << length
1176            << dendl;
1177   bl.clear();
1178   CollectionRef c = _get_collection(cid);
1179   if (!c)
1180     return -ENOENT;
1181   RWLock::RLocker l(c->lock);
1182
1183   int r;
1184
1185   OnodeRef o = c->get_onode(oid, false);
1186   if (!o || !o->exists) {
1187     r = -ENOENT;
1188     goto out;
1189   }
1190
1191   if (offset == length && offset == 0)
1192     length = o->onode.size;
1193
1194   r = _do_read(o, offset, length, bl, op_flags);
1195
1196  out:
1197   dout(10) << __func__ << " " << cid << " " << oid
1198            << " " << offset << "~" << length
1199            << " = " << r << dendl;
1200   return r;
1201 }
1202
1203 int KStore::_do_read(
1204     OnodeRef o,
1205     uint64_t offset,
1206     size_t length,
1207     bufferlist& bl,
1208     uint32_t op_flags)
1209 {
1210   int r = 0;
1211   uint64_t stripe_size = o->onode.stripe_size;
1212   uint64_t stripe_off;
1213
1214   dout(20) << __func__ << " " << offset << "~" << length << " size "
1215            << o->onode.size << " nid " << o->onode.nid << dendl;
1216   bl.clear();
1217
1218   if (offset > o->onode.size) {
1219     goto out;
1220   }
1221   if (offset + length > o->onode.size) {
1222     length = o->onode.size - offset;
1223   }
1224   if (stripe_size == 0) {
1225     bl.append_zero(length);
1226     r = length;
1227     goto out;
1228   }
1229
1230   o->flush();
1231
1232   stripe_off = offset % stripe_size;
1233   while (length > 0) {
1234     bufferlist stripe;
1235     _do_read_stripe(o, offset - stripe_off, &stripe);
1236     dout(30) << __func__ << " stripe " << offset - stripe_off << " got "
1237              << stripe.length() << dendl;
1238     unsigned swant = MIN(stripe_size - stripe_off, length);
1239     if (stripe.length()) {
1240       if (swant == stripe.length()) {
1241         bl.claim_append(stripe);
1242         dout(30) << __func__ << " taking full stripe" << dendl;
1243       } else {
1244         unsigned l = 0;
1245         if (stripe_off < stripe.length()) {
1246           l = MIN(stripe.length() - stripe_off, swant);
1247           bufferlist t;
1248           t.substr_of(stripe, stripe_off, l);
1249           bl.claim_append(t);
1250           dout(30) << __func__ << " taking " << stripe_off << "~" << l << dendl;
1251         }
1252         if (l < swant) {
1253           bl.append_zero(swant - l);
1254           dout(30) << __func__ << " adding " << swant - l << " zeros" << dendl;
1255         }
1256       }
1257     } else {
1258       dout(30) << __func__ << " generating " << swant << " zeros" << dendl;
1259       bl.append_zero(swant);
1260     }
1261     offset += swant;
1262     length -= swant;
1263     stripe_off = 0;
1264   }
1265   r = bl.length();
1266   dout(30) << " result:\n";
1267   bl.hexdump(*_dout);
1268   *_dout << dendl;
1269
1270  out:
1271   return r;
1272 }
1273
1274 int KStore::fiemap(
1275   const coll_t& cid,
1276   const ghobject_t& oid,
1277   uint64_t offset,
1278   size_t len,
1279   bufferlist& bl)
1280 {
1281   map<uint64_t, uint64_t> m;
1282   int r = fiemap(cid, oid, offset, len, m);
1283   if (r >= 0) {
1284     ::encode(m, bl);
1285   }
1286
1287   return r;
1288 }
1289
1290 int KStore::fiemap(
1291   const coll_t& cid,
1292   const ghobject_t& oid,
1293   uint64_t offset,
1294   size_t len,
1295   map<uint64_t, uint64_t>& destmap)
1296 {
1297   CollectionRef c = _get_collection(cid);
1298   if (!c)
1299     return -ENOENT;
1300   RWLock::RLocker l(c->lock);
1301
1302   OnodeRef o = c->get_onode(oid, false);
1303   if (!o || !o->exists) {
1304     return -ENOENT;
1305   }
1306
1307   if (offset > o->onode.size)
1308     goto out;
1309
1310   if (offset + len > o->onode.size) {
1311     len = o->onode.size - offset;
1312   }
1313
1314   dout(20) << __func__ << " " << offset << "~" << len << " size "
1315            << o->onode.size << dendl;
1316
1317   // FIXME: do something smarter here
1318   destmap[0] = o->onode.size;
1319
1320  out:
1321   dout(20) << __func__ << " " << offset << "~" << len
1322            << " size = 0 (" << destmap << ")" << dendl;
1323   return 0;
1324 }
1325
1326 int KStore::getattr(
1327   const coll_t& cid,
1328   const ghobject_t& oid,
1329   const char *name,
1330   bufferptr& value)
1331 {
1332   dout(15) << __func__ << " " << cid << " " << oid << " " << name << dendl;
1333   CollectionRef c = _get_collection(cid);
1334   if (!c)
1335     return -ENOENT;
1336   RWLock::RLocker l(c->lock);
1337   int r;
1338   string k(name);
1339
1340   OnodeRef o = c->get_onode(oid, false);
1341   if (!o || !o->exists) {
1342     r = -ENOENT;
1343     goto out;
1344   }
1345
1346   if (!o->onode.attrs.count(k)) {
1347     r = -ENODATA;
1348     goto out;
1349   }
1350   value = o->onode.attrs[k];
1351   r = 0;
1352  out:
1353   dout(10) << __func__ << " " << cid << " " << oid << " " << name
1354            << " = " << r << dendl;
1355   return r;
1356 }
1357
1358 int KStore::getattrs(
1359   const coll_t& cid,
1360   const ghobject_t& oid,
1361   map<string,bufferptr>& aset)
1362 {
1363   dout(15) << __func__ << " " << cid << " " << oid << dendl;
1364   CollectionRef c = _get_collection(cid);
1365   if (!c)
1366     return -ENOENT;
1367   RWLock::RLocker l(c->lock);
1368   int r;
1369
1370   OnodeRef o = c->get_onode(oid, false);
1371   if (!o || !o->exists) {
1372     r = -ENOENT;
1373     goto out;
1374   }
1375   aset = o->onode.attrs;
1376   r = 0;
1377  out:
1378   dout(10) << __func__ << " " << cid << " " << oid
1379            << " = " << r << dendl;
1380   return r;
1381 }
1382
1383 int KStore::list_collections(vector<coll_t>& ls)
1384 {
1385   RWLock::RLocker l(coll_lock);
1386   for (ceph::unordered_map<coll_t, CollectionRef>::iterator p = coll_map.begin();
1387        p != coll_map.end();
1388        ++p)
1389     ls.push_back(p->first);
1390   return 0;
1391 }
1392
1393 bool KStore::collection_exists(const coll_t& c)
1394 {
1395   RWLock::RLocker l(coll_lock);
1396   return coll_map.count(c);
1397 }
1398
1399 int KStore::collection_empty(const coll_t& cid, bool *empty)
1400 {
1401   dout(15) << __func__ << " " << cid << dendl;
1402   vector<ghobject_t> ls;
1403   ghobject_t next;
1404   int r = collection_list(cid, ghobject_t(), ghobject_t::get_max(), 1,
1405                           &ls, &next);
1406   if (r < 0) {
1407     derr << __func__ << " collection_list returned: " << cpp_strerror(r)
1408          << dendl;
1409     return r;
1410   }
1411   *empty = ls.empty();
1412   dout(10) << __func__ << " " << cid << " = " << (int)(*empty) << dendl;
1413   return 0;
1414 }
1415
1416 int KStore::collection_bits(const coll_t& cid)
1417 {
1418   dout(15) << __func__ << " " << cid << dendl;
1419   CollectionHandle ch = _get_collection(cid);
1420   if (!ch)
1421     return -ENOENT;
1422   Collection *c = static_cast<Collection*>(ch.get());
1423   RWLock::RLocker l(c->lock);
1424   dout(10) << __func__ << " " << cid << " = " << c->cnode.bits << dendl;
1425   return c->cnode.bits;
1426 }
1427
1428 int KStore::collection_list(
1429   const coll_t& cid, const ghobject_t& start, const ghobject_t& end, int max,
1430   vector<ghobject_t> *ls, ghobject_t *pnext)
1431 {
1432   CollectionHandle c = _get_collection(cid);
1433   if (!c)
1434     return -ENOENT;
1435   return collection_list(c, start, end, max, ls, pnext);
1436 }
1437
1438 int KStore::collection_list(
1439   CollectionHandle &c_, const ghobject_t& start, const ghobject_t& end, int max,
1440   vector<ghobject_t> *ls, ghobject_t *pnext)
1441
1442 {
1443   Collection *c = static_cast<Collection*>(c_.get());
1444   dout(15) << __func__ << " " << c->cid
1445            << " start " << start << " end " << end << " max " << max << dendl;
1446   int r;
1447   {
1448     RWLock::RLocker l(c->lock);
1449     r = _collection_list(c, start, end, max, ls, pnext);
1450   }
1451
1452   dout(10) << __func__ << " " << c->cid
1453     << " start " << start << " end " << end << " max " << max
1454     << " = " << r << ", ls.size() = " << ls->size()
1455     << ", next = " << (pnext ? *pnext : ghobject_t())  << dendl;
1456   return r;
1457 }
1458
1459 int KStore::_collection_list(
1460   Collection* c, const ghobject_t& start, const ghobject_t& end, int max,
1461   vector<ghobject_t> *ls, ghobject_t *pnext)
1462 {
1463   int r = 0;
1464   KeyValueDB::Iterator it;
1465   string temp_start_key, temp_end_key;
1466   string start_key, end_key;
1467   bool set_next = false;
1468   string pend;
1469   bool temp;
1470
1471   ghobject_t static_next;
1472   if (!pnext)
1473     pnext = &static_next;
1474
1475   if (start == ghobject_t::get_max() ||
1476     start.hobj.is_max()) {
1477     goto out;
1478   }
1479   get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
1480                      &start_key, &end_key);
1481   dout(20) << __func__
1482            << " range " << pretty_binary_string(temp_start_key)
1483            << " to " << pretty_binary_string(temp_end_key)
1484            << " and " << pretty_binary_string(start_key)
1485            << " to " << pretty_binary_string(end_key)
1486            << " start " << start << dendl;
1487   it = db->get_iterator(PREFIX_OBJ);
1488   if (start == ghobject_t() || start == c->cid.get_min_hobj()) {
1489     it->upper_bound(temp_start_key);
1490     temp = true;
1491   } else {
1492     string k;
1493     get_object_key(cct, start, &k);
1494     if (start.hobj.is_temp()) {
1495       temp = true;
1496       assert(k >= temp_start_key && k < temp_end_key);
1497     } else {
1498       temp = false;
1499       assert(k >= start_key && k < end_key);
1500     }
1501     dout(20) << " start from " << pretty_binary_string(k)
1502              << " temp=" << (int)temp << dendl;
1503     it->lower_bound(k);
1504   }
1505   if (end.hobj.is_max()) {
1506     pend = temp ? temp_end_key : end_key;
1507   } else {
1508     get_object_key(cct, end, &end_key);
1509     if (end.hobj.is_temp()) {
1510       if (temp)
1511         pend = end_key;
1512       else
1513         goto out;
1514     } else {
1515       pend = temp ? temp_end_key : end_key;
1516     }
1517   }
1518   dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
1519   while (true) {
1520     if (!it->valid() || it->key() >= pend) {
1521       if (!it->valid())
1522         dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
1523       else
1524         dout(20) << __func__ << " key " << pretty_binary_string(it->key())
1525                  << " > " << end << dendl;
1526       if (temp) {
1527         if (end.hobj.is_temp()) {
1528           break;
1529         }
1530         dout(30) << __func__ << " switch to non-temp namespace" << dendl;
1531         temp = false;
1532         it->upper_bound(start_key);
1533         pend = end_key;
1534         dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
1535         continue;
1536       }
1537       break;
1538     }
1539     dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
1540     ghobject_t oid;
1541     int r = get_key_object(it->key(), &oid);
1542     assert(r == 0);
1543     if (ls->size() >= (unsigned)max) {
1544       dout(20) << __func__ << " reached max " << max << dendl;
1545       *pnext = oid;
1546       set_next = true;
1547       break;
1548     }
1549     ls->push_back(oid);
1550     it->next();
1551   }
1552 out:
1553   if (!set_next) {
1554     *pnext = ghobject_t::get_max();
1555   }
1556   return r;
1557 }
1558
1559 // omap reads
1560
1561 KStore::OmapIteratorImpl::OmapIteratorImpl(
1562   CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
1563   : c(c), o(o), it(it)
1564 {
1565   RWLock::RLocker l(c->lock);
1566   if (o->onode.omap_head) {
1567     get_omap_key(o->onode.omap_head, string(), &head);
1568     get_omap_tail(o->onode.omap_head, &tail);
1569     it->lower_bound(head);
1570   }
1571 }
1572
1573 int KStore::OmapIteratorImpl::seek_to_first()
1574 {
1575   RWLock::RLocker l(c->lock);
1576   if (o->onode.omap_head) {
1577     it->lower_bound(head);
1578   } else {
1579     it = KeyValueDB::Iterator();
1580   }
1581   return 0;
1582 }
1583
1584 int KStore::OmapIteratorImpl::upper_bound(const string& after)
1585 {
1586   RWLock::RLocker l(c->lock);
1587   if (o->onode.omap_head) {
1588     string key;
1589     get_omap_key(o->onode.omap_head, after, &key);
1590     it->upper_bound(key);
1591   } else {
1592     it = KeyValueDB::Iterator();
1593   }
1594   return 0;
1595 }
1596
1597 int KStore::OmapIteratorImpl::lower_bound(const string& to)
1598 {
1599   RWLock::RLocker l(c->lock);
1600   if (o->onode.omap_head) {
1601     string key;
1602     get_omap_key(o->onode.omap_head, to, &key);
1603     it->lower_bound(key);
1604   } else {
1605     it = KeyValueDB::Iterator();
1606   }
1607   return 0;
1608 }
1609
1610 bool KStore::OmapIteratorImpl::valid()
1611 {
1612   RWLock::RLocker l(c->lock);
1613   if (o->onode.omap_head && it->valid() && it->raw_key().second <= tail) {
1614     return true;
1615   } else {
1616     return false;
1617   }
1618 }
1619
1620 int KStore::OmapIteratorImpl::next(bool validate)
1621 {
1622   RWLock::RLocker l(c->lock);
1623   if (o->onode.omap_head) {
1624     it->next();
1625     return 0;
1626   } else {
1627     return -1;
1628   }
1629 }
1630
1631 string KStore::OmapIteratorImpl::key()
1632 {
1633   RWLock::RLocker l(c->lock);
1634   assert(it->valid());
1635   string db_key = it->raw_key().second;
1636   string user_key;
1637   decode_omap_key(db_key, &user_key);
1638   return user_key;
1639 }
1640
1641 bufferlist KStore::OmapIteratorImpl::value()
1642 {
1643   RWLock::RLocker l(c->lock);
1644   assert(it->valid());
1645   return it->value();
1646 }
1647
1648 int KStore::omap_get(
1649   const coll_t& cid,                ///< [in] Collection containing oid
1650   const ghobject_t &oid,   ///< [in] Object containing omap
1651   bufferlist *header,      ///< [out] omap header
1652   map<string, bufferlist> *out /// < [out] Key to value map
1653   )
1654 {
1655   dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1656   CollectionRef c = _get_collection(cid);
1657   if (!c)
1658     return -ENOENT;
1659   RWLock::RLocker l(c->lock);
1660   int r = 0;
1661   OnodeRef o = c->get_onode(oid, false);
1662   if (!o || !o->exists) {
1663     r = -ENOENT;
1664     goto out;
1665   }
1666   if (!o->onode.omap_head)
1667     goto out;
1668   o->flush();
1669   {
1670     KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1671     string head, tail;
1672     get_omap_header(o->onode.omap_head, &head);
1673     get_omap_tail(o->onode.omap_head, &tail);
1674     it->lower_bound(head);
1675     while (it->valid()) {
1676       if (it->key() == head) {
1677         dout(30) << __func__ << "  got header" << dendl;
1678         *header = it->value();
1679       } else if (it->key() >= tail) {
1680         dout(30) << __func__ << "  reached tail" << dendl;
1681         break;
1682       } else {
1683         string user_key;
1684         decode_omap_key(it->key(), &user_key);
1685         dout(30) << __func__ << "  got " << pretty_binary_string(it->key())
1686                  << " -> " << user_key << dendl;
1687         assert(it->key() < tail);
1688         (*out)[user_key] = it->value();
1689       }
1690       it->next();
1691     }
1692   }
1693  out:
1694   dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1695   return r;
1696 }
1697
1698 int KStore::omap_get_header(
1699   const coll_t& cid,                ///< [in] Collection containing oid
1700   const ghobject_t &oid,   ///< [in] Object containing omap
1701   bufferlist *header,      ///< [out] omap header
1702   bool allow_eio ///< [in] don't assert on eio
1703   )
1704 {
1705   dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1706   CollectionRef c = _get_collection(cid);
1707   if (!c)
1708     return -ENOENT;
1709   RWLock::RLocker l(c->lock);
1710   int r = 0;
1711   OnodeRef o = c->get_onode(oid, false);
1712   if (!o || !o->exists) {
1713     r = -ENOENT;
1714     goto out;
1715   }
1716   if (!o->onode.omap_head)
1717     goto out;
1718   o->flush();
1719   {
1720     string head;
1721     get_omap_header(o->onode.omap_head, &head);
1722     if (db->get(PREFIX_OMAP, head, header) >= 0) {
1723       dout(30) << __func__ << "  got header" << dendl;
1724     } else {
1725       dout(30) << __func__ << "  no header" << dendl;
1726     }
1727   }
1728  out:
1729   dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1730   return r;
1731 }
1732
1733 int KStore::omap_get_keys(
1734   const coll_t& cid,              ///< [in] Collection containing oid
1735   const ghobject_t &oid, ///< [in] Object containing omap
1736   set<string> *keys      ///< [out] Keys defined on oid
1737   )
1738 {
1739   dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1740   CollectionRef c = _get_collection(cid);
1741   if (!c)
1742     return -ENOENT;
1743   RWLock::RLocker l(c->lock);
1744   int r = 0;
1745   OnodeRef o = c->get_onode(oid, false);
1746   if (!o || !o->exists) {
1747     r = -ENOENT;
1748     goto out;
1749   }
1750   if (!o->onode.omap_head)
1751     goto out;
1752   o->flush();
1753   {
1754     KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1755     string head, tail;
1756     get_omap_key(o->onode.omap_head, string(), &head);
1757     get_omap_tail(o->onode.omap_head, &tail);
1758     it->lower_bound(head);
1759     while (it->valid()) {
1760       if (it->key() >= tail) {
1761         dout(30) << __func__ << "  reached tail" << dendl;
1762         break;
1763       }
1764       string user_key;
1765       decode_omap_key(it->key(), &user_key);
1766       dout(30) << __func__ << "  got " << pretty_binary_string(it->key())
1767                << " -> " << user_key << dendl;
1768       assert(it->key() < tail);
1769       keys->insert(user_key);
1770       it->next();
1771     }
1772   }
1773  out:
1774   dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1775   return r;
1776 }
1777
1778 int KStore::omap_get_values(
1779   const coll_t& cid,                    ///< [in] Collection containing oid
1780   const ghobject_t &oid,       ///< [in] Object containing omap
1781   const set<string> &keys,     ///< [in] Keys to get
1782   map<string, bufferlist> *out ///< [out] Returned keys and values
1783   )
1784 {
1785   dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1786   CollectionRef c = _get_collection(cid);
1787   if (!c)
1788     return -ENOENT;
1789   RWLock::RLocker l(c->lock);
1790   int r = 0;
1791   OnodeRef o = c->get_onode(oid, false);
1792   if (!o || !o->exists) {
1793     r = -ENOENT;
1794     goto out;
1795   }
1796   if (!o->onode.omap_head)
1797     goto out;
1798   o->flush();
1799   for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1800     string key;
1801     get_omap_key(o->onode.omap_head, *p, &key);
1802     bufferlist val;
1803     if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1804       dout(30) << __func__ << "  got " << pretty_binary_string(key)
1805                << " -> " << *p << dendl;
1806       out->insert(make_pair(*p, val));
1807     }
1808   }
1809  out:
1810   dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1811   return r;
1812 }
1813
1814 int KStore::omap_check_keys(
1815   const coll_t& cid,                ///< [in] Collection containing oid
1816   const ghobject_t &oid,   ///< [in] Object containing omap
1817   const set<string> &keys, ///< [in] Keys to check
1818   set<string> *out         ///< [out] Subset of keys defined on oid
1819   )
1820 {
1821   dout(15) << __func__ << " " << cid << " oid " << oid << dendl;
1822   CollectionRef c = _get_collection(cid);
1823   if (!c)
1824     return -ENOENT;
1825   RWLock::RLocker l(c->lock);
1826   int r = 0;
1827   OnodeRef o = c->get_onode(oid, false);
1828   if (!o || !o->exists) {
1829     r = -ENOENT;
1830     goto out;
1831   }
1832   if (!o->onode.omap_head)
1833     goto out;
1834   o->flush();
1835   for (set<string>::const_iterator p = keys.begin(); p != keys.end(); ++p) {
1836     string key;
1837     get_omap_key(o->onode.omap_head, *p, &key);
1838     bufferlist val;
1839     if (db->get(PREFIX_OMAP, key, &val) >= 0) {
1840       dout(30) << __func__ << "  have " << pretty_binary_string(key)
1841                << " -> " << *p << dendl;
1842       out->insert(*p);
1843     } else {
1844       dout(30) << __func__ << "  miss " << pretty_binary_string(key)
1845                << " -> " << *p << dendl;
1846     }
1847   }
1848  out:
1849   dout(10) << __func__ << " " << cid << " oid " << oid << " = " << r << dendl;
1850   return r;
1851 }
1852
1853 ObjectMap::ObjectMapIterator KStore::get_omap_iterator(
1854   const coll_t& cid,              ///< [in] collection
1855   const ghobject_t &oid  ///< [in] object
1856   )
1857 {
1858
1859   dout(10) << __func__ << " " << cid << " " << oid << dendl;
1860   CollectionRef c = _get_collection(cid);
1861   if (!c) {
1862     dout(10) << __func__ << " " << cid << "doesn't exist" <<dendl;
1863     return ObjectMap::ObjectMapIterator();
1864   }
1865   RWLock::RLocker l(c->lock);
1866   OnodeRef o = c->get_onode(oid, false);
1867   if (!o || !o->exists) {
1868     dout(10) << __func__ << " " << oid << "doesn't exist" <<dendl;
1869     return ObjectMap::ObjectMapIterator();
1870   }
1871   o->flush();
1872   dout(10) << __func__ << " header = " << o->onode.omap_head <<dendl;
1873   KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
1874   return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o, it));
1875 }
1876
1877
1878 // -----------------
1879 // write helpers
1880
1881 int KStore::_open_super_meta()
1882 {
1883   // nid
1884   {
1885     nid_max = 0;
1886     bufferlist bl;
1887     db->get(PREFIX_SUPER, "nid_max", &bl);
1888     bufferlist::iterator p = bl.begin();
1889     try {
1890       ::decode(nid_max, p);
1891     } catch (buffer::error& e) {
1892     }
1893     dout(10) << __func__ << " old nid_max " << nid_max << dendl;
1894     nid_last = nid_max;
1895   }
1896   return 0;
1897 }
1898
1899 void KStore::_assign_nid(TransContext *txc, OnodeRef o)
1900 {
1901   if (o->onode.nid)
1902     return;
1903   std::lock_guard<std::mutex> l(nid_lock);
1904   o->onode.nid = ++nid_last;
1905   dout(20) << __func__ << " " << o->oid << " nid " << o->onode.nid << dendl;
1906   if (nid_last > nid_max) {
1907     nid_max += cct->_conf->kstore_nid_prealloc;
1908     bufferlist bl;
1909     ::encode(nid_max, bl);
1910     txc->t->set(PREFIX_SUPER, "nid_max", bl);
1911     dout(10) << __func__ << " nid_max now " << nid_max << dendl;
1912   }
1913 }
1914
1915 KStore::TransContext *KStore::_txc_create(OpSequencer *osr)
1916 {
1917   TransContext *txc = new TransContext(osr);
1918   txc->t = db->get_transaction();
1919   osr->queue_new(txc);
1920   dout(20) << __func__ << " osr " << osr << " = " << txc << dendl;
1921   return txc;
1922 }
1923
1924 void KStore::_txc_state_proc(TransContext *txc)
1925 {
1926   while (true) {
1927     dout(10) << __func__ << " txc " << txc
1928              << " " << txc->get_state_name() << dendl;
1929     switch (txc->state) {
1930     case TransContext::STATE_PREPARE:
1931       txc->log_state_latency(logger, l_kstore_state_prepare_lat);
1932       txc->state = TransContext::STATE_KV_QUEUED;
1933       if (!cct->_conf->kstore_sync_transaction) {
1934         std::lock_guard<std::mutex> l(kv_lock);
1935         if (cct->_conf->kstore_sync_submit_transaction) {
1936           int r = db->submit_transaction(txc->t);
1937           assert(r == 0);
1938         }
1939         kv_queue.push_back(txc);
1940         kv_cond.notify_one();
1941         return;
1942       }
1943       {
1944         int r = db->submit_transaction_sync(txc->t);
1945         assert(r == 0);
1946       }
1947       break;
1948
1949     case TransContext::STATE_KV_QUEUED:
1950       txc->log_state_latency(logger, l_kstore_state_kv_queued_lat);
1951       txc->state = TransContext::STATE_KV_DONE;
1952       _txc_finish_kv(txc);
1953       // ** fall-thru **
1954
1955     case TransContext::STATE_KV_DONE:
1956       txc->log_state_latency(logger, l_kstore_state_kv_done_lat);
1957       txc->state = TransContext::STATE_FINISHING;
1958       // ** fall-thru **
1959
1960     case TransContext::TransContext::STATE_FINISHING:
1961       txc->log_state_latency(logger, l_kstore_state_finishing_lat);
1962       _txc_finish(txc);
1963       return;
1964
1965     default:
1966       derr << __func__ << " unexpected txc " << txc
1967            << " state " << txc->get_state_name() << dendl;
1968       assert(0 == "unexpected txc state");
1969       return;
1970     }
1971   }
1972 }
1973
1974 void KStore::_txc_finalize(OpSequencer *osr, TransContext *txc)
1975 {
1976   dout(20) << __func__ << " osr " << osr << " txc " << txc
1977            << " onodes " << txc->onodes << dendl;
1978
1979   // finalize onodes
1980   for (set<OnodeRef>::iterator p = txc->onodes.begin();
1981        p != txc->onodes.end();
1982        ++p) {
1983     bufferlist bl;
1984     ::encode((*p)->onode, bl);
1985     dout(20) << " onode size is " << bl.length() << dendl;
1986     txc->t->set(PREFIX_OBJ, (*p)->key, bl);
1987
1988     std::lock_guard<std::mutex> l((*p)->flush_lock);
1989     (*p)->flush_txns.insert(txc);
1990   }
1991 }
1992
1993 void KStore::_txc_finish_kv(TransContext *txc)
1994 {
1995   dout(20) << __func__ << " txc " << txc << dendl;
1996
1997   // warning: we're calling onreadable_sync inside the sequencer lock
1998   if (txc->onreadable_sync) {
1999     txc->onreadable_sync->complete(0);
2000     txc->onreadable_sync = NULL;
2001   }
2002   if (txc->onreadable) {
2003     finisher.queue(txc->onreadable);
2004     txc->onreadable = NULL;
2005   }
2006   if (txc->oncommit) {
2007     finisher.queue(txc->oncommit);
2008     txc->oncommit = NULL;
2009   }
2010   if (!txc->oncommits.empty()) {
2011     finisher.queue(txc->oncommits);
2012   }
2013
2014   throttle_ops.put(txc->ops);
2015   throttle_bytes.put(txc->bytes);
2016 }
2017
2018 void KStore::_txc_finish(TransContext *txc)
2019 {
2020   dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
2021   assert(txc->state == TransContext::STATE_FINISHING);
2022
2023   for (set<OnodeRef>::iterator p = txc->onodes.begin();
2024        p != txc->onodes.end();
2025        ++p) {
2026     std::lock_guard<std::mutex> l((*p)->flush_lock);
2027     dout(20) << __func__ << " onode " << *p << " had " << (*p)->flush_txns
2028              << dendl;
2029     assert((*p)->flush_txns.count(txc));
2030     (*p)->flush_txns.erase(txc);
2031     if ((*p)->flush_txns.empty()) {
2032       (*p)->flush_cond.notify_all();
2033       (*p)->clear_pending_stripes();
2034     }
2035   }
2036
2037   // clear out refs
2038   txc->onodes.clear();
2039
2040   while (!txc->removed_collections.empty()) {
2041     _queue_reap_collection(txc->removed_collections.front());
2042     txc->removed_collections.pop_front();
2043   }
2044
2045   OpSequencerRef osr = txc->osr;
2046   {
2047     std::lock_guard<std::mutex> l(osr->qlock);
2048     txc->state = TransContext::STATE_DONE;
2049   }
2050
2051   _osr_reap_done(osr.get());
2052 }
2053
2054 void KStore::_osr_reap_done(OpSequencer *osr)
2055 {
2056   std::lock_guard<std::mutex> l(osr->qlock);
2057   dout(20) << __func__ << " osr " << osr << dendl;
2058   while (!osr->q.empty()) {
2059     TransContext *txc = &osr->q.front();
2060     dout(20) << __func__ << "  txc " << txc << " " << txc->get_state_name()
2061              << dendl;
2062     if (txc->state != TransContext::STATE_DONE) {
2063       break;
2064     }
2065
2066     if (txc->first_collection) {
2067       txc->first_collection->onode_map.trim(cct->_conf->kstore_onode_map_size);
2068     }
2069
2070     osr->q.pop_front();
2071     txc->log_state_latency(logger, l_kstore_state_done_lat);
2072     delete txc;
2073     osr->qcond.notify_all();
2074     if (osr->q.empty())
2075       dout(20) << __func__ << " osr " << osr << " q now empty" << dendl;
2076   }
2077 }
2078
2079 void KStore::_kv_sync_thread()
2080 {
2081   dout(10) << __func__ << " start" << dendl;
2082   std::unique_lock<std::mutex> l(kv_lock);
2083   while (true) {
2084     assert(kv_committing.empty());
2085     if (kv_queue.empty()) {
2086       if (kv_stop)
2087         break;
2088       dout(20) << __func__ << " sleep" << dendl;
2089       kv_sync_cond.notify_all();
2090       kv_cond.wait(l);
2091       dout(20) << __func__ << " wake" << dendl;
2092     } else {
2093       dout(20) << __func__ << " committing " << kv_queue.size() << dendl;
2094       kv_committing.swap(kv_queue);
2095       utime_t start = ceph_clock_now();
2096       l.unlock();
2097
2098       dout(30) << __func__ << " committing txc " << kv_committing << dendl;
2099
2100       // one transaction to force a sync
2101       KeyValueDB::Transaction t = db->get_transaction();
2102       if (!cct->_conf->kstore_sync_submit_transaction) {
2103         for (std::deque<TransContext *>::iterator it = kv_committing.begin();
2104              it != kv_committing.end();
2105              ++it) {
2106           int r = db->submit_transaction((*it)->t);
2107           assert(r == 0);
2108         }
2109       }
2110       int r = db->submit_transaction_sync(t);
2111       assert(r == 0);
2112       utime_t finish = ceph_clock_now();
2113       utime_t dur = finish - start;
2114       dout(20) << __func__ << " committed " << kv_committing.size()
2115                << " in " << dur << dendl;
2116       while (!kv_committing.empty()) {
2117         TransContext *txc = kv_committing.front();
2118         _txc_state_proc(txc);
2119         kv_committing.pop_front();
2120       }
2121
2122       // this is as good a place as any ...
2123       _reap_collections();
2124
2125       l.lock();
2126     }
2127   }
2128   dout(10) << __func__ << " finish" << dendl;
2129 }
2130
2131
2132 // ---------------------------
2133 // transactions
2134
2135 int KStore::queue_transactions(
2136     Sequencer *posr,
2137     vector<Transaction>& tls,
2138     TrackedOpRef op,
2139     ThreadPool::TPHandle *handle)
2140 {
2141   Context *onreadable;
2142   Context *ondisk;
2143   Context *onreadable_sync;
2144   ObjectStore::Transaction::collect_contexts(
2145     tls, &onreadable, &ondisk, &onreadable_sync);
2146
2147   // set up the sequencer
2148   OpSequencer *osr;
2149   assert(posr);
2150   if (posr->p) {
2151     osr = static_cast<OpSequencer *>(posr->p.get());
2152     dout(10) << __func__ << " existing " << osr << " " << *osr << dendl;
2153   } else {
2154     osr = new OpSequencer(cct);
2155     osr->parent = posr;
2156     posr->p = osr;
2157     dout(10) << __func__ << " new " << osr << " " << *osr << dendl;
2158   }
2159
2160   // prepare
2161   TransContext *txc = _txc_create(osr);
2162   txc->onreadable = onreadable;
2163   txc->onreadable_sync = onreadable_sync;
2164   txc->oncommit = ondisk;
2165
2166   for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
2167     (*p).set_osr(osr);
2168     txc->ops += (*p).get_num_ops();
2169     txc->bytes += (*p).get_num_bytes();
2170     _txc_add_transaction(txc, &(*p));
2171   }
2172
2173   _txc_finalize(osr, txc);
2174
2175   throttle_ops.get(txc->ops);
2176   throttle_bytes.get(txc->bytes);
2177
2178   // execute (start)
2179   _txc_state_proc(txc);
2180   return 0;
2181 }
2182
2183 void KStore::_txc_add_transaction(TransContext *txc, Transaction *t)
2184 {
2185   Transaction::iterator i = t->begin();
2186
2187   dout(30) << __func__ << " transaction dump:\n";
2188   JSONFormatter f(true);
2189   f.open_object_section("transaction");
2190   t->dump(&f);
2191   f.close_section();
2192   f.flush(*_dout);
2193   *_dout << dendl;
2194
2195   vector<CollectionRef> cvec(i.colls.size());
2196   unsigned j = 0;
2197   for (vector<coll_t>::iterator p = i.colls.begin(); p != i.colls.end();
2198        ++p, ++j) {
2199     cvec[j] = _get_collection(*p);
2200
2201     // note first collection we reference
2202     if (!j && !txc->first_collection)
2203       txc->first_collection = cvec[j];
2204   }
2205   vector<OnodeRef> ovec(i.objects.size());
2206
2207   for (int pos = 0; i.have_op(); ++pos) {
2208     Transaction::Op *op = i.decode_op();
2209     int r = 0;
2210
2211     // no coll or obj
2212     if (op->op == Transaction::OP_NOP)
2213       continue;
2214
2215     // collection operations
2216     CollectionRef &c = cvec[op->cid];
2217     switch (op->op) {
2218     case Transaction::OP_RMCOLL:
2219       {
2220         coll_t cid = i.get_cid(op->cid);
2221         r = _remove_collection(txc, cid, &c);
2222         if (!r)
2223           continue;
2224       }
2225       break;
2226
2227     case Transaction::OP_MKCOLL:
2228       {
2229         assert(!c);
2230         coll_t cid = i.get_cid(op->cid);
2231         r = _create_collection(txc, cid, op->split_bits, &c);
2232         if (!r)
2233           continue;
2234       }
2235       break;
2236
2237     case Transaction::OP_SPLIT_COLLECTION:
2238       assert(0 == "deprecated");
2239       break;
2240
2241     case Transaction::OP_SPLIT_COLLECTION2:
2242       {
2243         uint32_t bits = op->split_bits;
2244         uint32_t rem = op->split_rem;
2245         r = _split_collection(txc, c, cvec[op->dest_cid], bits, rem);
2246         if (!r)
2247           continue;
2248       }
2249       break;
2250
2251     case Transaction::OP_COLL_HINT:
2252       {
2253         uint32_t type = op->hint_type;
2254         bufferlist hint;
2255         i.decode_bl(hint);
2256         bufferlist::iterator hiter = hint.begin();
2257         if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2258           uint32_t pg_num;
2259           uint64_t num_objs;
2260           ::decode(pg_num, hiter);
2261           ::decode(num_objs, hiter);
2262           dout(10) << __func__ << " collection hint objects is a no-op, "
2263                    << " pg_num " << pg_num << " num_objects " << num_objs
2264                    << dendl;
2265         } else {
2266           // Ignore the hint
2267           dout(10) << __func__ << " unknown collection hint " << type << dendl;
2268         }
2269         continue;
2270       }
2271       break;
2272
2273     case Transaction::OP_COLL_SETATTR:
2274       r = -EOPNOTSUPP;
2275       break;
2276
2277     case Transaction::OP_COLL_RMATTR:
2278       r = -EOPNOTSUPP;
2279       break;
2280
2281     case Transaction::OP_COLL_RENAME:
2282       assert(0 == "not implemented");
2283       break;
2284     }
2285     if (r < 0) {
2286       derr << " error " << cpp_strerror(r)
2287            << " not handled on operation " << op->op
2288            << " (op " << pos << ", counting from 0)" << dendl;
2289       dout(0) << " transaction dump:\n";
2290       JSONFormatter f(true);
2291       f.open_object_section("transaction");
2292       t->dump(&f);
2293       f.close_section();
2294       f.flush(*_dout);
2295       *_dout << dendl;
2296       assert(0 == "unexpected error");
2297     }
2298
2299     // object operations
2300     RWLock::WLocker l(c->lock);
2301     OnodeRef &o = ovec[op->oid];
2302     if (!o) {
2303       // these operations implicity create the object
2304       bool create = false;
2305       if (op->op == Transaction::OP_TOUCH ||
2306           op->op == Transaction::OP_WRITE ||
2307           op->op == Transaction::OP_ZERO) {
2308         create = true;
2309       }
2310       ghobject_t oid = i.get_oid(op->oid);
2311       o = c->get_onode(oid, create);
2312       if (!create) {
2313         if (!o || !o->exists) {
2314           dout(10) << __func__ << " op " << op->op << " got ENOENT on "
2315                    << oid << dendl;
2316           r = -ENOENT;
2317           goto endop;
2318         }
2319       }
2320     }
2321
2322     switch (op->op) {
2323     case Transaction::OP_TOUCH:
2324         r = _touch(txc, c, o);
2325       break;
2326
2327     case Transaction::OP_WRITE:
2328       {
2329         uint64_t off = op->off;
2330         uint64_t len = op->len;
2331         uint32_t fadvise_flags = i.get_fadvise_flags();
2332         bufferlist bl;
2333         i.decode_bl(bl);
2334         r = _write(txc, c, o, off, len, bl, fadvise_flags);
2335       }
2336       break;
2337
2338     case Transaction::OP_ZERO:
2339       {
2340         uint64_t off = op->off;
2341         uint64_t len = op->len;
2342         r = _zero(txc, c, o, off, len);
2343       }
2344       break;
2345
2346     case Transaction::OP_TRIMCACHE:
2347       {
2348         // deprecated, no-op
2349       }
2350       break;
2351
2352     case Transaction::OP_TRUNCATE:
2353       {
2354         uint64_t off = op->off;
2355         r = _truncate(txc, c, o, off);
2356       }
2357       break;
2358
2359     case Transaction::OP_REMOVE:
2360         r = _remove(txc, c, o);
2361       break;
2362
2363     case Transaction::OP_SETATTR:
2364       {
2365         string name = i.decode_string();
2366         bufferlist bl;
2367         i.decode_bl(bl);
2368         map<string, bufferptr> to_set;
2369         to_set[name] = bufferptr(bl.c_str(), bl.length());
2370         r = _setattrs(txc, c, o, to_set);
2371       }
2372       break;
2373
2374     case Transaction::OP_SETATTRS:
2375       {
2376         map<string, bufferptr> aset;
2377         i.decode_attrset(aset);
2378         r = _setattrs(txc, c, o, aset);
2379       }
2380       break;
2381
2382     case Transaction::OP_RMATTR:
2383       {
2384         string name = i.decode_string();
2385         r = _rmattr(txc, c, o, name);
2386       }
2387       break;
2388
2389     case Transaction::OP_RMATTRS:
2390       {
2391         r = _rmattrs(txc, c, o);
2392       }
2393       break;
2394
2395     case Transaction::OP_CLONE:
2396       {
2397         const ghobject_t& noid = i.get_oid(op->dest_oid);
2398         OnodeRef no = c->get_onode(noid, true);
2399         r = _clone(txc, c, o, no);
2400       }
2401       break;
2402
2403     case Transaction::OP_CLONERANGE:
2404       assert(0 == "deprecated");
2405       break;
2406
2407     case Transaction::OP_CLONERANGE2:
2408       {
2409         const ghobject_t& noid = i.get_oid(op->dest_oid);
2410         OnodeRef no = c->get_onode(noid, true);
2411         uint64_t srcoff = op->off;
2412         uint64_t len = op->len;
2413         uint64_t dstoff = op->dest_off;
2414         r = _clone_range(txc, c, o, no, srcoff, len, dstoff);
2415       }
2416       break;
2417
2418     case Transaction::OP_COLL_ADD:
2419       assert(0 == "not implemented");
2420       break;
2421
2422     case Transaction::OP_COLL_REMOVE:
2423       assert(0 == "not implemented");
2424       break;
2425
2426     case Transaction::OP_COLL_MOVE:
2427       assert(0 == "deprecated");
2428       break;
2429
2430     case Transaction::OP_COLL_MOVE_RENAME:
2431       {
2432         assert(op->cid == op->dest_cid);
2433         const ghobject_t& noid = i.get_oid(op->dest_oid);
2434         OnodeRef no = c->get_onode(noid, true);
2435         r = _rename(txc, c, o, no, noid);
2436         o.reset();
2437       }
2438       break;
2439
2440     case Transaction::OP_TRY_RENAME:
2441       {
2442         const ghobject_t& noid = i.get_oid(op->dest_oid);
2443         OnodeRef no = c->get_onode(noid, true);
2444         r = _rename(txc, c, o, no, noid);
2445         if (r == -ENOENT)
2446           r = 0;
2447         o.reset();
2448       }
2449       break;
2450
2451     case Transaction::OP_OMAP_CLEAR:
2452       {
2453         r = _omap_clear(txc, c, o);
2454       }
2455       break;
2456     case Transaction::OP_OMAP_SETKEYS:
2457       {
2458         bufferlist aset_bl;
2459         i.decode_attrset_bl(&aset_bl);
2460         r = _omap_setkeys(txc, c, o, aset_bl);
2461       }
2462       break;
2463     case Transaction::OP_OMAP_RMKEYS:
2464       {
2465         bufferlist keys_bl;
2466         i.decode_keyset_bl(&keys_bl);
2467         r = _omap_rmkeys(txc, c, o, keys_bl);
2468       }
2469       break;
2470     case Transaction::OP_OMAP_RMKEYRANGE:
2471       {
2472         string first, last;
2473         first = i.decode_string();
2474         last = i.decode_string();
2475         r = _omap_rmkey_range(txc, c, o, first, last);
2476       }
2477       break;
2478     case Transaction::OP_OMAP_SETHEADER:
2479       {
2480         bufferlist bl;
2481         i.decode_bl(bl);
2482         r = _omap_setheader(txc, c, o, bl);
2483       }
2484       break;
2485
2486     case Transaction::OP_SETALLOCHINT:
2487       {
2488         uint64_t expected_object_size = op->expected_object_size;
2489         uint64_t expected_write_size = op->expected_write_size;
2490         uint32_t flags = op->alloc_hint_flags;
2491         r = _setallochint(txc, c, o,
2492                           expected_object_size,
2493                           expected_write_size,
2494                           flags);
2495       }
2496       break;
2497
2498     default:
2499       derr << "bad op " << op->op << dendl;
2500       ceph_abort();
2501     }
2502
2503   endop:
2504     if (r < 0) {
2505       bool ok = false;
2506
2507       if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
2508                             op->op == Transaction::OP_CLONE ||
2509                             op->op == Transaction::OP_CLONERANGE2 ||
2510                             op->op == Transaction::OP_COLL_ADD))
2511         // -ENOENT is usually okay
2512         ok = true;
2513       if (r == -ENODATA)
2514         ok = true;
2515
2516       if (!ok) {
2517         const char *msg = "unexpected error code";
2518
2519         if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
2520                              op->op == Transaction::OP_CLONE ||
2521                              op->op == Transaction::OP_CLONERANGE2))
2522           msg = "ENOENT on clone suggests osd bug";
2523
2524         if (r == -ENOSPC)
2525           // For now, if we hit _any_ ENOSPC, crash, before we do any damage
2526           // by partially applying transactions.
2527           msg = "ENOSPC from key value store, misconfigured cluster";
2528
2529         if (r == -ENOTEMPTY) {
2530           msg = "ENOTEMPTY suggests garbage data in osd data dir";
2531         }
2532
2533         dout(0) << " error " << cpp_strerror(r) << " not handled on operation " << op->op
2534                 << " (op " << pos << ", counting from 0)" << dendl;
2535         dout(0) << msg << dendl;
2536         dout(0) << " transaction dump:\n";
2537         JSONFormatter f(true);
2538         f.open_object_section("transaction");
2539         t->dump(&f);
2540         f.close_section();
2541         f.flush(*_dout);
2542         *_dout << dendl;
2543         assert(0 == "unexpected error");
2544       }
2545     }
2546   }
2547 }
2548
2549
2550
2551 // -----------------
2552 // write operations
2553
2554 int KStore::_touch(TransContext *txc,
2555                    CollectionRef& c,
2556                    OnodeRef &o)
2557 {
2558   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2559   int r = 0;
2560   o->exists = true;
2561   _assign_nid(txc, o);
2562   txc->write_onode(o);
2563   dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2564   return r;
2565 }
2566
2567 void KStore::_dump_onode(OnodeRef o)
2568 {
2569   dout(30) << __func__ << " " << o
2570            << " nid " << o->onode.nid
2571            << " size " << o->onode.size
2572            << " expected_object_size " << o->onode.expected_object_size
2573            << " expected_write_size " << o->onode.expected_write_size
2574            << dendl;
2575   for (map<string,bufferptr>::iterator p = o->onode.attrs.begin();
2576        p != o->onode.attrs.end();
2577        ++p) {
2578     dout(30) << __func__ << "  attr " << p->first
2579              << " len " << p->second.length() << dendl;
2580   }
2581 }
2582
2583 void KStore::_do_read_stripe(OnodeRef o, uint64_t offset, bufferlist *pbl)
2584 {
2585   map<uint64_t,bufferlist>::iterator p = o->pending_stripes.find(offset);
2586   if (p == o->pending_stripes.end()) {
2587     string key;
2588     get_data_key(o->onode.nid, offset, &key);
2589     db->get(PREFIX_DATA, key, pbl);
2590     o->pending_stripes[offset] = *pbl;
2591   } else {
2592     *pbl = p->second;
2593   }
2594 }
2595
2596 void KStore::_do_write_stripe(TransContext *txc, OnodeRef o,
2597                               uint64_t offset, bufferlist& bl)
2598 {
2599   o->pending_stripes[offset] = bl;
2600   string key;
2601   get_data_key(o->onode.nid, offset, &key);
2602   txc->t->set(PREFIX_DATA, key, bl);
2603 }
2604
2605 void KStore::_do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset)
2606 {
2607   o->pending_stripes.erase(offset);
2608   string key;
2609   get_data_key(o->onode.nid, offset, &key);
2610   txc->t->rmkey(PREFIX_DATA, key);
2611 }
2612
2613 int KStore::_do_write(TransContext *txc,
2614                       OnodeRef o,
2615                       uint64_t offset, uint64_t length,
2616                       bufferlist& orig_bl,
2617                       uint32_t fadvise_flags)
2618 {
2619   int r = 0;
2620
2621   dout(20) << __func__
2622            << " " << o->oid << " " << offset << "~" << length
2623            << " - have " << o->onode.size
2624            << " bytes, nid " << o->onode.nid << dendl;
2625   _dump_onode(o);
2626   o->exists = true;
2627
2628   if (length == 0) {
2629     return 0;
2630   }
2631
2632   uint64_t stripe_size = o->onode.stripe_size;
2633   if (!stripe_size) {
2634     o->onode.stripe_size = cct->_conf->kstore_default_stripe_size;
2635     stripe_size = o->onode.stripe_size;
2636   }
2637
2638   unsigned bl_off = 0;
2639   while (length > 0) {
2640     uint64_t offset_rem = offset % stripe_size;
2641     uint64_t end_rem = (offset + length) % stripe_size;
2642     if (offset_rem == 0 && end_rem == 0) {
2643       bufferlist bl;
2644       bl.substr_of(orig_bl, bl_off, stripe_size);
2645       dout(30) << __func__ << " full stripe " << offset << dendl;
2646       _do_write_stripe(txc, o, offset, bl);
2647       offset += stripe_size;
2648       length -= stripe_size;
2649       bl_off += stripe_size;
2650       continue;
2651     }
2652     uint64_t stripe_off = offset - offset_rem;
2653     bufferlist prev;
2654     _do_read_stripe(o, stripe_off, &prev);
2655     dout(20) << __func__ << " read previous stripe " << stripe_off
2656              << ", got " << prev.length() << dendl;
2657     bufferlist bl;
2658     if (offset_rem) {
2659       unsigned p = MIN(prev.length(), offset_rem);
2660       if (p) {
2661         dout(20) << __func__ << " reuse leading " << p << " bytes" << dendl;
2662         bl.substr_of(prev, 0, p);
2663       }
2664       if (p < offset_rem) {
2665         dout(20) << __func__ << " add leading " << offset_rem - p << " zeros" << dendl;
2666         bl.append_zero(offset_rem - p);
2667       }
2668     }
2669     unsigned use = stripe_size - offset_rem;
2670     if (use > length)
2671       use -= stripe_size - end_rem;
2672     dout(20) << __func__ << " using " << use << " for this stripe" << dendl;
2673     bufferlist t;
2674     t.substr_of(orig_bl, bl_off, use);
2675     bl.claim_append(t);
2676     bl_off += use;
2677     if (end_rem) {
2678       if (end_rem < prev.length()) {
2679         unsigned l = prev.length() - end_rem;
2680         dout(20) << __func__ << " reuse trailing " << l << " bytes" << dendl;
2681         bufferlist t;
2682         t.substr_of(prev, end_rem, l);
2683         bl.claim_append(t);
2684       }
2685     }
2686     dout(30) << " writing:\n";
2687     bl.hexdump(*_dout);
2688     *_dout << dendl;
2689     _do_write_stripe(txc, o, stripe_off, bl);
2690     offset += use;
2691     length -= use;
2692   }
2693
2694   if (offset > o->onode.size) {
2695     dout(20) << __func__ << " extending size to " << offset + length
2696              << dendl;
2697     o->onode.size = offset;
2698   }
2699
2700   return r;
2701 }
2702
2703 int KStore::_write(TransContext *txc,
2704                    CollectionRef& c,
2705                    OnodeRef& o,
2706                    uint64_t offset, size_t length,
2707                    bufferlist& bl,
2708                    uint32_t fadvise_flags)
2709 {
2710   dout(15) << __func__ << " " << c->cid << " " << o->oid
2711            << " " << offset << "~" << length
2712            << dendl;
2713   _assign_nid(txc, o);
2714   int r = _do_write(txc, o, offset, length, bl, fadvise_flags);
2715   txc->write_onode(o);
2716
2717   dout(10) << __func__ << " " << c->cid << " " << o->oid
2718            << " " << offset << "~" << length
2719            << " = " << r << dendl;
2720   return r;
2721 }
2722
2723 int KStore::_zero(TransContext *txc,
2724                   CollectionRef& c,
2725                   OnodeRef& o,
2726                   uint64_t offset, size_t length)
2727 {
2728   dout(15) << __func__ << " " << c->cid << " " << o->oid
2729            << " " << offset << "~" << length
2730            << dendl;
2731   int r = 0;
2732   o->exists = true;
2733
2734   _dump_onode(o);
2735   _assign_nid(txc, o);
2736
2737   uint64_t stripe_size = o->onode.stripe_size;
2738   if (stripe_size) {
2739     uint64_t end = offset + length;
2740     uint64_t pos = offset;
2741     uint64_t stripe_off = pos % stripe_size;
2742     while (pos < offset + length) {
2743       if (stripe_off || end - pos < stripe_size) {
2744         bufferlist stripe;
2745         _do_read_stripe(o, pos - stripe_off, &stripe);
2746         dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2747                  << stripe.length() << dendl;
2748         bufferlist bl;
2749         bl.substr_of(stripe, 0, MIN(stripe.length(), stripe_off));
2750         if (end >= pos - stripe_off + stripe_size ||
2751             end >= o->onode.size) {
2752           dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2753                    << " to " << bl.length() << dendl;
2754         } else {
2755           auto len = end - (pos - stripe_off + bl.length());
2756           bl.append_zero(len);
2757           dout(20) << __func__ << " adding " << len << " of zeros" << dendl;
2758           if (stripe.length() > bl.length()) {
2759             unsigned l = stripe.length() - bl.length();
2760             bufferlist t;
2761             t.substr_of(stripe, stripe.length() - l, l);
2762             dout(20) << __func__ << " keeping tail " << l << " of stripe" << dendl;
2763             bl.claim_append(t);
2764           }
2765         }
2766         _do_write_stripe(txc, o, pos - stripe_off, bl);
2767         pos += stripe_size - stripe_off;
2768         stripe_off = 0;
2769       } else {
2770         dout(20) << __func__ << " rm stripe " << pos << dendl;
2771         _do_remove_stripe(txc, o, pos - stripe_off);
2772         pos += stripe_size;
2773       }
2774     }
2775   }
2776   if (offset + length > o->onode.size) {
2777     o->onode.size = offset + length;
2778     dout(20) << __func__ << " extending size to " << offset + length
2779              << dendl;
2780   }
2781   txc->write_onode(o);
2782
2783   dout(10) << __func__ << " " << c->cid << " " << o->oid
2784            << " " << offset << "~" << length
2785            << " = " << r << dendl;
2786   return r;
2787 }
2788
2789 int KStore::_do_truncate(TransContext *txc, OnodeRef o, uint64_t offset)
2790 {
2791   uint64_t stripe_size = o->onode.stripe_size;
2792
2793   o->flush();
2794
2795   // trim down stripes
2796   if (stripe_size) {
2797     uint64_t pos = offset;
2798     uint64_t stripe_off = pos % stripe_size;
2799     while (pos < o->onode.size) {
2800       if (stripe_off) {
2801         bufferlist stripe;
2802         _do_read_stripe(o, pos - stripe_off, &stripe);
2803         dout(30) << __func__ << " stripe " << pos - stripe_off << " got "
2804                  << stripe.length() << dendl;
2805         bufferlist t;
2806         t.substr_of(stripe, 0, MIN(stripe_off, stripe.length()));
2807         _do_write_stripe(txc, o, pos - stripe_off, t);
2808         dout(20) << __func__ << " truncated stripe " << pos - stripe_off
2809                  << " to " << t.length() << dendl;
2810         pos += stripe_size - stripe_off;
2811         stripe_off = 0;
2812       } else {
2813         dout(20) << __func__ << " rm stripe " << pos << dendl;
2814         _do_remove_stripe(txc, o, pos - stripe_off);
2815         pos += stripe_size;
2816       }
2817     }
2818
2819     // trim down cached tail
2820     if (o->tail_bl.length()) {
2821       if (offset / stripe_size != o->onode.size / stripe_size) {
2822         dout(20) << __func__ << " clear cached tail" << dendl;
2823         o->clear_tail();
2824       }
2825     }
2826   }
2827
2828   o->onode.size = offset;
2829   dout(10) << __func__ << " truncate size to " << offset << dendl;
2830
2831   txc->write_onode(o);
2832   return 0;
2833 }
2834
2835 int KStore::_truncate(TransContext *txc,
2836                       CollectionRef& c,
2837                       OnodeRef& o,
2838                       uint64_t offset)
2839 {
2840   dout(15) << __func__ << " " << c->cid << " " << o->oid
2841            << " " << offset
2842            << dendl;
2843   int r = _do_truncate(txc, o, offset);
2844   dout(10) << __func__ << " " << c->cid << " " << o->oid
2845            << " " << offset
2846            << " = " << r << dendl;
2847   return r;
2848 }
2849
2850 int KStore::_do_remove(TransContext *txc,
2851                        OnodeRef o)
2852 {
2853   string key;
2854
2855   _do_truncate(txc, o, 0);
2856
2857   o->onode.size = 0;
2858   if (o->onode.omap_head) {
2859     _do_omap_clear(txc, o->onode.omap_head);
2860   }
2861   o->exists = false;
2862   o->onode = kstore_onode_t();
2863   txc->onodes.erase(o);
2864   get_object_key(cct, o->oid, &key);
2865   txc->t->rmkey(PREFIX_OBJ, key);
2866   return 0;
2867 }
2868
2869 int KStore::_remove(TransContext *txc,
2870                     CollectionRef& c,
2871                     OnodeRef &o)
2872 {
2873   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2874   int r = _do_remove(txc, o);
2875   dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2876   return r;
2877 }
2878
2879 int KStore::_setattr(TransContext *txc,
2880                      CollectionRef& c,
2881                      OnodeRef& o,
2882                      const string& name,
2883                      bufferptr& val)
2884 {
2885   dout(15) << __func__ << " " << c->cid << " " << o->oid
2886            << " " << name << " (" << val.length() << " bytes)"
2887            << dendl;
2888   int r = 0;
2889   o->onode.attrs[name] = val;
2890   txc->write_onode(o);
2891   dout(10) << __func__ << " " << c->cid << " " << o->oid
2892            << " " << name << " (" << val.length() << " bytes)"
2893            << " = " << r << dendl;
2894   return r;
2895 }
2896
2897 int KStore::_setattrs(TransContext *txc,
2898                       CollectionRef& c,
2899                       OnodeRef& o,
2900                       const map<string,bufferptr>& aset)
2901 {
2902   dout(15) << __func__ << " " << c->cid << " " << o->oid
2903            << " " << aset.size() << " keys"
2904            << dendl;
2905   int r = 0;
2906   for (map<string,bufferptr>::const_iterator p = aset.begin();
2907        p != aset.end(); ++p) {
2908     if (p->second.is_partial())
2909       o->onode.attrs[p->first] = bufferptr(p->second.c_str(), p->second.length());
2910     else
2911       o->onode.attrs[p->first] = p->second;
2912   }
2913   txc->write_onode(o);
2914   dout(10) << __func__ << " " << c->cid << " " << o->oid
2915            << " " << aset.size() << " keys"
2916            << " = " << r << dendl;
2917   return r;
2918 }
2919
2920
2921 int KStore::_rmattr(TransContext *txc,
2922                     CollectionRef& c,
2923                     OnodeRef& o,
2924                     const string& name)
2925 {
2926   dout(15) << __func__ << " " << c->cid << " " << o->oid
2927            << " " << name << dendl;
2928   int r = 0;
2929   o->onode.attrs.erase(name);
2930   txc->write_onode(o);
2931   dout(10) << __func__ << " " << c->cid << " " << o->oid
2932            << " " << name << " = " << r << dendl;
2933   return r;
2934 }
2935
2936 int KStore::_rmattrs(TransContext *txc,
2937                      CollectionRef& c,
2938                      OnodeRef& o)
2939 {
2940   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2941   int r = 0;
2942   o->onode.attrs.clear();
2943   txc->write_onode(o);
2944   dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2945   return r;
2946 }
2947
2948 void KStore::_do_omap_clear(TransContext *txc, uint64_t id)
2949 {
2950   KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
2951   string prefix, tail;
2952   get_omap_header(id, &prefix);
2953   get_omap_tail(id, &tail);
2954   it->lower_bound(prefix);
2955   while (it->valid()) {
2956     if (it->key() >= tail) {
2957       dout(30) << __func__ << "  stop at " << tail << dendl;
2958       break;
2959     }
2960     txc->t->rmkey(PREFIX_OMAP, it->key());
2961     dout(30) << __func__ << "  rm " << pretty_binary_string(it->key()) << dendl;
2962     it->next();
2963   }
2964 }
2965
2966 int KStore::_omap_clear(TransContext *txc,
2967                         CollectionRef& c,
2968                         OnodeRef& o)
2969 {
2970   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2971   int r = 0;
2972   if (o->onode.omap_head != 0) {
2973     _do_omap_clear(txc, o->onode.omap_head);
2974   }
2975   dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
2976   return r;
2977 }
2978
2979 int KStore::_omap_setkeys(TransContext *txc,
2980                           CollectionRef& c,
2981                           OnodeRef& o,
2982                           bufferlist &bl)
2983 {
2984   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
2985   int r;
2986   bufferlist::iterator p = bl.begin();
2987   __u32 num;
2988   if (!o->onode.omap_head) {
2989     o->onode.omap_head = o->onode.nid;
2990     txc->write_onode(o);
2991   }
2992   ::decode(num, p);
2993   while (num--) {
2994     string key;
2995     bufferlist value;
2996     ::decode(key, p);
2997     ::decode(value, p);
2998     string final_key;
2999     get_omap_key(o->onode.omap_head, key, &final_key);
3000     dout(30) << __func__ << "  " << pretty_binary_string(final_key)
3001              << " <- " << key << dendl;
3002     txc->t->set(PREFIX_OMAP, final_key, value);
3003   }
3004   r = 0;
3005   dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3006   return r;
3007 }
3008
3009 int KStore::_omap_setheader(TransContext *txc,
3010                             CollectionRef& c,
3011                             OnodeRef &o,
3012                             bufferlist& bl)
3013 {
3014   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3015   int r;
3016   string key;
3017   if (!o->onode.omap_head) {
3018     o->onode.omap_head = o->onode.nid;
3019     txc->write_onode(o);
3020   }
3021   get_omap_header(o->onode.omap_head, &key);
3022   txc->t->set(PREFIX_OMAP, key, bl);
3023   r = 0;
3024   dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3025   return r;
3026 }
3027
3028 int KStore::_omap_rmkeys(TransContext *txc,
3029                          CollectionRef& c,
3030                          OnodeRef& o,
3031                          bufferlist& bl)
3032 {
3033   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3034   int r = 0;
3035   bufferlist::iterator p = bl.begin();
3036   __u32 num;
3037
3038   if (!o->onode.omap_head) {
3039     r = 0;
3040     goto out;
3041   }
3042   ::decode(num, p);
3043   while (num--) {
3044     string key;
3045     ::decode(key, p);
3046     string final_key;
3047     get_omap_key(o->onode.omap_head, key, &final_key);
3048     dout(30) << __func__ << "  rm " << pretty_binary_string(final_key)
3049              << " <- " << key << dendl;
3050     txc->t->rmkey(PREFIX_OMAP, final_key);
3051   }
3052   r = 0;
3053
3054  out:
3055   dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3056   return r;
3057 }
3058
3059 int KStore::_omap_rmkey_range(TransContext *txc,
3060                               CollectionRef& c,
3061                               OnodeRef& o,
3062                               const string& first, const string& last)
3063 {
3064   dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
3065   KeyValueDB::Iterator it;
3066   string key_first, key_last;
3067   int r = 0;
3068
3069   if (!o->onode.omap_head) {
3070     goto out;
3071   }
3072   it = db->get_iterator(PREFIX_OMAP);
3073   get_omap_key(o->onode.omap_head, first, &key_first);
3074   get_omap_key(o->onode.omap_head, last, &key_last);
3075   it->lower_bound(key_first);
3076   while (it->valid()) {
3077     if (it->key() >= key_last) {
3078       dout(30) << __func__ << "  stop at " << pretty_binary_string(key_last)
3079                << dendl;
3080       break;
3081     }
3082     txc->t->rmkey(PREFIX_OMAP, it->key());
3083     dout(30) << __func__ << "  rm " << pretty_binary_string(it->key()) << dendl;
3084     it->next();
3085   }
3086   r = 0;
3087
3088  out:
3089   dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
3090   return r;
3091 }
3092
3093 int KStore::_setallochint(TransContext *txc,
3094                           CollectionRef& c,
3095                           OnodeRef& o,
3096                           uint64_t expected_object_size,
3097                           uint64_t expected_write_size,
3098                           uint32_t flags)
3099 {
3100   dout(15) << __func__ << " " << c->cid << " " << o->oid
3101            << " object_size " << expected_object_size
3102            << " write_size " << expected_write_size
3103            << " flags " << flags
3104            << dendl;
3105   int r = 0;
3106   o->onode.expected_object_size = expected_object_size;
3107   o->onode.expected_write_size = expected_write_size;
3108   o->onode.alloc_hint_flags = flags;
3109
3110   txc->write_onode(o);
3111   dout(10) << __func__ << " " << c->cid << " " << o->oid
3112            << " object_size " << expected_object_size
3113            << " write_size " << expected_write_size
3114            << " = " << r << dendl;
3115   return r;
3116 }
3117
3118 int KStore::_clone(TransContext *txc,
3119                    CollectionRef& c,
3120                    OnodeRef& oldo,
3121                    OnodeRef& newo)
3122 {
3123   dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3124            << newo->oid << dendl;
3125   int r = 0;
3126   if (oldo->oid.hobj.get_hash() != newo->oid.hobj.get_hash()) {
3127     derr << __func__ << " mismatched hash on " << oldo->oid
3128          << " and " << newo->oid << dendl;
3129     return -EINVAL;
3130   }
3131
3132   bufferlist bl;
3133   newo->exists = true;
3134   _assign_nid(txc, newo);
3135
3136   // data
3137   oldo->flush();
3138
3139   r = _do_read(oldo, 0, oldo->onode.size, bl, 0);
3140   if (r < 0)
3141     goto out;
3142
3143   // truncate any old data
3144   r = _do_truncate(txc, newo, 0);
3145   if (r < 0)
3146     goto out;
3147
3148   r = _do_write(txc, newo, 0, oldo->onode.size, bl, 0);
3149   if (r < 0)
3150     goto out;
3151
3152   newo->onode.attrs = oldo->onode.attrs;
3153
3154   // clone omap
3155   if (newo->onode.omap_head) {
3156     dout(20) << __func__ << " clearing old omap data" << dendl;
3157     _do_omap_clear(txc, newo->onode.omap_head);
3158   }
3159   if (oldo->onode.omap_head) {
3160     dout(20) << __func__ << " copying omap data" << dendl;
3161     if (!newo->onode.omap_head) {
3162       newo->onode.omap_head = newo->onode.nid;
3163     }
3164     KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
3165     string head, tail;
3166     get_omap_header(oldo->onode.omap_head, &head);
3167     get_omap_tail(oldo->onode.omap_head, &tail);
3168     it->lower_bound(head);
3169     while (it->valid()) {
3170       string key;
3171       if (it->key() >= tail) {
3172         dout(30) << __func__ << "  reached tail" << dendl;
3173         break;
3174       } else {
3175         dout(30) << __func__ << "  got header/data "
3176                  << pretty_binary_string(it->key()) << dendl;
3177         assert(it->key() < tail);
3178         rewrite_omap_key(newo->onode.omap_head, it->key(), &key);
3179         txc->t->set(PREFIX_OMAP, key, it->value());
3180       }
3181       it->next();
3182     }
3183   }
3184
3185   txc->write_onode(newo);
3186   r = 0;
3187
3188  out:
3189   dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3190            << newo->oid << " = " << r << dendl;
3191   return r;
3192 }
3193
3194 int KStore::_clone_range(TransContext *txc,
3195                          CollectionRef& c,
3196                          OnodeRef& oldo,
3197                          OnodeRef& newo,
3198                          uint64_t srcoff, uint64_t length, uint64_t dstoff)
3199 {
3200   dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3201            << newo->oid << " from " << srcoff << "~" << length
3202            << " to offset " << dstoff << dendl;
3203   int r = 0;
3204
3205   bufferlist bl;
3206   newo->exists = true;
3207   _assign_nid(txc, newo);
3208
3209   r = _do_read(oldo, srcoff, length, bl, 0);
3210   if (r < 0)
3211     goto out;
3212
3213   r = _do_write(txc, newo, dstoff, bl.length(), bl, 0);
3214   if (r < 0)
3215     goto out;
3216
3217   txc->write_onode(newo);
3218
3219   r = 0;
3220
3221  out:
3222   dout(10) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3223            << newo->oid << " from " << srcoff << "~" << length
3224            << " to offset " << dstoff
3225            << " = " << r << dendl;
3226   return r;
3227 }
3228
3229 int KStore::_rename(TransContext *txc,
3230                     CollectionRef& c,
3231                     OnodeRef& oldo,
3232                     OnodeRef& newo,
3233                     const ghobject_t& new_oid)
3234 {
3235   dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
3236            << new_oid << dendl;
3237   int r;
3238   ghobject_t old_oid = oldo->oid;
3239   bufferlist bl;
3240   string old_key, new_key;
3241
3242   if (newo && newo->exists) {
3243     // destination object already exists, remove it first
3244     r = _do_remove(txc, newo);
3245     if (r < 0)
3246       goto out;
3247   }
3248
3249   txc->t->rmkey(PREFIX_OBJ, oldo->key);
3250   txc->write_onode(oldo);
3251   c->onode_map.rename(old_oid, new_oid);  // this adjusts oldo->{oid,key}
3252   r = 0;
3253
3254  out:
3255   dout(10) << __func__ << " " << c->cid << " " << old_oid << " -> "
3256            << new_oid << " = " << r << dendl;
3257   return r;
3258 }
3259
3260 // collections
3261
3262 int KStore::_create_collection(
3263   TransContext *txc,
3264   coll_t cid,
3265   unsigned bits,
3266   CollectionRef *c)
3267 {
3268   dout(15) << __func__ << " " << cid << " bits " << bits << dendl;
3269   int r;
3270   bufferlist bl;
3271
3272   {
3273     RWLock::WLocker l(coll_lock);
3274     if (*c) {
3275       r = -EEXIST;
3276       goto out;
3277     }
3278     c->reset(new Collection(this, cid));
3279     (*c)->cnode.bits = bits;
3280     coll_map[cid] = *c;
3281   }
3282   ::encode((*c)->cnode, bl);
3283   txc->t->set(PREFIX_COLL, stringify(cid), bl);
3284   r = 0;
3285
3286  out:
3287   dout(10) << __func__ << " " << cid << " bits " << bits << " = " << r << dendl;
3288   return r;
3289 }
3290
3291 int KStore::_remove_collection(TransContext *txc, coll_t cid,
3292                                  CollectionRef *c)
3293 {
3294   dout(15) << __func__ << " " << cid << dendl;
3295   int r;
3296
3297   {
3298     RWLock::WLocker l(coll_lock);
3299     if (!*c) {
3300       r = -ENOENT;
3301       goto out;
3302     }
3303     size_t nonexistent_count = 0;
3304     pair<ghobject_t,OnodeRef> next_onode;
3305     while ((*c)->onode_map.get_next(next_onode.first, &next_onode)) {
3306       if (next_onode.second->exists) {
3307         r = -ENOTEMPTY;
3308         goto out;
3309       }
3310       ++nonexistent_count;
3311     }
3312     vector<ghobject_t> ls;
3313     ghobject_t next;
3314     // Enumerate onodes in db, up to nonexistent_count + 1
3315     // then check if all of them are marked as non-existent.
3316     // Bypass the check if returned number is greater than nonexistent_count
3317     r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(),
3318                          nonexistent_count + 1, &ls, &next);
3319     if (r >= 0) {
3320       bool exists = false; //ls.size() > nonexistent_count;
3321       for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
3322         dout(10) << __func__ << " oid " << *it << dendl;
3323         auto onode = (*c)->onode_map.lookup(*it);
3324         exists = !onode || onode->exists;
3325         if (exists) {
3326           dout(10) << __func__ << " " << *it
3327                    << " exists in db" << dendl;
3328         }
3329       }
3330       if (!exists) {
3331         coll_map.erase(cid);
3332         txc->removed_collections.push_back(*c);
3333         c->reset();
3334         txc->t->rmkey(PREFIX_COLL, stringify(cid));
3335         r = 0;
3336       } else {
3337         dout(10) << __func__ << " " << cid
3338                  << " is non-empty" << dendl;
3339         r = -ENOTEMPTY;
3340       }
3341     }
3342   }
3343
3344  out:
3345   dout(10) << __func__ << " " << cid << " = " << r << dendl;
3346   return r;
3347 }
3348
3349 int KStore::_split_collection(TransContext *txc,
3350                                 CollectionRef& c,
3351                                 CollectionRef& d,
3352                                 unsigned bits, int rem)
3353 {
3354   dout(15) << __func__ << " " << c->cid << " to " << d->cid << " "
3355            << " bits " << bits << dendl;
3356   int r;
3357   RWLock::WLocker l(c->lock);
3358   RWLock::WLocker l2(d->lock);
3359   c->onode_map.clear();
3360   d->onode_map.clear();
3361   c->cnode.bits = bits;
3362   assert(d->cnode.bits == bits);
3363   r = 0;
3364
3365   bufferlist bl;
3366   ::encode(c->cnode, bl);
3367   txc->t->set(PREFIX_COLL, stringify(c->cid), bl);
3368
3369   dout(10) << __func__ << " " << c->cid << " to " << d->cid << " "
3370            << " bits " << bits << " = " << r << dendl;
3371   return r;
3372 }
3373
3374 // ===========================================