Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / filestore / DBObjectMap.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2
3 #include "include/int_types.h"
4 #include "include/buffer.h"
5
6 #include <iostream>
7 #include <set>
8 #include <map>
9 #include <string>
10 #include "include/memory.h"
11 #include <vector>
12
13 #include "os/ObjectMap.h"
14 #include "kv/KeyValueDB.h"
15 #include "DBObjectMap.h"
16 #include <errno.h>
17
18 #include "common/debug.h"
19 #include "common/config.h"
20 #include "include/assert.h"
21
22 #define dout_context cct
23 #define dout_subsys ceph_subsys_filestore
24 #undef dout_prefix
25 #define dout_prefix *_dout << "filestore "
26
27 const string DBObjectMap::USER_PREFIX = "_USER_";
28 const string DBObjectMap::XATTR_PREFIX = "_AXATTR_";
29 const string DBObjectMap::SYS_PREFIX = "_SYS_";
30 const string DBObjectMap::COMPLETE_PREFIX = "_COMPLETE_";
31 const string DBObjectMap::HEADER_KEY = "HEADER";
32 const string DBObjectMap::USER_HEADER_KEY = "USER_HEADER";
33 const string DBObjectMap::GLOBAL_STATE_KEY = "HEADER";
34 const string DBObjectMap::HOBJECT_TO_SEQ = "_HOBJTOSEQ_";
35
36 // Legacy
37 const string DBObjectMap::LEAF_PREFIX = "_LEAF_";
38 const string DBObjectMap::REVERSE_LEAF_PREFIX = "_REVLEAF_";
39
40 static void append_escaped(const string &in, string *out)
41 {
42   for (string::const_iterator i = in.begin(); i != in.end(); ++i) {
43     if (*i == '%') {
44       out->push_back('%');
45       out->push_back('p');
46     } else if (*i == '.') {
47       out->push_back('%');
48       out->push_back('e');
49     } else if (*i == '_') {
50       out->push_back('%');
51       out->push_back('u');
52     } else {
53       out->push_back(*i);
54     }
55   }
56 }
57
58 int DBObjectMap::check(std::ostream &out, bool repair, bool force)
59 {
60   int errors = 0, comp_errors = 0;
61   bool repaired = false;
62   map<uint64_t, uint64_t> parent_to_num_children;
63   map<uint64_t, uint64_t> parent_to_actual_num_children;
64   KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
65   for (iter->seek_to_first(); iter->valid(); iter->next()) {
66     _Header header;
67     bufferlist bl = iter->value();
68     while (true) {
69       bufferlist::iterator bliter = bl.begin();
70       header.decode(bliter);
71       if (header.seq != 0)
72         parent_to_actual_num_children[header.seq] = header.num_children;
73
74       if (state.v == 2 || force) {
75         // Check complete table
76         bool complete_error = false;
77         boost::optional<string> prev;
78         KeyValueDB::Iterator complete_iter = db->get_iterator(USER_PREFIX + header_key(header.seq) + COMPLETE_PREFIX);
79         for (complete_iter->seek_to_first(); complete_iter->valid();
80              complete_iter->next()) {
81           if (prev && prev >= complete_iter->key()) {
82              out << "Bad complete for " << header.oid << std::endl;
83              complete_error = true;
84              break;
85           }
86           prev = string(complete_iter->value().c_str(), complete_iter->value().length() - 1);
87         }
88         if (complete_error) {
89           out << "Complete mapping for " << header.seq << " :" << std::endl;
90           for (complete_iter->seek_to_first(); complete_iter->valid();
91                complete_iter->next()) {
92             out << complete_iter->key() << " -> " << string(complete_iter->value().c_str(), complete_iter->value().length() - 1) << std::endl;
93           }
94           if (repair) {
95             repaired = true;
96             KeyValueDB::Transaction t = db->get_transaction();
97             t->rmkeys_by_prefix(USER_PREFIX + header_key(header.seq) + COMPLETE_PREFIX);
98             db->submit_transaction(t);
99             out << "Cleared complete mapping to repair" << std::endl;
100           } else {
101             errors++;  // Only count when not repaired
102             comp_errors++;  // Track errors here for version update
103           }
104         }
105       }
106
107       if (header.parent == 0)
108         break;
109
110       if (!parent_to_num_children.count(header.parent))
111         parent_to_num_children[header.parent] = 0;
112       parent_to_num_children[header.parent]++;
113       if (parent_to_actual_num_children.count(header.parent))
114         break;
115
116       set<string> to_get;
117       map<string, bufferlist> got;
118       to_get.insert(HEADER_KEY);
119       db->get(sys_parent_prefix(header), to_get, &got);
120       if (got.empty()) {
121         out << "Missing: seq " << header.parent << std::endl;
122         errors++;
123         break;
124       } else {
125         bl = got.begin()->second;
126       }
127     }
128   }
129
130   for (map<uint64_t, uint64_t>::iterator i = parent_to_num_children.begin();
131        i != parent_to_num_children.end();
132        parent_to_num_children.erase(i++)) {
133     if (!parent_to_actual_num_children.count(i->first))
134       continue;
135     if (parent_to_actual_num_children[i->first] != i->second) {
136       out << "Invalid: seq " << i->first << " recorded children: "
137           << parent_to_actual_num_children[i->first] << " found: "
138           << i->second << std::endl;
139       errors++;
140     }
141     parent_to_actual_num_children.erase(i->first);
142   }
143
144   // Only advance the version from 2 to 3 here
145   // Mark as legacy because there are still older structures
146   // we don't update.  The value of legacy is only used
147   // for internal assertions.
148   if (comp_errors == 0 && state.v == 2 && repair) {
149     state.v = 3;
150     state.legacy = true;
151     set_state();
152   }
153
154   if (errors == 0 && repaired)
155     return -1;
156   return errors;
157 }
158
159 string DBObjectMap::ghobject_key(const ghobject_t &oid)
160 {
161   string out;
162   append_escaped(oid.hobj.oid.name, &out);
163   out.push_back('.');
164   append_escaped(oid.hobj.get_key(), &out);
165   out.push_back('.');
166   append_escaped(oid.hobj.nspace, &out);
167   out.push_back('.');
168
169   char snap_with_hash[1000];
170   char *t = snap_with_hash;
171   char *end = t + sizeof(snap_with_hash);
172   if (oid.hobj.snap == CEPH_NOSNAP)
173     t += snprintf(t, end - t, "head");
174   else if (oid.hobj.snap == CEPH_SNAPDIR)
175     t += snprintf(t, end - t, "snapdir");
176   else
177     t += snprintf(t, end - t, "%llx", (long long unsigned)oid.hobj.snap);
178
179   if (oid.hobj.pool == -1)
180     t += snprintf(t, end - t, ".none");
181   else
182     t += snprintf(t, end - t, ".%llx", (long long unsigned)oid.hobj.pool);
183   t += snprintf(t, end - t, ".%.*X", (int)(sizeof(uint32_t)*2), oid.hobj.get_hash());
184
185   if (oid.generation != ghobject_t::NO_GEN ||
186       oid.shard_id != shard_id_t::NO_SHARD) {
187     t += snprintf(t, end - t, ".%llx", (long long unsigned)oid.generation);
188     t += snprintf(t, end - t, ".%x", (int)oid.shard_id);
189   }
190   out += string(snap_with_hash);
191   return out;
192 }
193
194 //    ok: pglog%u3%efs1...0.none.0017B237
195 //   bad: plana8923501-10...4c.3.ffffffffffffffff.2
196 // fixed: plana8923501-10...4c.3.CB767F2D.ffffffffffffffff.2
197 // returns 0 for false, 1 for true, negative for error
198 int DBObjectMap::is_buggy_ghobject_key_v1(CephContext* cct,
199                                           const string &in)
200 {
201   int dots = 5;  // skip 5 .'s
202   const char *s = in.c_str();
203   do {
204     while (*s && *s != '.')
205       ++s;
206     if (!*s) {
207       derr << "unexpected null at " << (int)(s-in.c_str()) << dendl;
208       return -EINVAL;
209     }
210     ++s;
211   } while (*s && --dots);
212   if (!*s) {
213     derr << "unexpected null at " << (int)(s-in.c_str()) << dendl;
214     return -EINVAL;
215   }
216   // we are now either at a hash value (32 bits, 8 chars) or a generation
217   // value (64 bits) '.' and shard id.  count the dots!
218   int len = 0;
219   while (*s && *s != '.') {
220     ++s;
221     ++len;
222   }
223   if (*s == '\0') {
224     if (len != 8) {
225       derr << "hash value is not 8 chars" << dendl;
226       return -EINVAL;  // the hash value is always 8 chars.
227     }
228     return 0;
229   }
230   if (*s != '.') { // the shard follows.
231     derr << "missing final . and shard id at " << (int)(s-in.c_str()) << dendl;
232     return -EINVAL;
233   }
234   return 1;
235 }
236
237
238 string DBObjectMap::map_header_key(const ghobject_t &oid)
239 {
240   return ghobject_key(oid);
241 }
242
243 string DBObjectMap::header_key(uint64_t seq)
244 {
245   char buf[100];
246   snprintf(buf, sizeof(buf), "%.*" PRId64, (int)(2*sizeof(seq)), seq);
247   return string(buf);
248 }
249
250 string DBObjectMap::complete_prefix(Header header)
251 {
252   return USER_PREFIX + header_key(header->seq) + COMPLETE_PREFIX;
253 }
254
255 string DBObjectMap::user_prefix(Header header)
256 {
257   return USER_PREFIX + header_key(header->seq) + USER_PREFIX;
258 }
259
260 string DBObjectMap::sys_prefix(Header header)
261 {
262   return USER_PREFIX + header_key(header->seq) + SYS_PREFIX;
263 }
264
265 string DBObjectMap::xattr_prefix(Header header)
266 {
267   return USER_PREFIX + header_key(header->seq) + XATTR_PREFIX;
268 }
269
270 string DBObjectMap::sys_parent_prefix(_Header header)
271 {
272   return USER_PREFIX + header_key(header.parent) + SYS_PREFIX;
273 }
274
275 int DBObjectMap::DBObjectMapIteratorImpl::init()
276 {
277   invalid = false;
278   if (ready) {
279     return 0;
280   }
281   assert(!parent_iter);
282   if (header->parent) {
283     Header parent = map->lookup_parent(header);
284     if (!parent) {
285       ceph_abort();
286       return -EINVAL;
287     }
288     parent_iter = std::make_shared<DBObjectMapIteratorImpl>(map, parent);
289   }
290   key_iter = map->db->get_iterator(map->user_prefix(header));
291   assert(key_iter);
292   complete_iter = map->db->get_iterator(map->complete_prefix(header));
293   assert(complete_iter);
294   cur_iter = key_iter;
295   assert(cur_iter);
296   ready = true;
297   return 0;
298 }
299
300 ObjectMap::ObjectMapIterator DBObjectMap::get_iterator(
301   const ghobject_t &oid)
302 {
303   MapHeaderLock hl(this, oid);
304   Header header = lookup_map_header(hl, oid);
305   if (!header)
306     return ObjectMapIterator(new EmptyIteratorImpl());
307   DBObjectMapIterator iter = _get_iterator(header);
308   iter->hlock.swap(hl);
309   return iter;
310 }
311
312 int DBObjectMap::DBObjectMapIteratorImpl::seek_to_first()
313 {
314   init();
315   r = 0;
316   if (parent_iter) {
317     r = parent_iter->seek_to_first();
318     if (r < 0)
319       return r;
320   }
321   r = key_iter->seek_to_first();
322   if (r < 0)
323     return r;
324   return adjust();
325 }
326
327 int DBObjectMap::DBObjectMapIteratorImpl::seek_to_last()
328 {
329   init();
330   r = 0;
331   if (parent_iter) {
332     r = parent_iter->seek_to_last();
333     if (r < 0)
334       return r;
335     if (parent_iter->valid())
336       r = parent_iter->next();
337     if (r < 0)
338       return r;
339   }
340   r = key_iter->seek_to_last();
341   if (r < 0)
342     return r;
343   if (key_iter->valid())
344     r = key_iter->next();
345   if (r < 0)
346     return r;
347   return adjust();
348 }
349
350 int DBObjectMap::DBObjectMapIteratorImpl::lower_bound(const string &to)
351 {
352   init();
353   r = 0;
354   if (parent_iter) {
355     r = parent_iter->lower_bound(to);
356     if (r < 0)
357       return r;
358   }
359   r = key_iter->lower_bound(to);
360   if (r < 0)
361     return r;
362   return adjust();
363 }
364
365 int DBObjectMap::DBObjectMapIteratorImpl::lower_bound_parent(const string &to)
366 {
367   int r = lower_bound(to);
368   if (r < 0)
369     return r;
370   if (valid() && !on_parent())
371     return next_parent();
372   else
373     return r;
374 }
375
376 int DBObjectMap::DBObjectMapIteratorImpl::upper_bound(const string &after)
377 {
378   init();
379   r = 0;
380   if (parent_iter) {
381     r = parent_iter->upper_bound(after);
382     if (r < 0)
383       return r;
384   }
385   r = key_iter->upper_bound(after);
386   if (r < 0)
387     return r;
388   return adjust();
389 }
390
391 bool DBObjectMap::DBObjectMapIteratorImpl::valid()
392 {
393   bool valid = !invalid && ready;
394   assert(!valid || cur_iter->valid());
395   return valid;
396 }
397
398 bool DBObjectMap::DBObjectMapIteratorImpl::valid_parent()
399 {
400   if (parent_iter && parent_iter->valid() &&
401       (!key_iter->valid() || key_iter->key() > parent_iter->key()))
402     return true;
403   return false;
404 }
405
406 int DBObjectMap::DBObjectMapIteratorImpl::next(bool validate)
407 {
408   assert(cur_iter->valid());
409   assert(valid());
410   cur_iter->next();
411   return adjust();
412 }
413
414 int DBObjectMap::DBObjectMapIteratorImpl::next_parent()
415 {
416   r = next();
417   if (r < 0)
418     return r;
419   while (parent_iter && parent_iter->valid() && !on_parent()) {
420     assert(valid());
421     r = lower_bound(parent_iter->key());
422     if (r < 0)
423       return r;
424   }
425
426   if (!parent_iter || !parent_iter->valid()) {
427     invalid = true;
428   }
429   return 0;
430 }
431
432 int DBObjectMap::DBObjectMapIteratorImpl::in_complete_region(const string &to_test,
433                                                              string *begin,
434                                                              string *end)
435 {
436   /* This is clumsy because one cannot call prev() on end(), nor can one
437    * test for == begin().
438    */
439   complete_iter->upper_bound(to_test);
440   if (complete_iter->valid()) {
441     complete_iter->prev();
442     if (!complete_iter->valid()) {
443       complete_iter->upper_bound(to_test);
444       return false;
445     }
446   } else {
447     complete_iter->seek_to_last();
448     if (!complete_iter->valid())
449       return false;
450   }
451
452   assert(complete_iter->key() <= to_test);
453   assert(complete_iter->value().length() >= 1);
454   string _end(complete_iter->value().c_str(),
455               complete_iter->value().length() - 1);
456   if (_end.empty() || _end > to_test) {
457     if (begin)
458       *begin = complete_iter->key();
459     if (end)
460       *end = _end;
461     return true;
462   } else {
463     complete_iter->next();
464     assert(!complete_iter->valid() || complete_iter->key() > to_test);
465     return false;
466   }
467 }
468
469 /**
470  * Moves parent_iter to the next position both out of the complete_region and
471  * not equal to key_iter.  Then, we set cur_iter to parent_iter if valid and
472  * less than key_iter and key_iter otherwise.
473  */
474 int DBObjectMap::DBObjectMapIteratorImpl::adjust()
475 {
476   string begin, end;
477   while (parent_iter && parent_iter->valid()) {
478     if (in_complete_region(parent_iter->key(), &begin, &end)) {
479       if (end.size() == 0) {
480         parent_iter->seek_to_last();
481         if (parent_iter->valid())
482           parent_iter->next();
483       } else
484         parent_iter->lower_bound(end);
485     } else if (key_iter->valid() && key_iter->key() == parent_iter->key()) {
486       parent_iter->next();
487     } else {
488       break;
489     }
490   }
491   if (valid_parent()) {
492     cur_iter = parent_iter;
493   } else if (key_iter->valid()) {
494     cur_iter = key_iter;
495   } else {
496     invalid = true;
497   }
498   assert(invalid || cur_iter->valid());
499   return 0;
500 }
501
502
503 string DBObjectMap::DBObjectMapIteratorImpl::key()
504 {
505   return cur_iter->key();
506 }
507
508 bufferlist DBObjectMap::DBObjectMapIteratorImpl::value()
509 {
510   return cur_iter->value();
511 }
512
513 int DBObjectMap::DBObjectMapIteratorImpl::status()
514 {
515   return r;
516 }
517
518 int DBObjectMap::set_keys(const ghobject_t &oid,
519                           const map<string, bufferlist> &set,
520                           const SequencerPosition *spos)
521 {
522   KeyValueDB::Transaction t = db->get_transaction();
523   MapHeaderLock hl(this, oid);
524   Header header = lookup_create_map_header(hl, oid, t);
525   if (!header)
526     return -EINVAL;
527   if (check_spos(oid, header, spos))
528     return 0;
529
530   t->set(user_prefix(header), set);
531
532   return db->submit_transaction(t);
533 }
534
535 int DBObjectMap::set_header(const ghobject_t &oid,
536                             const bufferlist &bl,
537                             const SequencerPosition *spos)
538 {
539   KeyValueDB::Transaction t = db->get_transaction();
540   MapHeaderLock hl(this, oid);
541   Header header = lookup_create_map_header(hl, oid, t);
542   if (!header)
543     return -EINVAL;
544   if (check_spos(oid, header, spos))
545     return 0;
546   _set_header(header, bl, t);
547   return db->submit_transaction(t);
548 }
549
550 void DBObjectMap::_set_header(Header header, const bufferlist &bl,
551                               KeyValueDB::Transaction t)
552 {
553   map<string, bufferlist> to_set;
554   to_set[USER_HEADER_KEY] = bl;
555   t->set(sys_prefix(header), to_set);
556 }
557
558 int DBObjectMap::get_header(const ghobject_t &oid,
559                             bufferlist *bl)
560 {
561   MapHeaderLock hl(this, oid);
562   Header header = lookup_map_header(hl, oid);
563   if (!header) {
564     return 0;
565   }
566   return _get_header(header, bl);
567 }
568
569 int DBObjectMap::_get_header(Header header,
570                              bufferlist *bl)
571 {
572   map<string, bufferlist> out;
573   while (true) {
574     out.clear();
575     set<string> to_get;
576     to_get.insert(USER_HEADER_KEY);
577     int r = db->get(sys_prefix(header), to_get, &out);
578     if (r == 0 && !out.empty())
579       break;
580     if (r < 0)
581       return r;
582     Header current(header);
583     if (!current->parent)
584       break;
585     header = lookup_parent(current);
586   }
587
588   if (!out.empty())
589     bl->swap(out.begin()->second);
590   return 0;
591 }
592
593 int DBObjectMap::clear(const ghobject_t &oid,
594                        const SequencerPosition *spos)
595 {
596   KeyValueDB::Transaction t = db->get_transaction();
597   MapHeaderLock hl(this, oid);
598   Header header = lookup_map_header(hl, oid);
599   if (!header)
600     return -ENOENT;
601   if (check_spos(oid, header, spos))
602     return 0;
603   remove_map_header(hl, oid, header, t);
604   assert(header->num_children > 0);
605   header->num_children--;
606   int r = _clear(header, t);
607   if (r < 0)
608     return r;
609   return db->submit_transaction(t);
610 }
611
612 int DBObjectMap::_clear(Header header,
613                         KeyValueDB::Transaction t)
614 {
615   while (1) {
616     if (header->num_children) {
617       set_header(header, t);
618       break;
619     }
620     clear_header(header, t);
621     if (!header->parent)
622       break;
623     Header parent = lookup_parent(header);
624     if (!parent) {
625       return -EINVAL;
626     }
627     assert(parent->num_children > 0);
628     parent->num_children--;
629     header.swap(parent);
630   }
631   return 0;
632 }
633
634 int DBObjectMap::copy_up_header(Header header,
635                                 KeyValueDB::Transaction t)
636 {
637   bufferlist bl;
638   int r = _get_header(header, &bl);
639   if (r < 0)
640     return r;
641
642   _set_header(header, bl, t);
643   return 0;
644 }
645
646 int DBObjectMap::rm_keys(const ghobject_t &oid,
647                          const set<string> &to_clear,
648                          const SequencerPosition *spos)
649 {
650   MapHeaderLock hl(this, oid);
651   Header header = lookup_map_header(hl, oid);
652   if (!header)
653     return -ENOENT;
654   KeyValueDB::Transaction t = db->get_transaction();
655   if (check_spos(oid, header, spos))
656     return 0;
657   t->rmkeys(user_prefix(header), to_clear);
658   if (!header->parent) {
659     return db->submit_transaction(t);
660   }
661
662   assert(state.legacy);
663
664   {
665     // We only get here for legacy (v2) stores
666     // Copy up all keys from parent excluding to_clear
667     // and remove parent
668     // This eliminates a v2 format use of complete for this oid only
669     map<string, bufferlist> to_write;
670     ObjectMapIterator iter = _get_iterator(header);
671     for (iter->seek_to_first() ; iter->valid() ; iter->next()) {
672       if (iter->status())
673         return iter->status();
674       if (!to_clear.count(iter->key()))
675         to_write[iter->key()] = iter->value();
676     }
677     t->set(user_prefix(header), to_write);
678   } // destruct iter which has parent in_use
679
680   copy_up_header(header, t);
681   Header parent = lookup_parent(header);
682   if (!parent)
683     return -EINVAL;
684   parent->num_children--;
685   _clear(parent, t);
686   header->parent = 0;
687   set_map_header(hl, oid, *header, t);
688   t->rmkeys_by_prefix(complete_prefix(header));
689   return db->submit_transaction(t);
690 }
691
692 int DBObjectMap::clear_keys_header(const ghobject_t &oid,
693                                    const SequencerPosition *spos)
694 {
695   KeyValueDB::Transaction t = db->get_transaction();
696   MapHeaderLock hl(this, oid);
697   Header header = lookup_map_header(hl, oid);
698   if (!header)
699     return -ENOENT;
700   if (check_spos(oid, header, spos))
701     return 0;
702
703   // save old attrs
704   KeyValueDB::Iterator iter = db->get_iterator(xattr_prefix(header));
705   if (!iter)
706     return -EINVAL;
707   map<string, bufferlist> attrs;
708   for (iter->seek_to_first(); !iter->status() && iter->valid(); iter->next())
709     attrs.insert(make_pair(iter->key(), iter->value()));
710   if (iter->status())
711     return iter->status();
712
713   // remove current header
714   remove_map_header(hl, oid, header, t);
715   assert(header->num_children > 0);
716   header->num_children--;
717   int r = _clear(header, t);
718   if (r < 0)
719     return r;
720
721   // create new header
722   Header newheader = generate_new_header(oid, Header());
723   set_map_header(hl, oid, *newheader, t);
724   if (!attrs.empty())
725     t->set(xattr_prefix(newheader), attrs);
726   return db->submit_transaction(t);
727 }
728
729 int DBObjectMap::get(const ghobject_t &oid,
730                      bufferlist *_header,
731                      map<string, bufferlist> *out)
732 {
733   MapHeaderLock hl(this, oid);
734   Header header = lookup_map_header(hl, oid);
735   if (!header)
736     return -ENOENT;
737   _get_header(header, _header);
738   ObjectMapIterator iter = _get_iterator(header);
739   for (iter->seek_to_first(); iter->valid(); iter->next()) {
740     if (iter->status())
741       return iter->status();
742     out->insert(make_pair(iter->key(), iter->value()));
743   }
744   return 0;
745 }
746
747 int DBObjectMap::get_keys(const ghobject_t &oid,
748                           set<string> *keys)
749 {
750   MapHeaderLock hl(this, oid);
751   Header header = lookup_map_header(hl, oid);
752   if (!header)
753     return -ENOENT;
754   ObjectMapIterator iter = _get_iterator(header);
755   for (iter->seek_to_first(); iter->valid(); iter->next()) {
756     if (iter->status())
757       return iter->status();
758     keys->insert(iter->key());
759   }
760   return 0;
761 }
762
763 int DBObjectMap::scan(Header header,
764                       const set<string> &in_keys,
765                       set<string> *out_keys,
766                       map<string, bufferlist> *out_values)
767 {
768   ObjectMapIterator db_iter = _get_iterator(header);
769   for (set<string>::const_iterator key_iter = in_keys.begin();
770        key_iter != in_keys.end();
771        ++key_iter) {
772     db_iter->lower_bound(*key_iter);
773     if (db_iter->status())
774       return db_iter->status();
775     if (db_iter->valid() && db_iter->key() == *key_iter) {
776       if (out_keys)
777         out_keys->insert(*key_iter);
778       if (out_values)
779         out_values->insert(make_pair(db_iter->key(), db_iter->value()));
780     }
781   }
782   return 0;
783 }
784
785 int DBObjectMap::get_values(const ghobject_t &oid,
786                             const set<string> &keys,
787                             map<string, bufferlist> *out)
788 {
789   MapHeaderLock hl(this, oid);
790   Header header = lookup_map_header(hl, oid);
791   if (!header)
792     return -ENOENT;
793   return scan(header, keys, 0, out);
794 }
795
796 int DBObjectMap::check_keys(const ghobject_t &oid,
797                             const set<string> &keys,
798                             set<string> *out)
799 {
800   MapHeaderLock hl(this, oid);
801   Header header = lookup_map_header(hl, oid);
802   if (!header)
803     return -ENOENT;
804   return scan(header, keys, out, 0);
805 }
806
807 int DBObjectMap::get_xattrs(const ghobject_t &oid,
808                             const set<string> &to_get,
809                             map<string, bufferlist> *out)
810 {
811   MapHeaderLock hl(this, oid);
812   Header header = lookup_map_header(hl, oid);
813   if (!header)
814     return -ENOENT;
815   return db->get(xattr_prefix(header), to_get, out);
816 }
817
818 int DBObjectMap::get_all_xattrs(const ghobject_t &oid,
819                                 set<string> *out)
820 {
821   MapHeaderLock hl(this, oid);
822   Header header = lookup_map_header(hl, oid);
823   if (!header)
824     return -ENOENT;
825   KeyValueDB::Iterator iter = db->get_iterator(xattr_prefix(header));
826   if (!iter)
827     return -EINVAL;
828   for (iter->seek_to_first(); !iter->status() && iter->valid(); iter->next())
829     out->insert(iter->key());
830   return iter->status();
831 }
832
833 int DBObjectMap::set_xattrs(const ghobject_t &oid,
834                             const map<string, bufferlist> &to_set,
835                             const SequencerPosition *spos)
836 {
837   KeyValueDB::Transaction t = db->get_transaction();
838   MapHeaderLock hl(this, oid);
839   Header header = lookup_create_map_header(hl, oid, t);
840   if (!header)
841     return -EINVAL;
842   if (check_spos(oid, header, spos))
843     return 0;
844   t->set(xattr_prefix(header), to_set);
845   return db->submit_transaction(t);
846 }
847
848 int DBObjectMap::remove_xattrs(const ghobject_t &oid,
849                                const set<string> &to_remove,
850                                const SequencerPosition *spos)
851 {
852   KeyValueDB::Transaction t = db->get_transaction();
853   MapHeaderLock hl(this, oid);
854   Header header = lookup_map_header(hl, oid);
855   if (!header)
856     return -ENOENT;
857   if (check_spos(oid, header, spos))
858     return 0;
859   t->rmkeys(xattr_prefix(header), to_remove);
860   return db->submit_transaction(t);
861 }
862
863 // ONLY USED FOR TESTING
864 // Set version to 2 to avoid asserts
865 int DBObjectMap::legacy_clone(const ghobject_t &oid,
866                        const ghobject_t &target,
867                        const SequencerPosition *spos)
868 {
869   state.legacy = true;
870
871   if (oid == target)
872     return 0;
873
874   MapHeaderLock _l1(this, std::min(oid, target));
875   MapHeaderLock _l2(this, std::max(oid, target));
876   MapHeaderLock *lsource, *ltarget;
877   if (oid > target) {
878     lsource = &_l2;
879     ltarget= &_l1;
880   } else {
881     lsource = &_l1;
882     ltarget= &_l2;
883   }
884
885   KeyValueDB::Transaction t = db->get_transaction();
886   {
887     Header destination = lookup_map_header(*ltarget, target);
888     if (destination) {
889       if (check_spos(target, destination, spos))
890         return 0;
891       destination->num_children--;
892       remove_map_header(*ltarget, target, destination, t);
893       _clear(destination, t);
894     }
895   }
896
897   Header parent = lookup_map_header(*lsource, oid);
898   if (!parent)
899     return db->submit_transaction(t);
900
901   Header source = generate_new_header(oid, parent);
902   Header destination = generate_new_header(target, parent);
903   if (spos)
904     destination->spos = *spos;
905
906   parent->num_children = 2;
907   set_header(parent, t);
908   set_map_header(*lsource, oid, *source, t);
909   set_map_header(*ltarget, target, *destination, t);
910
911   map<string, bufferlist> to_set;
912   KeyValueDB::Iterator xattr_iter = db->get_iterator(xattr_prefix(parent));
913   for (xattr_iter->seek_to_first();
914        xattr_iter->valid();
915        xattr_iter->next())
916     to_set.insert(make_pair(xattr_iter->key(), xattr_iter->value()));
917   t->set(xattr_prefix(source), to_set);
918   t->set(xattr_prefix(destination), to_set);
919   t->rmkeys_by_prefix(xattr_prefix(parent));
920   return db->submit_transaction(t);
921 }
922
923 int DBObjectMap::clone(const ghobject_t &oid,
924                        const ghobject_t &target,
925                        const SequencerPosition *spos)
926 {
927   if (oid == target)
928     return 0;
929
930   MapHeaderLock _l1(this, std::min(oid, target));
931   MapHeaderLock _l2(this, std::max(oid, target));
932   MapHeaderLock *lsource, *ltarget;
933   if (oid > target) {
934     lsource = &_l2;
935     ltarget= &_l1;
936   } else {
937     lsource = &_l1;
938     ltarget= &_l2;
939   }
940
941   KeyValueDB::Transaction t = db->get_transaction();
942   {
943     Header destination = lookup_map_header(*ltarget, target);
944     if (destination) {
945       if (check_spos(target, destination, spos))
946         return 0;
947       destination->num_children--;
948       remove_map_header(*ltarget, target, destination, t);
949       _clear(destination, t);
950     }
951   }
952
953   Header source = lookup_map_header(*lsource, oid);
954   if (!source)
955     return db->submit_transaction(t);
956
957   Header destination = generate_new_header(target, Header());
958   if (spos)
959     destination->spos = *spos;
960
961   set_map_header(*ltarget, target, *destination, t);
962
963   bufferlist bl;
964   int r = _get_header(source, &bl);
965   if (r < 0)
966     return r;
967   _set_header(destination, bl, t);
968
969   map<string, bufferlist> to_set;
970   KeyValueDB::Iterator xattr_iter = db->get_iterator(xattr_prefix(source));
971   for (xattr_iter->seek_to_first();
972        xattr_iter->valid();
973        xattr_iter->next())
974     to_set.insert(make_pair(xattr_iter->key(), xattr_iter->value()));
975   t->set(xattr_prefix(destination), to_set);
976
977   map<string, bufferlist> to_write;
978   ObjectMapIterator iter = _get_iterator(source);
979   for (iter->seek_to_first() ; iter->valid() ; iter->next()) {
980     if (iter->status())
981       return iter->status();
982     to_write[iter->key()] = iter->value();
983   }
984   t->set(user_prefix(destination), to_write);
985
986   return db->submit_transaction(t);
987 }
988
989 int DBObjectMap::upgrade_to_v2()
990 {
991   dout(1) << __func__ << " start" << dendl;
992   KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
993   iter->seek_to_first();
994   while (iter->valid()) {
995     unsigned count = 0;
996     KeyValueDB::Transaction t = db->get_transaction();
997     set<string> remove;
998     map<string, bufferlist> add;
999     for (;
1000         iter->valid() && count < 300;
1001         iter->next()) {
1002       dout(20) << __func__ << " key is " << iter->key() << dendl;
1003       int r = is_buggy_ghobject_key_v1(cct, iter->key());
1004       if (r < 0) {
1005         derr << __func__ << " bad key '" << iter->key() << "'" << dendl;
1006         return r;
1007       }
1008       if (!r) {
1009         dout(20) << __func__ << " " << iter->key() << " ok" << dendl;
1010         continue;
1011       }
1012
1013       // decode header to get oid
1014       _Header hdr;
1015       bufferlist bl = iter->value();
1016       bufferlist::iterator bliter = bl.begin();
1017       hdr.decode(bliter);
1018
1019       string newkey(ghobject_key(hdr.oid));
1020       dout(20) << __func__ << " " << iter->key() << " -> " << newkey << dendl;
1021       add[newkey] = iter->value();
1022       remove.insert(iter->key());
1023       ++count;
1024     }
1025
1026     if (!remove.empty()) {
1027       dout(20) << __func__ << " updating " << remove.size() << " keys" << dendl;
1028       t->rmkeys(HOBJECT_TO_SEQ, remove);
1029       t->set(HOBJECT_TO_SEQ, add);
1030       int r = db->submit_transaction(t);
1031       if (r < 0)
1032         return r;
1033     }
1034   }
1035
1036   state.v = 2;
1037
1038   set_state();
1039   return 0;
1040 }
1041
1042 void DBObjectMap::set_state()
1043 {
1044   Mutex::Locker l(header_lock);
1045   KeyValueDB::Transaction t = db->get_transaction();
1046   write_state(t);
1047   int ret = db->submit_transaction_sync(t);
1048   assert(ret == 0);
1049   dout(1) << __func__ << " done" << dendl;
1050   return;
1051 }
1052
1053 int DBObjectMap::get_state()
1054 {
1055   map<string, bufferlist> result;
1056   set<string> to_get;
1057   to_get.insert(GLOBAL_STATE_KEY);
1058   int r = db->get(SYS_PREFIX, to_get, &result);
1059   if (r < 0)
1060     return r;
1061   if (!result.empty()) {
1062     bufferlist::iterator bliter = result.begin()->second.begin();
1063     state.decode(bliter);
1064   } else {
1065     // New store
1066     state.v = State::CUR_VERSION;
1067     state.seq = 1;
1068     state.legacy = false;
1069   }
1070   return 0;
1071 }
1072
1073 int DBObjectMap::init(bool do_upgrade)
1074 {
1075   int ret = get_state();
1076   if (ret < 0)
1077     return ret;
1078   if (state.v < 1) {
1079     dout(1) << "DBObjectMap is *very* old; upgrade to an older version first"
1080             << dendl;
1081     return -ENOTSUP;
1082   }
1083   if (state.v < 2) { // Needs upgrade
1084     if (!do_upgrade) {
1085       dout(1) << "DOBjbectMap requires an upgrade,"
1086               << " set filestore_update_to"
1087               << dendl;
1088       return -ENOTSUP;
1089     } else {
1090       int r = upgrade_to_v2();
1091       if (r < 0)
1092         return r;
1093     }
1094   }
1095   ostringstream ss;
1096   int errors = check(ss, true);
1097   if (errors) {
1098     derr << ss.str() << dendl;
1099     if (errors > 0)
1100       return -EINVAL;
1101   }
1102   dout(20) << "(init)dbobjectmap: seq is " << state.seq << dendl;
1103   return 0;
1104 }
1105
1106 int DBObjectMap::sync(const ghobject_t *oid,
1107                       const SequencerPosition *spos) {
1108   KeyValueDB::Transaction t = db->get_transaction();
1109   if (oid) {
1110     assert(spos);
1111     MapHeaderLock hl(this, *oid);
1112     Header header = lookup_map_header(hl, *oid);
1113     if (header) {
1114       dout(10) << "oid: " << *oid << " setting spos to "
1115                << *spos << dendl;
1116       header->spos = *spos;
1117       set_map_header(hl, *oid, *header, t);
1118     }
1119     /* It may appear that this and the identical portion of the else
1120      * block can combined below, but in this block, the transaction
1121      * must be submitted under *both* the MapHeaderLock and the full
1122      * header_lock.
1123      *
1124      * See 2b63dd25fc1c73fa42e52e9ea4ab5a45dd9422a0 and bug 9891.
1125      */
1126     Mutex::Locker l(header_lock);
1127     write_state(t);
1128     return db->submit_transaction_sync(t);
1129   } else {
1130     Mutex::Locker l(header_lock);
1131     write_state(t);
1132     return db->submit_transaction_sync(t);
1133   }
1134 }
1135
1136 int DBObjectMap::write_state(KeyValueDB::Transaction _t) {
1137   assert(header_lock.is_locked_by_me());
1138   dout(20) << "dbobjectmap: seq is " << state.seq << dendl;
1139   KeyValueDB::Transaction t = _t ? _t : db->get_transaction();
1140   bufferlist bl;
1141   state.encode(bl);
1142   map<string, bufferlist> to_write;
1143   to_write[GLOBAL_STATE_KEY] = bl;
1144   t->set(SYS_PREFIX, to_write);
1145   return _t ? 0 : db->submit_transaction(t);
1146 }
1147
1148
1149 DBObjectMap::Header DBObjectMap::_lookup_map_header(
1150   const MapHeaderLock &l,
1151   const ghobject_t &oid)
1152 {
1153   assert(l.get_locked() == oid);
1154
1155   _Header *header = new _Header();
1156   {
1157     Mutex::Locker l(cache_lock);
1158     if (caches.lookup(oid, header)) {
1159       assert(!in_use.count(header->seq));
1160       in_use.insert(header->seq);
1161       return Header(header, RemoveOnDelete(this));
1162     }
1163   }
1164
1165   bufferlist out;
1166   int r = db->get(HOBJECT_TO_SEQ, map_header_key(oid), &out);
1167   if (r < 0 || out.length()==0) {
1168     delete header;
1169     return Header();
1170   }
1171
1172   Header ret(header, RemoveOnDelete(this));
1173   bufferlist::iterator iter = out.begin();
1174
1175   ret->decode(iter);
1176   {
1177     Mutex::Locker l(cache_lock);
1178     caches.add(oid, *ret);
1179   }
1180
1181   assert(!in_use.count(header->seq));
1182   in_use.insert(header->seq);
1183   return ret;
1184 }
1185
1186 DBObjectMap::Header DBObjectMap::_generate_new_header(const ghobject_t &oid,
1187                                                       Header parent)
1188 {
1189   Header header = Header(new _Header(), RemoveOnDelete(this));
1190   header->seq = state.seq++;
1191   if (parent) {
1192     header->parent = parent->seq;
1193     header->spos = parent->spos;
1194   }
1195   header->num_children = 1;
1196   header->oid = oid;
1197   assert(!in_use.count(header->seq));
1198   in_use.insert(header->seq);
1199
1200   write_state();
1201   return header;
1202 }
1203
1204 DBObjectMap::Header DBObjectMap::lookup_parent(Header input)
1205 {
1206   Mutex::Locker l(header_lock);
1207   while (in_use.count(input->parent))
1208     header_cond.Wait(header_lock);
1209   map<string, bufferlist> out;
1210   set<string> keys;
1211   keys.insert(HEADER_KEY);
1212
1213   dout(20) << "lookup_parent: parent " << input->parent
1214        << " for seq " << input->seq << dendl;
1215   int r = db->get(sys_parent_prefix(input), keys, &out);
1216   if (r < 0) {
1217     ceph_abort();
1218     return Header();
1219   }
1220   if (out.empty()) {
1221     ceph_abort();
1222     return Header();
1223   }
1224
1225   Header header = Header(new _Header(), RemoveOnDelete(this));
1226   bufferlist::iterator iter = out.begin()->second.begin();
1227   header->decode(iter);
1228   assert(header->seq == input->parent);
1229   dout(20) << "lookup_parent: parent seq is " << header->seq << " with parent "
1230        << header->parent << dendl;
1231   in_use.insert(header->seq);
1232   return header;
1233 }
1234
1235 DBObjectMap::Header DBObjectMap::lookup_create_map_header(
1236   const MapHeaderLock &hl,
1237   const ghobject_t &oid,
1238   KeyValueDB::Transaction t)
1239 {
1240   Mutex::Locker l(header_lock);
1241   Header header = _lookup_map_header(hl, oid);
1242   if (!header) {
1243     header = _generate_new_header(oid, Header());
1244     set_map_header(hl, oid, *header, t);
1245   }
1246   return header;
1247 }
1248
1249 void DBObjectMap::clear_header(Header header, KeyValueDB::Transaction t)
1250 {
1251   dout(20) << "clear_header: clearing seq " << header->seq << dendl;
1252   t->rmkeys_by_prefix(user_prefix(header));
1253   t->rmkeys_by_prefix(sys_prefix(header));
1254   if (state.legacy)
1255     t->rmkeys_by_prefix(complete_prefix(header)); // Needed when header.parent != 0
1256   t->rmkeys_by_prefix(xattr_prefix(header));
1257   set<string> keys;
1258   keys.insert(header_key(header->seq));
1259   t->rmkeys(USER_PREFIX, keys);
1260 }
1261
1262 void DBObjectMap::set_header(Header header, KeyValueDB::Transaction t)
1263 {
1264   dout(20) << "set_header: setting seq " << header->seq << dendl;
1265   map<string, bufferlist> to_write;
1266   header->encode(to_write[HEADER_KEY]);
1267   t->set(sys_prefix(header), to_write);
1268 }
1269
1270 void DBObjectMap::remove_map_header(
1271   const MapHeaderLock &l,
1272   const ghobject_t &oid,
1273   Header header,
1274   KeyValueDB::Transaction t)
1275 {
1276   assert(l.get_locked() == oid);
1277   dout(20) << "remove_map_header: removing " << header->seq
1278            << " oid " << oid << dendl;
1279   set<string> to_remove;
1280   to_remove.insert(map_header_key(oid));
1281   t->rmkeys(HOBJECT_TO_SEQ, to_remove);
1282   {
1283     Mutex::Locker l(cache_lock);
1284     caches.clear(oid);
1285   }
1286 }
1287
1288 void DBObjectMap::set_map_header(
1289   const MapHeaderLock &l,
1290   const ghobject_t &oid, _Header header,
1291   KeyValueDB::Transaction t)
1292 {
1293   assert(l.get_locked() == oid);
1294   dout(20) << "set_map_header: setting " << header.seq
1295            << " oid " << oid << " parent seq "
1296            << header.parent << dendl;
1297   map<string, bufferlist> to_set;
1298   header.encode(to_set[map_header_key(oid)]);
1299   t->set(HOBJECT_TO_SEQ, to_set);
1300   {
1301     Mutex::Locker l(cache_lock);
1302     caches.add(oid, header);
1303   }
1304 }
1305
1306 bool DBObjectMap::check_spos(const ghobject_t &oid,
1307                              Header header,
1308                              const SequencerPosition *spos)
1309 {
1310   if (!spos || *spos > header->spos) {
1311     stringstream out;
1312     if (spos)
1313       dout(10) << "oid: " << oid << " not skipping op, *spos "
1314                << *spos << dendl;
1315     else
1316       dout(10) << "oid: " << oid << " not skipping op, *spos "
1317                << "empty" << dendl;
1318     dout(10) << " > header.spos " << header->spos << dendl;
1319     return false;
1320   } else {
1321     dout(10) << "oid: " << oid << " skipping op, *spos " << *spos
1322              << " <= header.spos " << header->spos << dendl;
1323     return true;
1324   }
1325 }
1326
1327 int DBObjectMap::list_objects(vector<ghobject_t> *out)
1328 {
1329   KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
1330   for (iter->seek_to_first(); iter->valid(); iter->next()) {
1331     bufferlist bl = iter->value();
1332     bufferlist::iterator bliter = bl.begin();
1333     _Header header;
1334     header.decode(bliter);
1335     out->push_back(header.oid);
1336   }
1337   return 0;
1338 }
1339
1340 int DBObjectMap::list_object_headers(vector<_Header> *out)
1341 {
1342   int error = 0;
1343   KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
1344   for (iter->seek_to_first(); iter->valid(); iter->next()) {
1345     bufferlist bl = iter->value();
1346     bufferlist::iterator bliter = bl.begin();
1347     _Header header;
1348     header.decode(bliter);
1349     out->push_back(header);
1350     while (header.parent) {
1351       set<string> to_get;
1352       map<string, bufferlist> got;
1353       to_get.insert(HEADER_KEY);
1354       db->get(sys_parent_prefix(header), to_get, &got);
1355       if (got.empty()) {
1356         dout(0) << "Missing: seq " << header.parent << dendl;
1357         error = -ENOENT;
1358         break;
1359       } else {
1360         bl = got.begin()->second;
1361         bufferlist::iterator bliter = bl.begin();
1362         header.decode(bliter);
1363         out->push_back(header);
1364       }
1365     }
1366   }
1367   return error;
1368 }
1369
1370 ostream& operator<<(ostream& out, const DBObjectMap::_Header& h)
1371 {
1372   out << "seq=" << h.seq << " parent=" << h.parent 
1373       << " num_children=" << h.num_children
1374       << " ghobject=" << h.oid;
1375   return out;
1376 }
1377
1378 int DBObjectMap::rename(const ghobject_t &from,
1379                        const ghobject_t &to,
1380                        const SequencerPosition *spos)
1381 {
1382   if (from == to)
1383     return 0;
1384
1385   MapHeaderLock _l1(this, std::min(from, to));
1386   MapHeaderLock _l2(this, std::max(from, to));
1387   MapHeaderLock *lsource, *ltarget;
1388   if (from > to) {
1389     lsource = &_l2;
1390     ltarget= &_l1;
1391   } else {
1392     lsource = &_l1;
1393     ltarget= &_l2;
1394   }
1395
1396   KeyValueDB::Transaction t = db->get_transaction();
1397   {
1398     Header destination = lookup_map_header(*ltarget, to);
1399     if (destination) {
1400       if (check_spos(to, destination, spos))
1401         return 0;
1402       destination->num_children--;
1403       remove_map_header(*ltarget, to, destination, t);
1404       _clear(destination, t);
1405     }
1406   }
1407
1408   Header hdr = lookup_map_header(*lsource, from);
1409   if (!hdr)
1410     return db->submit_transaction(t);
1411
1412   remove_map_header(*lsource, from, hdr, t);
1413   hdr->oid = to;
1414   set_map_header(*ltarget, to, *hdr, t);
1415
1416   return db->submit_transaction(t);
1417 }