1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 #include "include/int_types.h"
4 #include "include/buffer.h"
10 #include "include/memory.h"
13 #include "os/ObjectMap.h"
14 #include "kv/KeyValueDB.h"
15 #include "DBObjectMap.h"
18 #include "common/debug.h"
19 #include "common/config.h"
20 #include "include/assert.h"
22 #define dout_context cct
23 #define dout_subsys ceph_subsys_filestore
25 #define dout_prefix *_dout << "filestore "
27 const string DBObjectMap::USER_PREFIX = "_USER_";
28 const string DBObjectMap::XATTR_PREFIX = "_AXATTR_";
29 const string DBObjectMap::SYS_PREFIX = "_SYS_";
30 const string DBObjectMap::COMPLETE_PREFIX = "_COMPLETE_";
31 const string DBObjectMap::HEADER_KEY = "HEADER";
32 const string DBObjectMap::USER_HEADER_KEY = "USER_HEADER";
33 const string DBObjectMap::GLOBAL_STATE_KEY = "HEADER";
34 const string DBObjectMap::HOBJECT_TO_SEQ = "_HOBJTOSEQ_";
37 const string DBObjectMap::LEAF_PREFIX = "_LEAF_";
38 const string DBObjectMap::REVERSE_LEAF_PREFIX = "_REVLEAF_";
40 static void append_escaped(const string &in, string *out)
42 for (string::const_iterator i = in.begin(); i != in.end(); ++i) {
46 } else if (*i == '.') {
49 } else if (*i == '_') {
58 int DBObjectMap::check(std::ostream &out, bool repair, bool force)
60 int errors = 0, comp_errors = 0;
61 bool repaired = false;
62 map<uint64_t, uint64_t> parent_to_num_children;
63 map<uint64_t, uint64_t> parent_to_actual_num_children;
64 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
65 for (iter->seek_to_first(); iter->valid(); iter->next()) {
67 bufferlist bl = iter->value();
69 bufferlist::iterator bliter = bl.begin();
70 header.decode(bliter);
72 parent_to_actual_num_children[header.seq] = header.num_children;
74 if (state.v == 2 || force) {
75 // Check complete table
76 bool complete_error = false;
77 boost::optional<string> prev;
78 KeyValueDB::Iterator complete_iter = db->get_iterator(USER_PREFIX + header_key(header.seq) + COMPLETE_PREFIX);
79 for (complete_iter->seek_to_first(); complete_iter->valid();
80 complete_iter->next()) {
81 if (prev && prev >= complete_iter->key()) {
82 out << "Bad complete for " << header.oid << std::endl;
83 complete_error = true;
86 prev = string(complete_iter->value().c_str(), complete_iter->value().length() - 1);
89 out << "Complete mapping for " << header.seq << " :" << std::endl;
90 for (complete_iter->seek_to_first(); complete_iter->valid();
91 complete_iter->next()) {
92 out << complete_iter->key() << " -> " << string(complete_iter->value().c_str(), complete_iter->value().length() - 1) << std::endl;
96 KeyValueDB::Transaction t = db->get_transaction();
97 t->rmkeys_by_prefix(USER_PREFIX + header_key(header.seq) + COMPLETE_PREFIX);
98 db->submit_transaction(t);
99 out << "Cleared complete mapping to repair" << std::endl;
101 errors++; // Only count when not repaired
102 comp_errors++; // Track errors here for version update
107 if (header.parent == 0)
110 if (!parent_to_num_children.count(header.parent))
111 parent_to_num_children[header.parent] = 0;
112 parent_to_num_children[header.parent]++;
113 if (parent_to_actual_num_children.count(header.parent))
117 map<string, bufferlist> got;
118 to_get.insert(HEADER_KEY);
119 db->get(sys_parent_prefix(header), to_get, &got);
121 out << "Missing: seq " << header.parent << std::endl;
125 bl = got.begin()->second;
130 for (map<uint64_t, uint64_t>::iterator i = parent_to_num_children.begin();
131 i != parent_to_num_children.end();
132 parent_to_num_children.erase(i++)) {
133 if (!parent_to_actual_num_children.count(i->first))
135 if (parent_to_actual_num_children[i->first] != i->second) {
136 out << "Invalid: seq " << i->first << " recorded children: "
137 << parent_to_actual_num_children[i->first] << " found: "
138 << i->second << std::endl;
141 parent_to_actual_num_children.erase(i->first);
144 // Only advance the version from 2 to 3 here
145 // Mark as legacy because there are still older structures
146 // we don't update. The value of legacy is only used
147 // for internal assertions.
148 if (comp_errors == 0 && state.v == 2 && repair) {
154 if (errors == 0 && repaired)
159 string DBObjectMap::ghobject_key(const ghobject_t &oid)
162 append_escaped(oid.hobj.oid.name, &out);
164 append_escaped(oid.hobj.get_key(), &out);
166 append_escaped(oid.hobj.nspace, &out);
169 char snap_with_hash[1000];
170 char *t = snap_with_hash;
171 char *end = t + sizeof(snap_with_hash);
172 if (oid.hobj.snap == CEPH_NOSNAP)
173 t += snprintf(t, end - t, "head");
174 else if (oid.hobj.snap == CEPH_SNAPDIR)
175 t += snprintf(t, end - t, "snapdir");
177 t += snprintf(t, end - t, "%llx", (long long unsigned)oid.hobj.snap);
179 if (oid.hobj.pool == -1)
180 t += snprintf(t, end - t, ".none");
182 t += snprintf(t, end - t, ".%llx", (long long unsigned)oid.hobj.pool);
183 t += snprintf(t, end - t, ".%.*X", (int)(sizeof(uint32_t)*2), oid.hobj.get_hash());
185 if (oid.generation != ghobject_t::NO_GEN ||
186 oid.shard_id != shard_id_t::NO_SHARD) {
187 t += snprintf(t, end - t, ".%llx", (long long unsigned)oid.generation);
188 t += snprintf(t, end - t, ".%x", (int)oid.shard_id);
190 out += string(snap_with_hash);
194 // ok: pglog%u3%efs1...0.none.0017B237
195 // bad: plana8923501-10...4c.3.ffffffffffffffff.2
196 // fixed: plana8923501-10...4c.3.CB767F2D.ffffffffffffffff.2
197 // returns 0 for false, 1 for true, negative for error
198 int DBObjectMap::is_buggy_ghobject_key_v1(CephContext* cct,
201 int dots = 5; // skip 5 .'s
202 const char *s = in.c_str();
204 while (*s && *s != '.')
207 derr << "unexpected null at " << (int)(s-in.c_str()) << dendl;
211 } while (*s && --dots);
213 derr << "unexpected null at " << (int)(s-in.c_str()) << dendl;
216 // we are now either at a hash value (32 bits, 8 chars) or a generation
217 // value (64 bits) '.' and shard id. count the dots!
219 while (*s && *s != '.') {
225 derr << "hash value is not 8 chars" << dendl;
226 return -EINVAL; // the hash value is always 8 chars.
230 if (*s != '.') { // the shard follows.
231 derr << "missing final . and shard id at " << (int)(s-in.c_str()) << dendl;
238 string DBObjectMap::map_header_key(const ghobject_t &oid)
240 return ghobject_key(oid);
243 string DBObjectMap::header_key(uint64_t seq)
246 snprintf(buf, sizeof(buf), "%.*" PRId64, (int)(2*sizeof(seq)), seq);
250 string DBObjectMap::complete_prefix(Header header)
252 return USER_PREFIX + header_key(header->seq) + COMPLETE_PREFIX;
255 string DBObjectMap::user_prefix(Header header)
257 return USER_PREFIX + header_key(header->seq) + USER_PREFIX;
260 string DBObjectMap::sys_prefix(Header header)
262 return USER_PREFIX + header_key(header->seq) + SYS_PREFIX;
265 string DBObjectMap::xattr_prefix(Header header)
267 return USER_PREFIX + header_key(header->seq) + XATTR_PREFIX;
270 string DBObjectMap::sys_parent_prefix(_Header header)
272 return USER_PREFIX + header_key(header.parent) + SYS_PREFIX;
275 int DBObjectMap::DBObjectMapIteratorImpl::init()
281 assert(!parent_iter);
282 if (header->parent) {
283 Header parent = map->lookup_parent(header);
288 parent_iter = std::make_shared<DBObjectMapIteratorImpl>(map, parent);
290 key_iter = map->db->get_iterator(map->user_prefix(header));
292 complete_iter = map->db->get_iterator(map->complete_prefix(header));
293 assert(complete_iter);
300 ObjectMap::ObjectMapIterator DBObjectMap::get_iterator(
301 const ghobject_t &oid)
303 MapHeaderLock hl(this, oid);
304 Header header = lookup_map_header(hl, oid);
306 return ObjectMapIterator(new EmptyIteratorImpl());
307 DBObjectMapIterator iter = _get_iterator(header);
308 iter->hlock.swap(hl);
312 int DBObjectMap::DBObjectMapIteratorImpl::seek_to_first()
317 r = parent_iter->seek_to_first();
321 r = key_iter->seek_to_first();
327 int DBObjectMap::DBObjectMapIteratorImpl::seek_to_last()
332 r = parent_iter->seek_to_last();
335 if (parent_iter->valid())
336 r = parent_iter->next();
340 r = key_iter->seek_to_last();
343 if (key_iter->valid())
344 r = key_iter->next();
350 int DBObjectMap::DBObjectMapIteratorImpl::lower_bound(const string &to)
355 r = parent_iter->lower_bound(to);
359 r = key_iter->lower_bound(to);
365 int DBObjectMap::DBObjectMapIteratorImpl::lower_bound_parent(const string &to)
367 int r = lower_bound(to);
370 if (valid() && !on_parent())
371 return next_parent();
376 int DBObjectMap::DBObjectMapIteratorImpl::upper_bound(const string &after)
381 r = parent_iter->upper_bound(after);
385 r = key_iter->upper_bound(after);
391 bool DBObjectMap::DBObjectMapIteratorImpl::valid()
393 bool valid = !invalid && ready;
394 assert(!valid || cur_iter->valid());
398 bool DBObjectMap::DBObjectMapIteratorImpl::valid_parent()
400 if (parent_iter && parent_iter->valid() &&
401 (!key_iter->valid() || key_iter->key() > parent_iter->key()))
406 int DBObjectMap::DBObjectMapIteratorImpl::next(bool validate)
408 assert(cur_iter->valid());
414 int DBObjectMap::DBObjectMapIteratorImpl::next_parent()
419 while (parent_iter && parent_iter->valid() && !on_parent()) {
421 r = lower_bound(parent_iter->key());
426 if (!parent_iter || !parent_iter->valid()) {
432 int DBObjectMap::DBObjectMapIteratorImpl::in_complete_region(const string &to_test,
436 /* This is clumsy because one cannot call prev() on end(), nor can one
437 * test for == begin().
439 complete_iter->upper_bound(to_test);
440 if (complete_iter->valid()) {
441 complete_iter->prev();
442 if (!complete_iter->valid()) {
443 complete_iter->upper_bound(to_test);
447 complete_iter->seek_to_last();
448 if (!complete_iter->valid())
452 assert(complete_iter->key() <= to_test);
453 assert(complete_iter->value().length() >= 1);
454 string _end(complete_iter->value().c_str(),
455 complete_iter->value().length() - 1);
456 if (_end.empty() || _end > to_test) {
458 *begin = complete_iter->key();
463 complete_iter->next();
464 assert(!complete_iter->valid() || complete_iter->key() > to_test);
470 * Moves parent_iter to the next position both out of the complete_region and
471 * not equal to key_iter. Then, we set cur_iter to parent_iter if valid and
472 * less than key_iter and key_iter otherwise.
474 int DBObjectMap::DBObjectMapIteratorImpl::adjust()
477 while (parent_iter && parent_iter->valid()) {
478 if (in_complete_region(parent_iter->key(), &begin, &end)) {
479 if (end.size() == 0) {
480 parent_iter->seek_to_last();
481 if (parent_iter->valid())
484 parent_iter->lower_bound(end);
485 } else if (key_iter->valid() && key_iter->key() == parent_iter->key()) {
491 if (valid_parent()) {
492 cur_iter = parent_iter;
493 } else if (key_iter->valid()) {
498 assert(invalid || cur_iter->valid());
503 string DBObjectMap::DBObjectMapIteratorImpl::key()
505 return cur_iter->key();
508 bufferlist DBObjectMap::DBObjectMapIteratorImpl::value()
510 return cur_iter->value();
513 int DBObjectMap::DBObjectMapIteratorImpl::status()
518 int DBObjectMap::set_keys(const ghobject_t &oid,
519 const map<string, bufferlist> &set,
520 const SequencerPosition *spos)
522 KeyValueDB::Transaction t = db->get_transaction();
523 MapHeaderLock hl(this, oid);
524 Header header = lookup_create_map_header(hl, oid, t);
527 if (check_spos(oid, header, spos))
530 t->set(user_prefix(header), set);
532 return db->submit_transaction(t);
535 int DBObjectMap::set_header(const ghobject_t &oid,
536 const bufferlist &bl,
537 const SequencerPosition *spos)
539 KeyValueDB::Transaction t = db->get_transaction();
540 MapHeaderLock hl(this, oid);
541 Header header = lookup_create_map_header(hl, oid, t);
544 if (check_spos(oid, header, spos))
546 _set_header(header, bl, t);
547 return db->submit_transaction(t);
550 void DBObjectMap::_set_header(Header header, const bufferlist &bl,
551 KeyValueDB::Transaction t)
553 map<string, bufferlist> to_set;
554 to_set[USER_HEADER_KEY] = bl;
555 t->set(sys_prefix(header), to_set);
558 int DBObjectMap::get_header(const ghobject_t &oid,
561 MapHeaderLock hl(this, oid);
562 Header header = lookup_map_header(hl, oid);
566 return _get_header(header, bl);
569 int DBObjectMap::_get_header(Header header,
572 map<string, bufferlist> out;
576 to_get.insert(USER_HEADER_KEY);
577 int r = db->get(sys_prefix(header), to_get, &out);
578 if (r == 0 && !out.empty())
582 Header current(header);
583 if (!current->parent)
585 header = lookup_parent(current);
589 bl->swap(out.begin()->second);
593 int DBObjectMap::clear(const ghobject_t &oid,
594 const SequencerPosition *spos)
596 KeyValueDB::Transaction t = db->get_transaction();
597 MapHeaderLock hl(this, oid);
598 Header header = lookup_map_header(hl, oid);
601 if (check_spos(oid, header, spos))
603 remove_map_header(hl, oid, header, t);
604 assert(header->num_children > 0);
605 header->num_children--;
606 int r = _clear(header, t);
609 return db->submit_transaction(t);
612 int DBObjectMap::_clear(Header header,
613 KeyValueDB::Transaction t)
616 if (header->num_children) {
617 set_header(header, t);
620 clear_header(header, t);
623 Header parent = lookup_parent(header);
627 assert(parent->num_children > 0);
628 parent->num_children--;
634 int DBObjectMap::copy_up_header(Header header,
635 KeyValueDB::Transaction t)
638 int r = _get_header(header, &bl);
642 _set_header(header, bl, t);
646 int DBObjectMap::rm_keys(const ghobject_t &oid,
647 const set<string> &to_clear,
648 const SequencerPosition *spos)
650 MapHeaderLock hl(this, oid);
651 Header header = lookup_map_header(hl, oid);
654 KeyValueDB::Transaction t = db->get_transaction();
655 if (check_spos(oid, header, spos))
657 t->rmkeys(user_prefix(header), to_clear);
658 if (!header->parent) {
659 return db->submit_transaction(t);
662 assert(state.legacy);
665 // We only get here for legacy (v2) stores
666 // Copy up all keys from parent excluding to_clear
668 // This eliminates a v2 format use of complete for this oid only
669 map<string, bufferlist> to_write;
670 ObjectMapIterator iter = _get_iterator(header);
671 for (iter->seek_to_first() ; iter->valid() ; iter->next()) {
673 return iter->status();
674 if (!to_clear.count(iter->key()))
675 to_write[iter->key()] = iter->value();
677 t->set(user_prefix(header), to_write);
678 } // destruct iter which has parent in_use
680 copy_up_header(header, t);
681 Header parent = lookup_parent(header);
684 parent->num_children--;
687 set_map_header(hl, oid, *header, t);
688 t->rmkeys_by_prefix(complete_prefix(header));
689 return db->submit_transaction(t);
692 int DBObjectMap::clear_keys_header(const ghobject_t &oid,
693 const SequencerPosition *spos)
695 KeyValueDB::Transaction t = db->get_transaction();
696 MapHeaderLock hl(this, oid);
697 Header header = lookup_map_header(hl, oid);
700 if (check_spos(oid, header, spos))
704 KeyValueDB::Iterator iter = db->get_iterator(xattr_prefix(header));
707 map<string, bufferlist> attrs;
708 for (iter->seek_to_first(); !iter->status() && iter->valid(); iter->next())
709 attrs.insert(make_pair(iter->key(), iter->value()));
711 return iter->status();
713 // remove current header
714 remove_map_header(hl, oid, header, t);
715 assert(header->num_children > 0);
716 header->num_children--;
717 int r = _clear(header, t);
722 Header newheader = generate_new_header(oid, Header());
723 set_map_header(hl, oid, *newheader, t);
725 t->set(xattr_prefix(newheader), attrs);
726 return db->submit_transaction(t);
729 int DBObjectMap::get(const ghobject_t &oid,
731 map<string, bufferlist> *out)
733 MapHeaderLock hl(this, oid);
734 Header header = lookup_map_header(hl, oid);
737 _get_header(header, _header);
738 ObjectMapIterator iter = _get_iterator(header);
739 for (iter->seek_to_first(); iter->valid(); iter->next()) {
741 return iter->status();
742 out->insert(make_pair(iter->key(), iter->value()));
747 int DBObjectMap::get_keys(const ghobject_t &oid,
750 MapHeaderLock hl(this, oid);
751 Header header = lookup_map_header(hl, oid);
754 ObjectMapIterator iter = _get_iterator(header);
755 for (iter->seek_to_first(); iter->valid(); iter->next()) {
757 return iter->status();
758 keys->insert(iter->key());
763 int DBObjectMap::scan(Header header,
764 const set<string> &in_keys,
765 set<string> *out_keys,
766 map<string, bufferlist> *out_values)
768 ObjectMapIterator db_iter = _get_iterator(header);
769 for (set<string>::const_iterator key_iter = in_keys.begin();
770 key_iter != in_keys.end();
772 db_iter->lower_bound(*key_iter);
773 if (db_iter->status())
774 return db_iter->status();
775 if (db_iter->valid() && db_iter->key() == *key_iter) {
777 out_keys->insert(*key_iter);
779 out_values->insert(make_pair(db_iter->key(), db_iter->value()));
785 int DBObjectMap::get_values(const ghobject_t &oid,
786 const set<string> &keys,
787 map<string, bufferlist> *out)
789 MapHeaderLock hl(this, oid);
790 Header header = lookup_map_header(hl, oid);
793 return scan(header, keys, 0, out);
796 int DBObjectMap::check_keys(const ghobject_t &oid,
797 const set<string> &keys,
800 MapHeaderLock hl(this, oid);
801 Header header = lookup_map_header(hl, oid);
804 return scan(header, keys, out, 0);
807 int DBObjectMap::get_xattrs(const ghobject_t &oid,
808 const set<string> &to_get,
809 map<string, bufferlist> *out)
811 MapHeaderLock hl(this, oid);
812 Header header = lookup_map_header(hl, oid);
815 return db->get(xattr_prefix(header), to_get, out);
818 int DBObjectMap::get_all_xattrs(const ghobject_t &oid,
821 MapHeaderLock hl(this, oid);
822 Header header = lookup_map_header(hl, oid);
825 KeyValueDB::Iterator iter = db->get_iterator(xattr_prefix(header));
828 for (iter->seek_to_first(); !iter->status() && iter->valid(); iter->next())
829 out->insert(iter->key());
830 return iter->status();
833 int DBObjectMap::set_xattrs(const ghobject_t &oid,
834 const map<string, bufferlist> &to_set,
835 const SequencerPosition *spos)
837 KeyValueDB::Transaction t = db->get_transaction();
838 MapHeaderLock hl(this, oid);
839 Header header = lookup_create_map_header(hl, oid, t);
842 if (check_spos(oid, header, spos))
844 t->set(xattr_prefix(header), to_set);
845 return db->submit_transaction(t);
848 int DBObjectMap::remove_xattrs(const ghobject_t &oid,
849 const set<string> &to_remove,
850 const SequencerPosition *spos)
852 KeyValueDB::Transaction t = db->get_transaction();
853 MapHeaderLock hl(this, oid);
854 Header header = lookup_map_header(hl, oid);
857 if (check_spos(oid, header, spos))
859 t->rmkeys(xattr_prefix(header), to_remove);
860 return db->submit_transaction(t);
863 // ONLY USED FOR TESTING
864 // Set version to 2 to avoid asserts
865 int DBObjectMap::legacy_clone(const ghobject_t &oid,
866 const ghobject_t &target,
867 const SequencerPosition *spos)
874 MapHeaderLock _l1(this, std::min(oid, target));
875 MapHeaderLock _l2(this, std::max(oid, target));
876 MapHeaderLock *lsource, *ltarget;
885 KeyValueDB::Transaction t = db->get_transaction();
887 Header destination = lookup_map_header(*ltarget, target);
889 if (check_spos(target, destination, spos))
891 destination->num_children--;
892 remove_map_header(*ltarget, target, destination, t);
893 _clear(destination, t);
897 Header parent = lookup_map_header(*lsource, oid);
899 return db->submit_transaction(t);
901 Header source = generate_new_header(oid, parent);
902 Header destination = generate_new_header(target, parent);
904 destination->spos = *spos;
906 parent->num_children = 2;
907 set_header(parent, t);
908 set_map_header(*lsource, oid, *source, t);
909 set_map_header(*ltarget, target, *destination, t);
911 map<string, bufferlist> to_set;
912 KeyValueDB::Iterator xattr_iter = db->get_iterator(xattr_prefix(parent));
913 for (xattr_iter->seek_to_first();
916 to_set.insert(make_pair(xattr_iter->key(), xattr_iter->value()));
917 t->set(xattr_prefix(source), to_set);
918 t->set(xattr_prefix(destination), to_set);
919 t->rmkeys_by_prefix(xattr_prefix(parent));
920 return db->submit_transaction(t);
923 int DBObjectMap::clone(const ghobject_t &oid,
924 const ghobject_t &target,
925 const SequencerPosition *spos)
930 MapHeaderLock _l1(this, std::min(oid, target));
931 MapHeaderLock _l2(this, std::max(oid, target));
932 MapHeaderLock *lsource, *ltarget;
941 KeyValueDB::Transaction t = db->get_transaction();
943 Header destination = lookup_map_header(*ltarget, target);
945 if (check_spos(target, destination, spos))
947 destination->num_children--;
948 remove_map_header(*ltarget, target, destination, t);
949 _clear(destination, t);
953 Header source = lookup_map_header(*lsource, oid);
955 return db->submit_transaction(t);
957 Header destination = generate_new_header(target, Header());
959 destination->spos = *spos;
961 set_map_header(*ltarget, target, *destination, t);
964 int r = _get_header(source, &bl);
967 _set_header(destination, bl, t);
969 map<string, bufferlist> to_set;
970 KeyValueDB::Iterator xattr_iter = db->get_iterator(xattr_prefix(source));
971 for (xattr_iter->seek_to_first();
974 to_set.insert(make_pair(xattr_iter->key(), xattr_iter->value()));
975 t->set(xattr_prefix(destination), to_set);
977 map<string, bufferlist> to_write;
978 ObjectMapIterator iter = _get_iterator(source);
979 for (iter->seek_to_first() ; iter->valid() ; iter->next()) {
981 return iter->status();
982 to_write[iter->key()] = iter->value();
984 t->set(user_prefix(destination), to_write);
986 return db->submit_transaction(t);
989 int DBObjectMap::upgrade_to_v2()
991 dout(1) << __func__ << " start" << dendl;
992 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
993 iter->seek_to_first();
994 while (iter->valid()) {
996 KeyValueDB::Transaction t = db->get_transaction();
998 map<string, bufferlist> add;
1000 iter->valid() && count < 300;
1002 dout(20) << __func__ << " key is " << iter->key() << dendl;
1003 int r = is_buggy_ghobject_key_v1(cct, iter->key());
1005 derr << __func__ << " bad key '" << iter->key() << "'" << dendl;
1009 dout(20) << __func__ << " " << iter->key() << " ok" << dendl;
1013 // decode header to get oid
1015 bufferlist bl = iter->value();
1016 bufferlist::iterator bliter = bl.begin();
1019 string newkey(ghobject_key(hdr.oid));
1020 dout(20) << __func__ << " " << iter->key() << " -> " << newkey << dendl;
1021 add[newkey] = iter->value();
1022 remove.insert(iter->key());
1026 if (!remove.empty()) {
1027 dout(20) << __func__ << " updating " << remove.size() << " keys" << dendl;
1028 t->rmkeys(HOBJECT_TO_SEQ, remove);
1029 t->set(HOBJECT_TO_SEQ, add);
1030 int r = db->submit_transaction(t);
1042 void DBObjectMap::set_state()
1044 Mutex::Locker l(header_lock);
1045 KeyValueDB::Transaction t = db->get_transaction();
1047 int ret = db->submit_transaction_sync(t);
1049 dout(1) << __func__ << " done" << dendl;
1053 int DBObjectMap::get_state()
1055 map<string, bufferlist> result;
1057 to_get.insert(GLOBAL_STATE_KEY);
1058 int r = db->get(SYS_PREFIX, to_get, &result);
1061 if (!result.empty()) {
1062 bufferlist::iterator bliter = result.begin()->second.begin();
1063 state.decode(bliter);
1066 state.v = State::CUR_VERSION;
1068 state.legacy = false;
1073 int DBObjectMap::init(bool do_upgrade)
1075 int ret = get_state();
1079 dout(1) << "DBObjectMap is *very* old; upgrade to an older version first"
1083 if (state.v < 2) { // Needs upgrade
1085 dout(1) << "DOBjbectMap requires an upgrade,"
1086 << " set filestore_update_to"
1090 int r = upgrade_to_v2();
1096 int errors = check(ss, true);
1098 derr << ss.str() << dendl;
1102 dout(20) << "(init)dbobjectmap: seq is " << state.seq << dendl;
1106 int DBObjectMap::sync(const ghobject_t *oid,
1107 const SequencerPosition *spos) {
1108 KeyValueDB::Transaction t = db->get_transaction();
1111 MapHeaderLock hl(this, *oid);
1112 Header header = lookup_map_header(hl, *oid);
1114 dout(10) << "oid: " << *oid << " setting spos to "
1116 header->spos = *spos;
1117 set_map_header(hl, *oid, *header, t);
1119 /* It may appear that this and the identical portion of the else
1120 * block can be combined below, but in this block, the transaction
1121 * must be submitted under *both* the MapHeaderLock and the full
1124 * See 2b63dd25fc1c73fa42e52e9ea4ab5a45dd9422a0 and bug 9891.
1126 Mutex::Locker l(header_lock);
1128 return db->submit_transaction_sync(t);
1130 Mutex::Locker l(header_lock);
1132 return db->submit_transaction_sync(t);
1136 int DBObjectMap::write_state(KeyValueDB::Transaction _t) {
1137 assert(header_lock.is_locked_by_me());
1138 dout(20) << "dbobjectmap: seq is " << state.seq << dendl;
1139 KeyValueDB::Transaction t = _t ? _t : db->get_transaction();
1142 map<string, bufferlist> to_write;
1143 to_write[GLOBAL_STATE_KEY] = bl;
1144 t->set(SYS_PREFIX, to_write);
1145 return _t ? 0 : db->submit_transaction(t);
1149 DBObjectMap::Header DBObjectMap::_lookup_map_header(
1150 const MapHeaderLock &l,
1151 const ghobject_t &oid)
1153 assert(l.get_locked() == oid);
1155 _Header *header = new _Header();
1157 Mutex::Locker l(cache_lock);
1158 if (caches.lookup(oid, header)) {
1159 assert(!in_use.count(header->seq));
1160 in_use.insert(header->seq);
1161 return Header(header, RemoveOnDelete(this));
1166 int r = db->get(HOBJECT_TO_SEQ, map_header_key(oid), &out);
1167 if (r < 0 || out.length()==0) {
1172 Header ret(header, RemoveOnDelete(this));
1173 bufferlist::iterator iter = out.begin();
1177 Mutex::Locker l(cache_lock);
1178 caches.add(oid, *ret);
1181 assert(!in_use.count(header->seq));
1182 in_use.insert(header->seq);
1186 DBObjectMap::Header DBObjectMap::_generate_new_header(const ghobject_t &oid,
1189 Header header = Header(new _Header(), RemoveOnDelete(this));
1190 header->seq = state.seq++;
1192 header->parent = parent->seq;
1193 header->spos = parent->spos;
1195 header->num_children = 1;
1197 assert(!in_use.count(header->seq));
1198 in_use.insert(header->seq);
1204 DBObjectMap::Header DBObjectMap::lookup_parent(Header input)
1206 Mutex::Locker l(header_lock);
1207 while (in_use.count(input->parent))
1208 header_cond.Wait(header_lock);
1209 map<string, bufferlist> out;
1211 keys.insert(HEADER_KEY);
1213 dout(20) << "lookup_parent: parent " << input->parent
1214 << " for seq " << input->seq << dendl;
1215 int r = db->get(sys_parent_prefix(input), keys, &out);
1225 Header header = Header(new _Header(), RemoveOnDelete(this));
1226 bufferlist::iterator iter = out.begin()->second.begin();
1227 header->decode(iter);
1228 assert(header->seq == input->parent);
1229 dout(20) << "lookup_parent: parent seq is " << header->seq << " with parent "
1230 << header->parent << dendl;
1231 in_use.insert(header->seq);
1235 DBObjectMap::Header DBObjectMap::lookup_create_map_header(
1236 const MapHeaderLock &hl,
1237 const ghobject_t &oid,
1238 KeyValueDB::Transaction t)
1240 Mutex::Locker l(header_lock);
1241 Header header = _lookup_map_header(hl, oid);
1243 header = _generate_new_header(oid, Header());
1244 set_map_header(hl, oid, *header, t);
1249 void DBObjectMap::clear_header(Header header, KeyValueDB::Transaction t)
1251 dout(20) << "clear_header: clearing seq " << header->seq << dendl;
1252 t->rmkeys_by_prefix(user_prefix(header));
1253 t->rmkeys_by_prefix(sys_prefix(header));
1255 t->rmkeys_by_prefix(complete_prefix(header)); // Needed when header.parent != 0
1256 t->rmkeys_by_prefix(xattr_prefix(header));
1258 keys.insert(header_key(header->seq));
1259 t->rmkeys(USER_PREFIX, keys);
1262 void DBObjectMap::set_header(Header header, KeyValueDB::Transaction t)
1264 dout(20) << "set_header: setting seq " << header->seq << dendl;
1265 map<string, bufferlist> to_write;
1266 header->encode(to_write[HEADER_KEY]);
1267 t->set(sys_prefix(header), to_write);
1270 void DBObjectMap::remove_map_header(
1271 const MapHeaderLock &l,
1272 const ghobject_t &oid,
1274 KeyValueDB::Transaction t)
1276 assert(l.get_locked() == oid);
1277 dout(20) << "remove_map_header: removing " << header->seq
1278 << " oid " << oid << dendl;
1279 set<string> to_remove;
1280 to_remove.insert(map_header_key(oid));
1281 t->rmkeys(HOBJECT_TO_SEQ, to_remove);
1283 Mutex::Locker l(cache_lock);
1288 void DBObjectMap::set_map_header(
1289 const MapHeaderLock &l,
1290 const ghobject_t &oid, _Header header,
1291 KeyValueDB::Transaction t)
1293 assert(l.get_locked() == oid);
1294 dout(20) << "set_map_header: setting " << header.seq
1295 << " oid " << oid << " parent seq "
1296 << header.parent << dendl;
1297 map<string, bufferlist> to_set;
1298 header.encode(to_set[map_header_key(oid)]);
1299 t->set(HOBJECT_TO_SEQ, to_set);
1301 Mutex::Locker l(cache_lock);
1302 caches.add(oid, header);
1306 bool DBObjectMap::check_spos(const ghobject_t &oid,
1308 const SequencerPosition *spos)
1310 if (!spos || *spos > header->spos) {
1313 dout(10) << "oid: " << oid << " not skipping op, *spos "
1316 dout(10) << "oid: " << oid << " not skipping op, *spos "
1317 << "empty" << dendl;
1318 dout(10) << " > header.spos " << header->spos << dendl;
1321 dout(10) << "oid: " << oid << " skipping op, *spos " << *spos
1322 << " <= header.spos " << header->spos << dendl;
1327 int DBObjectMap::list_objects(vector<ghobject_t> *out)
1329 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
1330 for (iter->seek_to_first(); iter->valid(); iter->next()) {
1331 bufferlist bl = iter->value();
1332 bufferlist::iterator bliter = bl.begin();
1334 header.decode(bliter);
1335 out->push_back(header.oid);
1340 int DBObjectMap::list_object_headers(vector<_Header> *out)
1343 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
1344 for (iter->seek_to_first(); iter->valid(); iter->next()) {
1345 bufferlist bl = iter->value();
1346 bufferlist::iterator bliter = bl.begin();
1348 header.decode(bliter);
1349 out->push_back(header);
1350 while (header.parent) {
1352 map<string, bufferlist> got;
1353 to_get.insert(HEADER_KEY);
1354 db->get(sys_parent_prefix(header), to_get, &got);
1356 dout(0) << "Missing: seq " << header.parent << dendl;
1360 bl = got.begin()->second;
1361 bufferlist::iterator bliter = bl.begin();
1362 header.decode(bliter);
1363 out->push_back(header);
1370 ostream& operator<<(ostream& out, const DBObjectMap::_Header& h)
1372 out << "seq=" << h.seq << " parent=" << h.parent
1373 << " num_children=" << h.num_children
1374 << " ghobject=" << h.oid;
1378 int DBObjectMap::rename(const ghobject_t &from,
1379 const ghobject_t &to,
1380 const SequencerPosition *spos)
1385 MapHeaderLock _l1(this, std::min(from, to));
1386 MapHeaderLock _l2(this, std::max(from, to));
1387 MapHeaderLock *lsource, *ltarget;
1396 KeyValueDB::Transaction t = db->get_transaction();
1398 Header destination = lookup_map_header(*ltarget, to);
1400 if (check_spos(to, destination, spos))
1402 destination->num_children--;
1403 remove_map_header(*ltarget, to, destination, t);
1404 _clear(destination, t);
1408 Header hdr = lookup_map_header(*lsource, from);
1410 return db->submit_transaction(t);
1412 remove_map_header(*lsource, from, hdr, t);
1414 set_map_header(*ltarget, to, *hdr, t);
1416 return db->submit_transaction(t);