// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil
 * Copyright (C) 2013 Cloudwatt
 *
 * Author: Loic Dachary
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "PGLog.h"
#include "include/unordered_map.h"
#include "common/ceph_context.h"

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)

static ostream& _prefix(std::ostream *_dout, const PGLog *pglog)
{
  return *_dout << pglog->gen_prefix();
}

//////////////////// PGLog::IndexedLog ////////////////////

void PGLog::IndexedLog::split_out_child(
  pg_t child_pgid,
  unsigned split_bits,
  PGLog::IndexedLog *target)
{
  unindex();
  *target = IndexedLog(pg_log_t::split_out_child(child_pgid, split_bits));
  index();
  target->index();
  reset_rollback_info_trimmed_to_riter();
}

void PGLog::IndexedLog::trim(
  CephContext* cct,
  eversion_t s,
  set<eversion_t> *trimmed,
  set<string>* trimmed_dups,
  eversion_t *write_from_dups)
{
  if (complete_to != log.end() &&
      complete_to->version <= s) {
    generic_dout(0) << " bad trim to " << s << " when complete_to is "
                    << complete_to->version
                    << " on " << *this << dendl;
  }

  assert(s <= can_rollback_to);

  auto earliest_dup_version =
    log.rbegin()->version.version < cct->_conf->osd_pg_log_dups_tracked
    ? 0u
    : log.rbegin()->version.version - cct->_conf->osd_pg_log_dups_tracked;

  while (!log.empty()) {
    const pg_log_entry_t &e = *log.begin();
    if (e.version > s)
      break;
    generic_dout(20) << "trim " << e << dendl;
    if (trimmed)
      trimmed->insert(e.version);

    unindex(e);         // remove from index,

    // add to dup list
    generic_dout(20) << "earliest_dup_version = " << earliest_dup_version << dendl;
    if (e.version.version >= earliest_dup_version) {
      if (write_from_dups != nullptr && *write_from_dups > e.version) {
        generic_dout(20) << "updating write_from_dups from " << *write_from_dups
                         << " to " << e.version << dendl;
        *write_from_dups = e.version;
      }
      dups.push_back(pg_log_dup_t(e));
      index(dups.back());
      for (const auto& extra : e.extra_reqids) {
        // note: extras have the same version as outer op
        dups.push_back(pg_log_dup_t(e.version, extra.second,
                                    extra.first, e.return_code));
        index(dups.back());
      }
    }

    if (rollback_info_trimmed_to_riter == log.rend() ||
        e.version == rollback_info_trimmed_to_riter->version) {
      log.pop_front();
      rollback_info_trimmed_to_riter = log.rend();
    } else {
      log.pop_front();
    }
  }

  while (!dups.empty()) {
    const auto& e = *dups.begin();
    if (e.version.version >= earliest_dup_version)
      break;
    generic_dout(20) << "trim dup " << e << dendl;
    if (trimmed_dups)
      trimmed_dups->insert(e.get_key_name());
    if (indexed_data & PGLOG_INDEXED_DUPS) {
      dup_index.erase(e.reqid);
    }
    dups.pop_front();
  }

  // raise tail?
  if (tail < s)
    tail = s;
}
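// A worked sketch of the dup-tracking window computed in trim() above,
// with illustrative numbers not taken from the source: suppose
// osd_pg_log_dups_tracked = 3000 and the newest log entry is version
// 12'5000.  Then earliest_dup_version = 5000 - 3000 = 2000, so trimmed
// entries with version.version >= 2000 are retained as pg_log_dup_t
// records while older dups are dropped by the second loop.  If the
// newest entry were 12'1000 (below the tracked count), earliest_dup_version
// would be 0 and every trimmed entry would be kept as a dup.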
"indexed" : "NOT INDEXED") << std::endl; assert(!p->reqid_is_indexed() || logged_req(p->reqid)); } for (list::const_iterator p = dups.begin(); p != dups.end(); ++p) { out << *p << std::endl; } return out; } //////////////////// PGLog //////////////////// void PGLog::reset_backfill() { missing.clear(); } void PGLog::clear() { missing.clear(); log.clear(); log_keys_debug.clear(); undirty(); } void PGLog::clear_info_log( spg_t pgid, ObjectStore::Transaction *t) { coll_t coll(pgid); t->remove(coll, pgid.make_pgmeta_oid()); } void PGLog::trim( eversion_t trim_to, pg_info_t &info) { // trim? if (trim_to > log.tail) { // We shouldn't be trimming the log past last_complete assert(trim_to <= info.last_complete); dout(10) << "trim " << log << " to " << trim_to << dendl; log.trim(cct, trim_to, &trimmed, &trimmed_dups, &write_from_dups); info.log_tail = log.tail; } } void PGLog::proc_replica_log( pg_info_t &oinfo, const pg_log_t &olog, pg_missing_t& omissing, pg_shard_t from) const { dout(10) << "proc_replica_log for osd." << from << ": " << oinfo << " " << olog << " " << omissing << dendl; if (olog.head < log.tail) { dout(10) << __func__ << ": osd." << from << " does not overlap, not looking " << "for divergent objects" << dendl; return; } if (olog.head == log.head) { dout(10) << __func__ << ": osd." << from << " same log head, not looking " << "for divergent objects" << dendl; return; } assert(olog.head >= log.tail); /* basically what we're doing here is rewinding the remote log, dropping divergent entries, until we find something that matches our master log. we then reset last_update to reflect the new point up to which missing is accurate. later, in activate(), missing will get wound forward again and we will send the peer enough log to arrive at the same state. */ for (map::const_iterator i = omissing.get_items().begin(); i != omissing.get_items().end(); ++i) { dout(20) << " before missing " << i->first << " need " << i->second.need << " have " << i->second.have << dendl; } list::const_reverse_iterator first_non_divergent = log.log.rbegin(); while (1) { if (first_non_divergent == log.log.rend()) break; if (first_non_divergent->version <= olog.head) { dout(20) << "merge_log point (usually last shared) is " << *first_non_divergent << dendl; break; } ++first_non_divergent; } /* Because olog.head >= log.tail, we know that both pgs must at least have * the event represented by log.tail. Similarly, because log.head >= olog.tail, * we know that the even represented by olog.tail must be common to both logs. * Furthermore, the event represented by a log tail was necessarily trimmed, * thus neither olog.tail nor log.tail can be divergent. It's * possible that olog/log contain no actual events between olog.head and * MAX(log.tail, olog.tail), however, since they might have been split out. * Thus, if we cannot find an event e such that * log.tail <= e.version <= log.head, the last_update must actually be * MAX(log.tail, olog.tail). */ eversion_t limit = MAX(olog.tail, log.tail); eversion_t lu = (first_non_divergent == log.log.rend() || first_non_divergent->version < limit) ? limit : first_non_divergent->version; IndexedLog folog(olog); auto divergent = folog.rewind_from_head(lu); _merge_divergent_entries( folog, divergent, oinfo, olog.get_can_rollback_to(), omissing, 0, this); if (lu < oinfo.last_update) { dout(10) << " peer osd." 
<< from << " last_update now " << lu << dendl; oinfo.last_update = lu; } if (omissing.have_missing()) { eversion_t first_missing = omissing.get_items().at(omissing.get_rmissing().begin()->second).need; oinfo.last_complete = eversion_t(); list::const_iterator i = olog.log.begin(); for (; i != olog.log.end(); ++i) { if (i->version < first_missing) oinfo.last_complete = i->version; else break; } } else { oinfo.last_complete = oinfo.last_update; } } // proc_replica_log /** * rewind divergent entries at the head of the log * * This rewinds entries off the head of our log that are divergent. * This is used by replicas during activation. * * @param newhead new head to rewind to */ void PGLog::rewind_divergent_log(eversion_t newhead, pg_info_t &info, LogEntryHandler *rollbacker, bool &dirty_info, bool &dirty_big_info) { dout(10) << "rewind_divergent_log truncate divergent future " << newhead << dendl; if (info.last_complete > newhead) info.last_complete = newhead; auto divergent = log.rewind_from_head(newhead); if (!divergent.empty()) { mark_dirty_from(divergent.front().version); } for (auto &&entry: divergent) { dout(10) << "rewind_divergent_log future divergent " << entry << dendl; } info.last_update = newhead; _merge_divergent_entries( log, divergent, info, log.get_can_rollback_to(), missing, rollbacker, this); dirty_info = true; dirty_big_info = true; } void PGLog::merge_log(pg_info_t &oinfo, pg_log_t &olog, pg_shard_t fromosd, pg_info_t &info, LogEntryHandler *rollbacker, bool &dirty_info, bool &dirty_big_info) { dout(10) << "merge_log " << olog << " from osd." << fromosd << " into " << log << dendl; // Check preconditions // If our log is empty, the incoming log needs to have not been trimmed. assert(!log.null() || olog.tail == eversion_t()); // The logs must overlap. assert(log.head >= olog.tail && olog.head >= log.tail); for (map::const_iterator i = missing.get_items().begin(); i != missing.get_items().end(); ++i) { dout(20) << "pg_missing_t sobject: " << i->first << dendl; } bool changed = false; // extend on tail? // this is just filling in history. it does not affect our // missing set, as that should already be consistent with our // current log. eversion_t orig_tail = log.tail; if (olog.tail < log.tail) { dout(10) << "merge_log extending tail to " << olog.tail << dendl; list::iterator from = olog.log.begin(); list::iterator to; eversion_t last; for (to = from; to != olog.log.end(); ++to) { if (to->version > log.tail) break; log.index(*to); dout(15) << *to << dendl; last = to->version; } mark_dirty_to(last); // splice into our log. log.log.splice(log.log.begin(), olog.log, from, to); info.log_tail = log.tail = olog.tail; changed = true; } if (oinfo.stats.reported_seq < info.stats.reported_seq || // make sure reported always increases oinfo.stats.reported_epoch < info.stats.reported_epoch) { oinfo.stats.reported_seq = info.stats.reported_seq; oinfo.stats.reported_epoch = info.stats.reported_epoch; } if (info.last_backfill.is_max()) info.stats = oinfo.stats; info.hit_set = oinfo.hit_set; // do we have divergent entries to throw out? if (olog.head < log.head) { rewind_divergent_log(olog.head, info, rollbacker, dirty_info, dirty_big_info); changed = true; } // extend on head? 
void PGLog::merge_log(pg_info_t &oinfo, pg_log_t &olog, pg_shard_t fromosd,
                      pg_info_t &info, LogEntryHandler *rollbacker,
                      bool &dirty_info, bool &dirty_big_info)
{
  dout(10) << "merge_log " << olog << " from osd." << fromosd
           << " into " << log << dendl;

  // Check preconditions

  // If our log is empty, the incoming log needs to have not been trimmed.
  assert(!log.null() || olog.tail == eversion_t());
  // The logs must overlap.
  assert(log.head >= olog.tail && olog.head >= log.tail);

  for (map<hobject_t, pg_missing_item>::const_iterator i = missing.get_items().begin();
       i != missing.get_items().end();
       ++i) {
    dout(20) << "pg_missing_t sobject: " << i->first << dendl;
  }

  bool changed = false;

  // extend on tail?
  //  this is just filling in history.  it does not affect our
  //  missing set, as that should already be consistent with our
  //  current log.
  eversion_t orig_tail = log.tail;
  if (olog.tail < log.tail) {
    dout(10) << "merge_log extending tail to " << olog.tail << dendl;
    list<pg_log_entry_t>::iterator from = olog.log.begin();
    list<pg_log_entry_t>::iterator to;
    eversion_t last;
    for (to = from;
         to != olog.log.end();
         ++to) {
      if (to->version > log.tail)
        break;
      log.index(*to);
      dout(15) << *to << dendl;
      last = to->version;
    }
    mark_dirty_to(last);

    // splice into our log.
    log.log.splice(log.log.begin(),
                   olog.log, from, to);

    info.log_tail = log.tail = olog.tail;
    changed = true;
  }

  if (oinfo.stats.reported_seq < info.stats.reported_seq ||   // make sure reported always increases
      oinfo.stats.reported_epoch < info.stats.reported_epoch) {
    oinfo.stats.reported_seq = info.stats.reported_seq;
    oinfo.stats.reported_epoch = info.stats.reported_epoch;
  }
  if (info.last_backfill.is_max())
    info.stats = oinfo.stats;
  info.hit_set = oinfo.hit_set;

  // do we have divergent entries to throw out?
  if (olog.head < log.head) {
    rewind_divergent_log(olog.head, info, rollbacker, dirty_info, dirty_big_info);
    changed = true;
  }

  // extend on head?
  if (olog.head > log.head) {
    dout(10) << "merge_log extending head to " << olog.head << dendl;

    // find start point in olog
    list<pg_log_entry_t>::iterator to = olog.log.end();
    list<pg_log_entry_t>::iterator from = olog.log.end();
    eversion_t lower_bound = MAX(olog.tail, orig_tail);
    while (1) {
      if (from == olog.log.begin())
        break;
      --from;
      dout(20) << " ? " << *from << dendl;
      if (from->version <= log.head) {
        lower_bound = MAX(lower_bound, from->version);
        ++from;
        break;
      }
    }
    dout(20) << "merge_log cut point (usually last shared) is "
             << lower_bound << dendl;
    mark_dirty_from(lower_bound);

    auto divergent = log.rewind_from_head(lower_bound);
    // move aside divergent items
    for (auto &&oe: divergent) {
      dout(10) << "merge_log divergent " << oe << dendl;
    }
    log.roll_forward_to(log.head, rollbacker);

    mempool::osd_pglog::list<pg_log_entry_t> new_entries;
    new_entries.splice(new_entries.end(), olog.log, from, to);
    append_log_entries_update_missing(
      info.last_backfill,
      info.last_backfill_bitwise,
      new_entries,
      false,
      &log,
      missing,
      rollbacker,
      this);

    _merge_divergent_entries(
      log,
      divergent,
      info,
      log.get_can_rollback_to(),
      missing,
      rollbacker,
      this);

    info.last_update = log.head = olog.head;

    // We cannot rollback into the new log entries
    log.skip_can_rollback_to_to_head();

    info.last_user_version = oinfo.last_user_version;
    info.purged_snaps = oinfo.purged_snaps;

    changed = true;
  }

  // now handle dups
  if (merge_log_dups(olog)) {
    changed = true;
  }

  dout(10) << "merge_log result " << log << " " << missing <<
    " changed=" << changed << dendl;

  if (changed) {
    dirty_info = true;
    dirty_big_info = true;
  }
}
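// Sketch of the dup merge that follows (illustrative versions): if
// log.dups covers [1'5 .. 2'10] and olog.dups covers [1'1 .. 2'14], the
// newer olog dups (2'11 .. 2'14) are appended at the back and the older
// ones (1'1 .. 1'4) are inserted at the front; dups already inside
// [1'5 .. 2'10] are assumed identical on both sides and are left alone.
// Afterwards, any dup whose version overlaps the live log range is
// removed, since the log entry itself still carries that information.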
// returns true if any changes were made to log.dups
bool PGLog::merge_log_dups(const pg_log_t& olog)
{
  bool changed = false;

  if (!olog.dups.empty()) {
    if (log.dups.empty()) {
      dout(10) << "merge_log copying olog dups to log " <<
        olog.dups.front().version << " to " <<
        olog.dups.back().version << dendl;
      changed = true;
      dirty_from_dups = eversion_t();
      dirty_to_dups = eversion_t::max();
      // since our log.dups is empty just copy them
      for (const auto& i : olog.dups) {
        log.dups.push_back(i);
        log.index(log.dups.back());
      }
    } else {
      // since our log.dups is not empty try to extend on each end

      if (olog.dups.back().version > log.dups.back().version) {
        // extend the dups's tail (i.e., newer dups)
        dout(10) << "merge_log extending dups tail to " <<
          olog.dups.back().version << dendl;
        changed = true;

        auto log_tail_version = log.dups.back().version;

        auto insert_cursor = log.dups.end();
        eversion_t last_shared = eversion_t::max();
        for (auto i = olog.dups.crbegin(); i != olog.dups.crend(); ++i) {
          if (i->version <= log_tail_version) break;
          log.dups.insert(insert_cursor, *i);
          last_shared = i->version;
          auto prev = insert_cursor;
          --prev;
          // be sure to pass reference of copy in log.dups
          log.index(*prev);
          --insert_cursor; // make sure we insert in reverse order
        }
        mark_dirty_from_dups(last_shared);
      }

      if (olog.dups.front().version < log.dups.front().version) {
        // extend the dups's head (i.e., older dups)
        dout(10) << "merge_log extending dups head to " <<
          olog.dups.front().version << dendl;
        changed = true;

        eversion_t last;
        auto insert_cursor = log.dups.begin();
        for (auto i = olog.dups.cbegin(); i != olog.dups.cend(); ++i) {
          if (i->version >= insert_cursor->version) break;
          log.dups.insert(insert_cursor, *i);
          last = i->version;
          auto prev = insert_cursor;
          --prev;
          // be sure to pass address of copy in log.dups
          log.index(*prev);
        }
        mark_dirty_to_dups(last);
      }
    }
  }

  // remove any dup entries that overlap with pglog
  if (!log.dups.empty() && log.dups.back().version >= log.tail) {
    dout(10) << "merge_log removed dups overlapping log entries [" <<
      log.tail << "," << log.dups.back().version << "]" << dendl;
    changed = true;

    while (!log.dups.empty() && log.dups.back().version >= log.tail) {
      log.unindex(log.dups.back());
      mark_dirty_from_dups(log.dups.back().version);
      log.dups.pop_back();
    }
  }

  return changed;
}

void PGLog::check()
{
  if (!pg_log_debug)
    return;
  if (log.log.size() != log_keys_debug.size()) {
    derr << "log.log.size() != log_keys_debug.size()" << dendl;
    derr << "actual log:" << dendl;
    for (list<pg_log_entry_t>::iterator i = log.log.begin();
         i != log.log.end();
         ++i) {
      derr << " " << *i << dendl;
    }
    derr << "log_keys_debug:" << dendl;
    for (set<string>::const_iterator i = log_keys_debug.begin();
         i != log_keys_debug.end();
         ++i) {
      derr << " " << *i << dendl;
    }
  }
  assert(log.log.size() == log_keys_debug.size());
  for (list<pg_log_entry_t>::iterator i = log.log.begin();
       i != log.log.end();
       ++i) {
    assert(log_keys_debug.count(i->get_key_name()));
  }
}

// non-static
void PGLog::write_log_and_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  const coll_t& coll,
  const ghobject_t &log_oid,
  bool require_rollback)
{
  if (is_dirty()) {
    dout(5) << "write_log_and_missing with: "
            << "dirty_to: " << dirty_to
            << ", dirty_from: " << dirty_from
            << ", writeout_from: " << writeout_from
            << ", trimmed: " << trimmed
            << ", trimmed_dups: " << trimmed_dups
            << ", clear_divergent_priors: " << clear_divergent_priors
            << dendl;
    _write_log_and_missing(
      t, km, log, coll, log_oid,
      dirty_to,
      dirty_from,
      writeout_from,
      trimmed,
      trimmed_dups,
      missing,
      !touched_log,
      require_rollback,
      clear_divergent_priors,
      dirty_to_dups,
      dirty_from_dups,
      write_from_dups,
      &rebuilt_missing_with_deletes,
      (pg_log_debug ? &log_keys_debug : nullptr));
    undirty();
  } else {
    dout(10) << "log is not dirty" << dendl;
  }
}
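// How the dirty bounds drive the omap updates in the writers below
// (illustrative values): with dirty_to = 4'20 and dirty_from = 7'90, the
// key ranges [eversion_t(), 4'20) and [7'90, max) are removed from the
// omap, then the in-memory entries with version <= 4'20, plus those at or
// past 7'90 (or past writeout_from), are re-encoded into *km; the
// untouched middle range (4'20, 7'90) stays on disk as-is, so a small
// update rewrites only a small number of keys.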
// static
void PGLog::write_log_and_missing_wo_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  pg_log_t &log,
  const coll_t& coll, const ghobject_t &log_oid,
  map<eversion_t, hobject_t> &divergent_priors,
  bool require_rollback
  )
{
  _write_log_and_missing_wo_missing(
    t, km, log, coll, log_oid,
    divergent_priors, eversion_t::max(), eversion_t(), eversion_t(),
    set<eversion_t>(),
    set<string>(),
    true, true, require_rollback,
    eversion_t::max(), eversion_t(), eversion_t(), nullptr);
}

// static
void PGLog::write_log_and_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  pg_log_t &log,
  const coll_t& coll,
  const ghobject_t &log_oid,
  const pg_missing_tracker_t &missing,
  bool require_rollback,
  bool *rebuilt_missing_with_deletes)
{
  _write_log_and_missing(
    t, km, log, coll, log_oid,
    eversion_t::max(),
    eversion_t(),
    eversion_t(),
    set<eversion_t>(),
    set<string>(),
    missing,
    true, require_rollback, false,
    eversion_t::max(),
    eversion_t(),
    eversion_t(),
    rebuilt_missing_with_deletes, nullptr);
}

// static
void PGLog::_write_log_and_missing_wo_missing(
  ObjectStore::Transaction& t,
  map<string,bufferlist> *km,
  pg_log_t &log,
  const coll_t& coll, const ghobject_t &log_oid,
  map<eversion_t, hobject_t> &divergent_priors,
  eversion_t dirty_to,
  eversion_t dirty_from,
  eversion_t writeout_from,
  const set<eversion_t> &trimmed,
  const set<string> &trimmed_dups,
  bool dirty_divergent_priors,
  bool touch_log,
  bool require_rollback,
  eversion_t dirty_to_dups,
  eversion_t dirty_from_dups,
  eversion_t write_from_dups,
  set<string> *log_keys_debug
  )
{
  set<string> to_remove(trimmed_dups);
  for (set<eversion_t>::const_iterator i = trimmed.begin();
       i != trimmed.end();
       ++i) {
    to_remove.insert(i->get_key_name());
    if (log_keys_debug) {
      assert(log_keys_debug->count(i->get_key_name()));
      log_keys_debug->erase(i->get_key_name());
    }
  }

  // dout(10) << "write_log_and_missing, clearing up to " << dirty_to << dendl;
  if (touch_log)
    t.touch(coll, log_oid);
  if (dirty_to != eversion_t()) {
    t.omap_rmkeyrange(
      coll, log_oid,
      eversion_t().get_key_name(), dirty_to.get_key_name());
    clear_up_to(log_keys_debug, dirty_to.get_key_name());
  }
  if (dirty_to != eversion_t::max() && dirty_from != eversion_t::max()) {
    // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
    t.omap_rmkeyrange(
      coll, log_oid,
      dirty_from.get_key_name(), eversion_t::max().get_key_name());
    clear_after(log_keys_debug, dirty_from.get_key_name());
  }

  for (list<pg_log_entry_t>::iterator p = log.log.begin();
       p != log.log.end() && p->version <= dirty_to;
       ++p) {
    bufferlist bl(sizeof(*p) * 2);
    p->encode_with_checksum(bl);
    (*km)[p->get_key_name()].claim(bl);
  }

  for (list<pg_log_entry_t>::reverse_iterator p = log.log.rbegin();
       p != log.log.rend() &&
         (p->version >= dirty_from || p->version >= writeout_from) &&
         p->version >= dirty_to;
       ++p) {
    bufferlist bl(sizeof(*p) * 2);
    p->encode_with_checksum(bl);
    (*km)[p->get_key_name()].claim(bl);
  }

  if (log_keys_debug) {
    for (map<string, bufferlist>::iterator i = (*km).begin();
         i != (*km).end();
         ++i) {
      if (i->first[0] == '_')
        continue;
      assert(!log_keys_debug->count(i->first));
      log_keys_debug->insert(i->first);
    }
  }

  // process dups after log_keys_debug is filled, so dups do not
  // end up in that set
  if (dirty_to_dups != eversion_t()) {
    pg_log_dup_t min, dirty_to_dup;
    dirty_to_dup.version = dirty_to_dups;
    t.omap_rmkeyrange(
      coll, log_oid,
      min.get_key_name(), dirty_to_dup.get_key_name());
  }
  if (dirty_to_dups != eversion_t::max() &&
      dirty_from_dups != eversion_t::max()) {
    pg_log_dup_t max, dirty_from_dup;
    max.version = eversion_t::max();
    dirty_from_dup.version = dirty_from_dups;
    t.omap_rmkeyrange(
      coll, log_oid,
      dirty_from_dup.get_key_name(), max.get_key_name());
  }

  for (const auto& entry : log.dups) {
    if (entry.version > dirty_to_dups)
      break;
    bufferlist bl;
    ::encode(entry, bl);
    (*km)[entry.get_key_name()].claim(bl);
  }

  for (list<pg_log_dup_t>::reverse_iterator p = log.dups.rbegin();
       p != log.dups.rend() &&
         (p->version >= dirty_from_dups || p->version >= write_from_dups) &&
         p->version >= dirty_to_dups;
       ++p) {
    bufferlist bl;
    ::encode(*p, bl);
    (*km)[p->get_key_name()].claim(bl);
  }

  if (dirty_divergent_priors) {
    //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl;
    ::encode(divergent_priors, (*km)["divergent_priors"]);
  }
  if (require_rollback) {
    ::encode(
      log.get_can_rollback_to(),
      (*km)["can_rollback_to"]);
    ::encode(
      log.get_rollback_info_trimmed_to(),
      (*km)["rollback_info_trimmed_to"]);
  }

  if (!to_remove.empty())
    t.omap_rmkeys(coll, log_oid, to_remove);
}
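// Key-layout sketch for the pgmeta omap written above, inferred from the
// checks in this file rather than a complete spec: log entries are keyed
// by eversion_t::get_key_name(), so they sort by version within the omap;
// dup entries sort into their own prefixed range via
// pg_log_dup_t::get_key_name(); and keys beginning with '_' belong to
// other PG metadata sharing the same object, which is why the
// log_keys_debug bookkeeping skips them.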
clearing from " << dirty_from << dendl; t.omap_rmkeyrange( coll, log_oid, dirty_from.get_key_name(), eversion_t::max().get_key_name()); clear_after(log_keys_debug, dirty_from.get_key_name()); } for (list::iterator p = log.log.begin(); p != log.log.end() && p->version <= dirty_to; ++p) { bufferlist bl(sizeof(*p) * 2); p->encode_with_checksum(bl); (*km)[p->get_key_name()].claim(bl); } for (list::reverse_iterator p = log.log.rbegin(); p != log.log.rend() && (p->version >= dirty_from || p->version >= writeout_from) && p->version >= dirty_to; ++p) { bufferlist bl(sizeof(*p) * 2); p->encode_with_checksum(bl); (*km)[p->get_key_name()].claim(bl); } if (log_keys_debug) { for (map::iterator i = (*km).begin(); i != (*km).end(); ++i) { if (i->first[0] == '_') continue; assert(!log_keys_debug->count(i->first)); log_keys_debug->insert(i->first); } } // process dups after log_keys_debug is filled, so dups do not // end up in that set if (dirty_to_dups != eversion_t()) { pg_log_dup_t min, dirty_to_dup; dirty_to_dup.version = dirty_to_dups; t.omap_rmkeyrange( coll, log_oid, min.get_key_name(), dirty_to_dup.get_key_name()); } if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) { pg_log_dup_t max, dirty_from_dup; max.version = eversion_t::max(); dirty_from_dup.version = dirty_from_dups; t.omap_rmkeyrange( coll, log_oid, dirty_from_dup.get_key_name(), max.get_key_name()); } for (const auto& entry : log.dups) { if (entry.version > dirty_to_dups) break; bufferlist bl; ::encode(entry, bl); (*km)[entry.get_key_name()].claim(bl); } for (list::reverse_iterator p = log.dups.rbegin(); p != log.dups.rend() && (p->version >= dirty_from_dups || p->version >= write_from_dups) && p->version >= dirty_to_dups; ++p) { bufferlist bl; ::encode(*p, bl); (*km)[p->get_key_name()].claim(bl); } if (clear_divergent_priors) { //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl; to_remove.insert("divergent_priors"); } // since we encode individual missing items instead of a whole // missing set, we need another key to store this bit of state if (*rebuilt_missing_with_deletes) { (*km)["may_include_deletes_in_missing"] = bufferlist(); *rebuilt_missing_with_deletes = false; } missing.get_changed( [&](const hobject_t &obj) { string key = string("missing/") + obj.to_str(); pg_missing_item item; if (!missing.is_missing(obj, &item)) { to_remove.insert(key); } else { uint64_t features = missing.may_include_deletes ? CEPH_FEATURE_OSD_RECOVERY_DELETES : 0; ::encode(make_pair(obj, item), (*km)[key], features); } }); if (require_rollback) { ::encode( log.get_can_rollback_to(), (*km)["can_rollback_to"]); ::encode( log.get_rollback_info_trimmed_to(), (*km)["rollback_info_trimmed_to"]); } if (!to_remove.empty()) t.omap_rmkeys(coll, log_oid, to_remove); } void PGLog::rebuild_missing_set_with_deletes(ObjectStore *store, coll_t pg_coll, const pg_info_t &info) { // save entries not generated from the current log (e.g. added due // to repair, EIO handling, or divergent_priors). 
void PGLog::rebuild_missing_set_with_deletes(ObjectStore *store,
                                             coll_t pg_coll,
                                             const pg_info_t &info)
{
  // save entries not generated from the current log (e.g. added due
  // to repair, EIO handling, or divergent_priors).
  map<hobject_t, pg_missing_item> extra_missing;
  for (const auto& p : missing.get_items()) {
    if (!log.logged_object(p.first)) {
      dout(20) << __func__ << " extra missing entry: " << p.first
               << " " << p.second << dendl;
      extra_missing[p.first] = p.second;
    }
  }
  missing.clear();
  missing.may_include_deletes = true;

  // go through the log and add items that are not present or older
  // versions on disk, just as if we were reading the log + metadata
  // off disk originally
  set<hobject_t> did;
  for (list<pg_log_entry_t>::reverse_iterator i = log.log.rbegin();
       i != log.log.rend();
       ++i) {
    if (i->version <= info.last_complete)
      break;
    if (i->soid > info.last_backfill ||
        i->is_error() ||
        did.find(i->soid) != did.end())
      continue;
    did.insert(i->soid);

    bufferlist bv;
    int r = store->getattr(
        pg_coll,
        ghobject_t(i->soid, ghobject_t::NO_GEN, info.pgid.shard),
        OI_ATTR,
        bv);
    dout(20) << __func__ << " check for log entry: " << *i << " = " << r << dendl;

    if (r >= 0) {
      object_info_t oi(bv);
      dout(20) << __func__ << " store version = " << oi.version << dendl;
      if (oi.version < i->version) {
        missing.add(i->soid, i->version, oi.version, i->is_delete());
      }
    } else {
      missing.add(i->soid, i->version, eversion_t(), i->is_delete());
    }
  }

  for (const auto& p : extra_missing) {
    missing.add(p.first, p.second.need, p.second.have, p.second.is_delete());
  }

  rebuilt_missing_with_deletes = true;
}
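// Decision table for the rebuild above, with illustrative versions: a log
// entry at 9'30 whose on-disk object_info_t also reports 9'30 is already
// recovered and adds nothing; an on-disk version of 9'25 yields
// missing(need=9'30, have=9'25); and a failed getattr (object absent, e.g.
// a delete that has not been applied locally) yields
// missing(need=9'30, have=0'0).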