// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- #include "FileStoreTracker.h" #include #include #include #include "include/Context.h" #include "common/Mutex.h" class OnApplied : public Context { FileStoreTracker *tracker; list, uint64_t> > in_flight; public: OnApplied(FileStoreTracker *tracker, list, uint64_t> > in_flight) : tracker(tracker), in_flight(in_flight) {} void finish(int r) override { for (list, uint64_t> >::iterator i = in_flight.begin(); i != in_flight.end(); ++i) { tracker->applied(i->first, i->second); } } }; class OnCommitted : public Context { FileStoreTracker *tracker; list, uint64_t> > in_flight; public: OnCommitted(FileStoreTracker *tracker, list, uint64_t> > in_flight) : tracker(tracker), in_flight(in_flight) {} void finish(int r) override { for (list, uint64_t> >::iterator i = in_flight.begin(); i != in_flight.end(); ++i) { tracker->committed(i->first, i->second); } } }; int FileStoreTracker::init() { set to_get; to_get.insert("STATUS"); map got; db->get("STATUS", to_get, &got); restart_seq = 0; if (!got.empty()) { bufferlist::iterator bp = got.begin()->second.begin(); ::decode(restart_seq, bp); } ++restart_seq; KeyValueDB::Transaction t = db->get_transaction(); got.clear(); ::encode(restart_seq, got["STATUS"]); t->set("STATUS", got); db->submit_transaction(t); return 0; } void FileStoreTracker::submit_transaction(Transaction &t) { list, uint64_t> > in_flight; OutTransaction out; out.t = new ObjectStore::Transaction; out.in_flight = &in_flight; for (list::iterator i = t.ops.begin(); i != t.ops.end(); ++i) { (**i)(this, &out); } store->queue_transaction( 0, std::move(*out.t), new OnApplied(this, in_flight), new OnCommitted(this, in_flight)); delete out.t; } void FileStoreTracker::write(const pair &obj, OutTransaction *out) { Mutex::Locker l(lock); std::cerr << "Writing " << obj << std::endl; ObjectContents contents = get_current_content(obj); uint64_t offset = rand() % (SIZE/2); uint64_t len = rand() % (SIZE/2); if (!len) len = 10; contents.write(rand(), offset, len); bufferlist to_write; ObjectContents::Iterator iter = contents.get_iterator(); iter.seek_to(offset); for (uint64_t i = offset; i < offset + len; ++i, ++iter) { assert(iter.valid()); to_write.append(*iter); } out->t->write(coll_t(obj.first), ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP))), offset, len, to_write); out->in_flight->push_back(make_pair(obj, set_content(obj, contents))); } void FileStoreTracker::remove(const pair &obj, OutTransaction *out) { std::cerr << "Deleting " << obj << std::endl; Mutex::Locker l(lock); ObjectContents old_contents = get_current_content(obj); if (!old_contents.exists()) return; out->t->remove(coll_t(obj.first), ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP)))); ObjectContents contents; out->in_flight->push_back(make_pair(obj, set_content(obj, contents))); } void FileStoreTracker::clone_range(const pair &from, const pair &to, OutTransaction *out) { Mutex::Locker l(lock); std::cerr << "CloningRange " << from << " to " << to << std::endl; assert(from.first == to.first); ObjectContents from_contents = get_current_content(from); ObjectContents to_contents = get_current_content(to); if (!from_contents.exists()) { return; } if (from.second == to.second) { return; } uint64_t new_size = from_contents.size(); interval_set interval_to_clone; uint64_t offset = rand() % (new_size/2); uint64_t len = rand() % (new_size/2); if (!len) len = 10; interval_to_clone.insert(offset, len); to_contents.clone_range(from_contents, interval_to_clone); out->t->clone_range(coll_t(from.first), ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))), ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))), offset, len, offset); out->in_flight->push_back(make_pair(to, set_content(to, to_contents))); } void FileStoreTracker::clone(const pair &from, const pair &to, OutTransaction *out) { Mutex::Locker l(lock); std::cerr << "Cloning " << from << " to " << to << std::endl; assert(from.first == to.first); if (from.second == to.second) { return; } ObjectContents from_contents = get_current_content(from); ObjectContents to_contents = get_current_content(to); if (!from_contents.exists()) { return; } if (to_contents.exists()) out->t->remove(coll_t(to.first), ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP)))); out->t->clone(coll_t(from.first), ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))), ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP)))); out->in_flight->push_back(make_pair(to, set_content(to, from_contents))); } string obj_to_prefix(const pair &obj) { string sep; sep.push_back('^'); return obj.first.to_str() + sep + obj.second + "_CONTENTS_"; } string obj_to_meta_prefix(const pair &obj) { string sep; sep.push_back('^'); return obj.first.to_str() + sep + obj.second; } string seq_to_key(uint64_t seq) { char buf[50]; snprintf(buf, sizeof(buf), "%*llu", 20, (unsigned long long int)seq); return string(buf); } struct ObjStatus { uint64_t last_applied; uint64_t last_committed; uint64_t restart_seq; ObjStatus() : last_applied(0), last_committed(0), restart_seq(0) {} uint64_t get_last_applied(uint64_t seq) const { if (seq > restart_seq) return last_committed; else return last_applied; } void set_last_applied(uint64_t _last_applied, uint64_t seq) { last_applied = _last_applied; restart_seq = seq; } uint64_t trim_to() const { return last_applied < last_committed ? last_applied : last_committed; } }; void encode(const ObjStatus &obj, bufferlist &bl) { ::encode(obj.last_applied, bl); ::encode(obj.last_committed, bl); ::encode(obj.restart_seq, bl); } void decode(ObjStatus &obj, bufferlist::iterator &bl) { ::decode(obj.last_applied, bl); ::decode(obj.last_committed, bl); ::decode(obj.restart_seq, bl); } ObjStatus get_obj_status(const pair &obj, KeyValueDB *db) { set to_get; to_get.insert("META"); map got; db->get(obj_to_meta_prefix(obj), to_get, &got); ObjStatus retval; if (!got.empty()) { bufferlist::iterator bp = got.begin()->second.begin(); ::decode(retval, bp); } return retval; } void set_obj_status(const pair &obj, const ObjStatus &status, KeyValueDB::Transaction t) { map to_set; ::encode(status, to_set["META"]); t->set(obj_to_meta_prefix(obj), to_set); } void _clean_forward(const pair &obj, uint64_t last_valid, KeyValueDB *db) { KeyValueDB::Transaction t = db->get_transaction(); KeyValueDB::Iterator i = db->get_iterator(obj_to_prefix(obj)); set to_remove; i->upper_bound(seq_to_key(last_valid)); for (; i->valid(); i->next()) { to_remove.insert(i->key()); } t->rmkeys(obj_to_prefix(obj), to_remove); db->submit_transaction(t); } void FileStoreTracker::verify(const coll_t &coll, const string &obj, bool on_start) { Mutex::Locker l(lock); std::cerr << "Verifying " << make_pair(coll, obj) << std::endl; pair valid_reads = get_valid_reads(make_pair(coll, obj)); std::cerr << "valid_reads is " << valid_reads << std::endl; bufferlist contents; int r = store->read(coll_t(coll), ghobject_t(hobject_t(sobject_t(obj, CEPH_NOSNAP))), 0, 2*SIZE, contents); std::cerr << "exists: " << r << std::endl; for (uint64_t i = valid_reads.first; i < valid_reads.second; ++i) { ObjectContents old_contents = get_content(make_pair(coll, obj), i); std::cerr << "old_contents exists " << old_contents.exists() << std::endl; if (!old_contents.exists() && (r == -ENOENT)) return; if (old_contents.exists() && (r == -ENOENT)) continue; if (!old_contents.exists() && (r != -ENOENT)) continue; if (contents.length() != old_contents.size()) { std::cerr << "old_contents.size() is " << old_contents.size() << std::endl; continue; } bufferlist::iterator bp = contents.begin(); ObjectContents::Iterator iter = old_contents.get_iterator(); iter.seek_to_first(); bool matches = true; uint64_t pos = 0; for (; !bp.end() && iter.valid(); ++iter, ++bp, ++pos) { if (*iter != *bp) { std::cerr << "does not match at pos " << pos << std::endl; matches = false; break; } } if (matches) { if (on_start) _clean_forward(make_pair(coll, obj), i, db); return; } } std::cerr << "Verifying " << make_pair(coll, obj) << " failed " << std::endl; ceph_abort(); } ObjectContents FileStoreTracker::get_current_content( const pair &obj) { KeyValueDB::Iterator iter = db->get_iterator( obj_to_prefix(obj)); iter->seek_to_last(); if (iter->valid()) { bufferlist bl = iter->value(); bufferlist::iterator bp = bl.begin(); pair val; ::decode(val, bp); assert(seq_to_key(val.first) == iter->key()); bp = val.second.begin(); return ObjectContents(bp); } return ObjectContents(); } ObjectContents FileStoreTracker::get_content( const pair &obj, uint64_t version) { set to_get; map got; to_get.insert(seq_to_key(version)); db->get(obj_to_prefix(obj), to_get, &got); if (got.empty()) return ObjectContents(); pair val; bufferlist::iterator bp = got.begin()->second.begin(); ::decode(val, bp); bp = val.second.begin(); assert(val.first == version); return ObjectContents(bp); } pair FileStoreTracker::get_valid_reads( const pair &obj) { pair bounds = make_pair(0,1); KeyValueDB::Iterator iter = db->get_iterator( obj_to_prefix(obj)); iter->seek_to_last(); if (iter->valid()) { pair val; bufferlist bl = iter->value(); bufferlist::iterator bp = bl.begin(); ::decode(val, bp); bounds.second = val.first + 1; } ObjStatus obj_status = get_obj_status(obj, db); bounds.first = obj_status.get_last_applied(restart_seq); return bounds; } void clear_obsolete(const pair &obj, const ObjStatus &status, KeyValueDB *db, KeyValueDB::Transaction t) { KeyValueDB::Iterator iter = db->get_iterator(obj_to_prefix(obj)); set to_remove; iter->seek_to_first(); for (; iter->valid() && iter->key() < seq_to_key(status.trim_to()); iter->next()) to_remove.insert(iter->key()); t->rmkeys(obj_to_prefix(obj), to_remove); } void FileStoreTracker::committed(const pair &obj, uint64_t seq) { Mutex::Locker l(lock); ObjStatus status = get_obj_status(obj, db); assert(status.last_committed < seq); status.last_committed = seq; KeyValueDB::Transaction t = db->get_transaction(); clear_obsolete(obj, status, db, t); set_obj_status(obj, status, t); db->submit_transaction(t); } void FileStoreTracker::applied(const pair &obj, uint64_t seq) { Mutex::Locker l(lock); std::cerr << "Applied " << obj << " version " << seq << std::endl; ObjStatus status = get_obj_status(obj, db); assert(status.last_applied < seq); status.set_last_applied(seq, restart_seq); KeyValueDB::Transaction t = db->get_transaction(); clear_obsolete(obj, status, db, t); set_obj_status(obj, status, t); db->submit_transaction(t); } uint64_t FileStoreTracker::set_content(const pair &obj, ObjectContents &content) { KeyValueDB::Transaction t = db->get_transaction(); KeyValueDB::Iterator iter = db->get_iterator( obj_to_prefix(obj)); iter->seek_to_last(); uint64_t most_recent = 0; if (iter->valid()) { pair val; bufferlist bl = iter->value(); bufferlist::iterator bp = bl.begin(); ::decode(val, bp); most_recent = val.first; } bufferlist buf_content; content.encode(buf_content); map to_set; ::encode(make_pair(most_recent + 1, buf_content), to_set[seq_to_key(most_recent + 1)]); t->set(obj_to_prefix(obj), to_set); db->submit_transaction(t); return most_recent + 1; }