1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
9 * Author: Loic Dachary <loic@dachary.org>
11 * This is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License version 2.1, as published by the Free Software
14 * Foundation. See file COPYING.
 */
19 #include "include/unordered_map.h"
20 #include "common/ceph_context.h"
22 #define dout_context cct
23 #define dout_subsys ceph_subsys_osd
25 #define dout_prefix _prefix(_dout, this)
// dout prefix helper: routes this file's debug output through the owning
// PGLog's gen_prefix() so each log line identifies its PG.
// NOTE(review): this excerpt elides some original lines (e.g. braces).
27 static ostream& _prefix(std::ostream *_dout, const PGLog *pglog)
29 return *_dout << pglog->gen_prefix();
32 //////////////////// PGLog::IndexedLog ////////////////////
// Split the entries belonging to a child PG out of this log into *target,
// delegating the actual partitioning to pg_log_t::split_out_child, then
// re-derive the rollback-info iterator since the underlying list changed.
// NOTE(review): this excerpt elides some original lines (parameters such as
// child_pgid/split_bits are declared on elided lines).
34 void PGLog::IndexedLog::split_out_child(
37 PGLog::IndexedLog *target)
40 *target = IndexedLog(pg_log_t::split_out_child(child_pgid, split_bits));
43 reset_rollback_info_trimmed_to_riter();
// Trim log entries up to some version `s` (declared on an elided line).
// Trimmed entry versions are reported via *trimmed, and recent entries are
// preserved as dedup records in `dups`; trimmed dup keys are reported via
// *trimmed_dups, and *write_from_dups is lowered to the oldest version newly
// moved into dups so the caller knows what to rewrite.
// NOTE(review): this excerpt elides some original lines (braces, pop_front,
// loop breaks); comments below describe only what the visible lines show.
46 void PGLog::IndexedLog::trim(
49 set<eversion_t> *trimmed,
50 set<string>* trimmed_dups,
51 eversion_t *write_from_dups)
// Sanity log: trimming past complete_to would discard entries we still
// consider incomplete.
53 if (complete_to != log.end() &&
54 complete_to->version <= s) {
55 generic_dout(0) << " bad trim to " << s << " when complete_to is "
56 << complete_to->version
57 << " on " << *this << dendl;
// We must never trim past the rollback horizon.
60 assert(s <= can_rollback_to);
// Entries at or above this version are retained as dups; the window size
// comes from the osd_pg_log_dups_tracked config option. Guard against
// underflow when the head version is smaller than the window.
62 auto earliest_dup_version =
63 log.rbegin()->version.version < cct->_conf->osd_pg_log_dups_tracked
65 : log.rbegin()->version.version - cct->_conf->osd_pg_log_dups_tracked;
// Pop entries off the front (oldest first); presumably the elided lines
// break out of this loop once e.version > s — TODO confirm.
67 while (!log.empty()) {
68 const pg_log_entry_t &e = *log.begin();
71 generic_dout(20) << "trim " << e << dendl;
73 trimmed->insert(e.version);
75 unindex(e); // remove from index,
78 generic_dout(20) << "earliest_dup_version = " << earliest_dup_version << dendl;
// Entry falls inside the dup-tracking window: convert it to a dup record
// instead of dropping it entirely.
79 if (e.version.version >= earliest_dup_version) {
80 if (write_from_dups != nullptr && *write_from_dups > e.version) {
81 generic_dout(20) << "updating write_from_dups from " << *write_from_dups << " to " << e.version << dendl;
82 *write_from_dups = e.version;
84 dups.push_back(pg_log_dup_t(e));
86 for (const auto& extra : e.extra_reqids) {
87 // note: extras have the same version as outer op
88 dups.push_back(pg_log_dup_t(e.version, extra.second,
89 extra.first, e.return_code))
// Keep rollback_info_trimmed_to_riter valid while its referent is erased.
94 if (rollback_info_trimmed_to_riter == log.rend() ||
95 e.version == rollback_info_trimmed_to_riter->version) {
97 rollback_info_trimmed_to_riter = log.rend();
// Second pass: expire dup records older than the tracking window, removing
// them from dup_index when that index has been built.
103 while (!dups.empty()) {
104 const auto& e = *dups.begin();
105 if (e.version.version >= earliest_dup_version)
107 generic_dout(20) << "trim dup " << e << dendl;
109 trimmed_dups->insert(e.get_key_name());
110 if (indexed_data & PGLOG_INDEXED_DUPS) {
111 dup_index.erase(e.reqid);
// Debug dump: print the log summary, then each entry (noting whether its
// object is indexed, and asserting reqid-index consistency), then each dup.
// NOTE(review): this excerpt elides some original lines (loop increments,
// part of the per-entry output expression).
121 ostream& PGLog::IndexedLog::print(ostream& out) const
123 out << *this << std::endl;
124 for (list<pg_log_entry_t>::const_iterator p = log.begin();
128 (logged_object(p->soid) ? "indexed" : "NOT INDEXED") <<
130 assert(!p->reqid_is_indexed() || logged_req(p->reqid));
133 for (list<pg_log_dup_t>::const_iterator p = dups.begin();
136 out << *p << std::endl;
142 //////////////////// PGLog ////////////////////
// Reset backfill-related state; body elided in this excerpt — see the full
// source for what is cleared.
144 void PGLog::reset_backfill()
// Clear the in-memory log state; visible here is only the reset of the
// debug key-tracking set (other members are cleared on elided lines).
149 void PGLog::clear() {
152 log_keys_debug.clear();
// Queue removal of the PG's on-disk metadata object (which holds the
// persisted log/info omap) in the given transaction.
// NOTE(review): additional parameters/lines are elided in this excerpt.
156 void PGLog::clear_info_log(
158 ObjectStore::Transaction *t) {
160 t->remove(coll, pgid.make_pgmeta_oid());
// Body fragment of PGLog::trim (signature elided in this excerpt): if the
// requested trim point advances past the current tail, trim the indexed log
// — collecting trimmed versions/dup keys for later persistence — and move
// info.log_tail forward to match.
168 if (trim_to > log.tail) {
169 // We shouldn't be trimming the log past last_complete
170 assert(trim_to <= info.last_complete);
172 dout(10) << "trim " << log << " to " << trim_to << dendl;
173 log.trim(cct, trim_to, &trimmed, &trimmed_dups, &write_from_dups);
174 info.log_tail = log.tail;
// Process a replica's log against our (authoritative) log: rewind the
// replica's log head to the last point it agrees with ours, merge the
// divergent entries it had past that point into its missing set, and update
// its info (last_update / last_complete) accordingly.
//   olog     - the replica's log
//   omissing - the replica's missing set (updated in place)
//   from     - which peer OSD this log came from
// NOTE(review): this excerpt elides some original lines (oinfo parameter,
// early returns, loop increments, some _merge_divergent_entries arguments).
178 void PGLog::proc_replica_log(
180 const pg_log_t &olog,
181 pg_missing_t& omissing,
182 pg_shard_t from) const
184 dout(10) << "proc_replica_log for osd." << from << ": "
185 << oinfo << " " << olog << " " << omissing << dendl;
// No overlap with our log: nothing we can say about divergence.
187 if (olog.head < log.tail) {
188 dout(10) << __func__ << ": osd." << from << " does not overlap, not looking "
189 << "for divergent objects" << dendl;
// Identical heads: the replica cannot be divergent.
192 if (olog.head == log.head) {
193 dout(10) << __func__ << ": osd." << from << " same log head, not looking "
194 << "for divergent objects" << dendl;
197 assert(olog.head >= log.tail);
200 basically what we're doing here is rewinding the remote log,
201 dropping divergent entries, until we find something that matches
202 our master log. we then reset last_update to reflect the new
203 point up to which missing is accurate.
205 later, in activate(), missing will get wound forward again and
206 we will send the peer enough log to arrive at the same state.
// Debug dump of the replica's missing set before we modify it.
209 for (map<hobject_t, pg_missing_item>::const_iterator i = omissing.get_items().begin();
210 i != omissing.get_items().end();
212 dout(20) << " before missing " << i->first << " need " << i->second.need
213 << " have " << i->second.have << dendl;
// Walk our log from the head backwards looking for the newest entry that is
// also present in the replica's log (the "last shared" point).
216 list<pg_log_entry_t>::const_reverse_iterator first_non_divergent =
219 if (first_non_divergent == log.log.rend())
221 if (first_non_divergent->version <= olog.head) {
222 dout(20) << "merge_log point (usually last shared) is "
223 << *first_non_divergent << dendl;
226 ++first_non_divergent;
229 /* Because olog.head >= log.tail, we know that both pgs must at least have
230 * the event represented by log.tail. Similarly, because log.head >= olog.tail,
231 * we know that the even represented by olog.tail must be common to both logs.
232 * Furthermore, the event represented by a log tail was necessarily trimmed,
233 * thus neither olog.tail nor log.tail can be divergent. It's
234 * possible that olog/log contain no actual events between olog.head and
235 * MAX(log.tail, olog.tail), however, since they might have been split out.
236 * Thus, if we cannot find an event e such that
237 * log.tail <= e.version <= log.head, the last_update must actually be
238 * MAX(log.tail, olog.tail).
240 eversion_t limit = MAX(olog.tail, log.tail);
// lu = the version we rewind the replica to (declaration partly elided).
242 (first_non_divergent == log.log.rend() ||
243 first_non_divergent->version < limit) ?
245 first_non_divergent->version;
// Build an indexed copy of the replica log, rewind it to lu, and fold the
// divergent entries into the replica's missing set.
247 IndexedLog folog(olog);
248 auto divergent = folog.rewind_from_head(lu);
249 _merge_divergent_entries(
253 olog.get_can_rollback_to(),
258 if (lu < oinfo.last_update) {
259 dout(10) << " peer osd." << from << " last_update now " << lu << dendl;
260 oinfo.last_update = lu;
// Recompute the replica's last_complete: scan forward from the log start
// until the first missing object's needed version.
263 if (omissing.have_missing()) {
264 eversion_t first_missing =
265 omissing.get_items().at(omissing.get_rmissing().begin()->second).need;
266 oinfo.last_complete = eversion_t();
267 list<pg_log_entry_t>::const_iterator i = olog.log.begin();
271 if (i->version < first_missing)
272 oinfo.last_complete = i->version;
// Nothing missing: the replica is complete up to its last_update.
277 oinfo.last_complete = oinfo.last_update;
279 } // proc_replica_log
/**
282 * rewind divergent entries at the head of the log
284 * This rewinds entries off the head of our log that are divergent.
285 * This is used by replicas during activation.
287 * @param newhead new head to rewind to
 */
// Rewind our own log head back to newhead, folding the removed (divergent)
// entries into our missing set via _merge_divergent_entries, and updating
// info.last_update / last_complete and the dirty-tracking markers.
// NOTE(review): this excerpt elides some original lines (part of the opening
// dout, some _merge_divergent_entries arguments, dirty_info assignment).
289 void PGLog::rewind_divergent_log(eversion_t newhead,
290 pg_info_t &info, LogEntryHandler *rollbacker,
291 bool &dirty_info, bool &dirty_big_info)
293 dout(10) << "rewind_divergent_log truncate divergent future " <<
// last_complete can never sit past the new head.
297 if (info.last_complete > newhead)
298 info.last_complete = newhead;
300 auto divergent = log.rewind_from_head(newhead);
301 if (!divergent.empty()) {
// Everything from the oldest divergent version onward must be rewritten.
302 mark_dirty_from(divergent.front().version);
304 for (auto &&entry: divergent) {
305 dout(10) << "rewind_divergent_log future divergent " << entry << dendl;
307 info.last_update = newhead;
309 _merge_divergent_entries(
313 log.get_can_rollback_to(),
319 dirty_big_info = true;
// Merge an authoritative log (olog, from peer fromosd) into our own:
//  1. extend our tail backwards with older entries from olog,
//  2. rewind our head if we have entries divergent from olog,
//  3. extend our head forwards with newer entries from olog (updating the
//     missing set), and
//  4. merge the dup entries (merge_log_dups).
// Also reconciles stats/hit_set and marks dirty_info/dirty_big_info.
// NOTE(review): this excerpt elides many original lines (braces, loop
// increments, splice arguments, changed = true assignments, etc.).
322 void PGLog::merge_log(pg_info_t &oinfo, pg_log_t &olog, pg_shard_t fromosd,
323 pg_info_t &info, LogEntryHandler *rollbacker,
324 bool &dirty_info, bool &dirty_big_info)
326 dout(10) << "merge_log " << olog << " from osd." << fromosd
327 << " into " << log << dendl;
329 // Check preconditions
331 // If our log is empty, the incoming log needs to have not been trimmed.
332 assert(!log.null() || olog.tail == eversion_t());
333 // The logs must overlap.
334 assert(log.head >= olog.tail && olog.head >= log.tail);
// Debug dump of our current missing set.
336 for (map<hobject_t, pg_missing_item>::const_iterator i = missing.get_items().begin();
337 i != missing.get_items().end();
339 dout(20) << "pg_missing_t sobject: " << i->first << dendl;
342 bool changed = false;
// Step 1: extend our tail with entries older than log.tail from olog.
345 // this is just filling in history. it does not affect our
346 // missing set, as that should already be consistent with our
348 eversion_t orig_tail = log.tail;
349 if (olog.tail < log.tail) {
350 dout(10) << "merge_log extending tail to " << olog.tail << dendl;
351 list<pg_log_entry_t>::iterator from = olog.log.begin();
352 list<pg_log_entry_t>::iterator to;
// Find the first olog entry newer than our tail; [from, to) is spliced.
355 to != olog.log.end();
357 if (to->version > log.tail)
360 dout(15) << *to << dendl;
365 // splice into our log.
366 log.log.splice(log.log.begin(),
369 info.log_tail = log.tail = olog.tail;
// Keep reported_seq/epoch monotonic from the peer's perspective.
373 if (oinfo.stats.reported_seq < info.stats.reported_seq || // make sure reported always increases
374 oinfo.stats.reported_epoch < info.stats.reported_epoch) {
375 oinfo.stats.reported_seq = info.stats.reported_seq;
376 oinfo.stats.reported_epoch = info.stats.reported_epoch;
// Only adopt the peer's stats wholesale when we are not mid-backfill.
378 if (info.last_backfill.is_max())
379 info.stats = oinfo.stats;
380 info.hit_set = oinfo.hit_set;
382 // do we have divergent entries to throw out?
383 if (olog.head < log.head) {
384 rewind_divergent_log(olog.head, info, rollbacker, dirty_info, dirty_big_info);
// Step 3: extend our head with entries newer than log.head from olog.
389 if (olog.head > log.head) {
390 dout(10) << "merge_log extending head to " << olog.head << dendl;
392 // find start point in olog
393 list<pg_log_entry_t>::iterator to = olog.log.end();
394 list<pg_log_entry_t>::iterator from = olog.log.end();
395 eversion_t lower_bound = MAX(olog.tail, orig_tail);
// Walk backwards through olog to find the last version shared with us.
397 if (from == olog.log.begin())
400 dout(20) << " ? " << *from << dendl;
401 if (from->version <= log.head) {
402 lower_bound = MAX(lower_bound, from->version);
407 dout(20) << "merge_log cut point (usually last shared) is "
408 << lower_bound << dendl;
409 mark_dirty_from(lower_bound);
// Drop our entries past the cut point; they are divergent vs. the
// authoritative log and get merged into missing below.
411 auto divergent = log.rewind_from_head(lower_bound);
412 // move aside divergent items
413 for (auto &&oe: divergent) {
414 dout(10) << "merge_log divergent " << oe << dendl;
416 log.roll_forward_to(log.head, rollbacker);
// Splice the peer's newer entries in and update the missing set.
418 mempool::osd_pglog::list<pg_log_entry_t> new_entries;
419 new_entries.splice(new_entries.end(), olog.log, from, to);
420 append_log_entries_update_missing(
422 info.last_backfill_bitwise,
430 _merge_divergent_entries(
434 log.get_can_rollback_to(),
439 info.last_update = log.head = olog.head;
441 // We cannot rollback into the new log entries
442 log.skip_can_rollback_to_to_head();
444 info.last_user_version = oinfo.last_user_version;
445 info.purged_snaps = oinfo.purged_snaps;
// Step 4: merge dup entries from the peer.
451 if (merge_log_dups(olog)) {
455 dout(10) << "merge_log result " << log << " " << missing <<
456 " changed=" << changed << dendl;
460 dirty_big_info = true;
465 // returns true if any changes were made to log.dups
// Merge the peer's dup entries into ours: copy wholesale if ours is empty,
// otherwise extend our dups list at the newer end (tail) and the older end
// (head) with entries we don't have, keeping the dirty_{from,to}_dups
// markers updated so the new entries get persisted. Finally drop any dup
// entries whose versions overlap the live log.
// NOTE(review): this excerpt elides some original lines (braces, indexing of
// inserted copies, changed = true assignments, `last` declaration).
466 bool PGLog::merge_log_dups(const pg_log_t& olog) {
467 bool changed = false;
469 if (!olog.dups.empty()) {
470 if (log.dups.empty()) {
471 dout(10) << "merge_log copying olog dups to log " <<
472 olog.dups.front().version << " to " <<
473 olog.dups.back().version << dendl;
// Everything is new: mark the full dup range dirty.
475 dirty_from_dups = eversion_t();
476 dirty_to_dups = eversion_t::max();
477 // since our log.dups is empty just copy them
478 for (const auto& i : olog.dups) {
479 log.dups.push_back(i);
480 log.index(log.dups.back());
483 // since our log.dups is not empty try to extend on each end
485 if (olog.dups.back().version > log.dups.back().version) {
486 // extend the dups's tail (i.e., newer dups)
487 dout(10) << "merge_log extending dups tail to " <<
488 olog.dups.back().version << dendl;
// Walk the peer's dups newest-first, inserting those newer than our
// current tail; insert_cursor keeps the result in version order.
491 auto log_tail_version = log.dups.back().version;
493 auto insert_cursor = log.dups.end();
494 eversion_t last_shared = eversion_t::max();
495 for (auto i = olog.dups.crbegin(); i != olog.dups.crend(); ++i) {
496 if (i->version <= log_tail_version) break;
497 log.dups.insert(insert_cursor, *i);
498 last_shared = i->version;
500 auto prev = insert_cursor;
502 // be sure to pass reference of copy in log.dups
505 --insert_cursor; // make sure we insert in reverse order
507 mark_dirty_from_dups(last_shared);
510 if (olog.dups.front().version < log.dups.front().version) {
511 // extend the dups's head (i.e., older dups)
512 dout(10) << "merge_log extending dups head to " <<
513 olog.dups.front().version << dendl;
// Walk the peer's dups oldest-first, prepending those older than our
// current head.
517 auto insert_cursor = log.dups.begin();
518 for (auto i = olog.dups.cbegin(); i != olog.dups.cend(); ++i) {
519 if (i->version >= insert_cursor->version) break;
520 log.dups.insert(insert_cursor, *i);
522 auto prev = insert_cursor;
524 // be sure to pass address of copy in log.dups
527 mark_dirty_to_dups(last);
532 // remove any dup entries that overlap with pglog
533 if (!log.dups.empty() && log.dups.back().version >= log.tail) {
534 dout(10) << "merge_log removed dups overlapping log entries [" <<
535 log.tail << "," << log.dups.back().version << "]" << dendl;
// Pop overlapping dups from the back, unindexing and marking dirty.
538 while (!log.dups.empty() && log.dups.back().version >= log.tail) {
539 log.unindex(log.dups.back());
540 mark_dirty_from_dups(log.dups.back().version);
// Debug-only consistency check: the set of log entries must correspond 1:1
// with the debug key set (log_keys_debug); on mismatch, dump both sides
// before asserting. Presumably guarded by pg_log_debug on an elided line —
// TODO confirm against the full source.
548 void PGLog::check() {
551 if (log.log.size() != log_keys_debug.size()) {
552 derr << "log.log.size() != log_keys_debug.size()" << dendl;
553 derr << "actual log:" << dendl;
554 for (list<pg_log_entry_t>::iterator i = log.log.begin();
557 derr << " " << *i << dendl;
559 derr << "log_keys_debug:" << dendl;
560 for (set<string>::const_iterator i = log_keys_debug.begin();
561 i != log_keys_debug.end();
563 derr << " " << *i << dendl;
566 assert(log.log.size() == log_keys_debug.size());
// Every entry's omap key must be tracked in the debug set.
567 for (list<pg_log_entry_t>::iterator i = log.log.begin();
570 assert(log_keys_debug.count(i->get_key_name()));
// Persist the dirty portions of the log and the missing set: when dirty
// (check elided in this excerpt), delegate to _write_log_and_missing with
// the accumulated dirty bounds, trimmed sets, and divergent-priors flag;
// otherwise just log that nothing needs writing.
//   t       - transaction to stage omap writes/removals into
//   km      - out-param key->value map for the omap writes
//   log_oid - object holding the persisted log omap
575 void PGLog::write_log_and_missing(
576 ObjectStore::Transaction& t,
577 map<string,bufferlist> *km,
579 const ghobject_t &log_oid,
580 bool require_rollback)
583 dout(5) << "write_log_and_missing with: "
584 << "dirty_to: " << dirty_to
585 << ", dirty_from: " << dirty_from
586 << ", writeout_from: " << writeout_from
587 << ", trimmed: " << trimmed
588 << ", trimmed_dups: " << trimmed_dups
589 << ", clear_divergent_priors: " << clear_divergent_priors
591 _write_log_and_missing(
592 t, km, log, coll, log_oid,
601 clear_divergent_priors,
605 &rebuilt_missing_with_deletes,
// Debug key tracking only when pg_log_debug is enabled.
606 (pg_log_debug ? &log_keys_debug : nullptr));
609 dout(10) << "log is not dirty" << dendl;
// Static helper: rewrite the ENTIRE log (no missing set) — used when the
// caller has a bare pg_log_t rather than a PGLog. The eversion_t::max() /
// eversion_t() bounds passed below force a full clear-and-rewrite of both
// the entry range and the dup range.
614 void PGLog::write_log_and_missing_wo_missing(
615 ObjectStore::Transaction& t,
616 map<string,bufferlist> *km,
618 const coll_t& coll, const ghobject_t &log_oid,
619 map<eversion_t, hobject_t> &divergent_priors,
620 bool require_rollback
623 _write_log_and_missing_wo_missing(
624 t, km, log, coll, log_oid,
625 divergent_priors, eversion_t::max(), eversion_t(), eversion_t(),
628 true, true, require_rollback,
629 eversion_t::max(), eversion_t(), eversion_t(), nullptr);
// Static helper: rewrite the ENTIRE log plus the given missing tracker —
// full-rewrite bounds are supplied on elided lines, mirroring the
// _wo_missing variant above.
633 void PGLog::write_log_and_missing(
634 ObjectStore::Transaction& t,
635 map<string,bufferlist> *km,
638 const ghobject_t &log_oid,
639 const pg_missing_tracker_t &missing,
640 bool require_rollback,
641 bool *rebuilt_missing_with_deletes)
643 _write_log_and_missing(
644 t, km, log, coll, log_oid,
651 true, require_rollback, false,
655 rebuilt_missing_with_deletes, nullptr);
// Core persistence routine (no missing set): translate the dirty ranges of
// log entries and dups into omap writes (*km) and removals (omap_rmkeys) on
// log_oid, plus divergent_priors and rollback bounds when requested.
// Key-space layout visible here: entry/dup keys from get_key_name(), plus
// the literal keys "divergent_priors", "can_rollback_to",
// "rollback_info_trimmed_to".
// NOTE(review): this excerpt elides some original lines (several parameters,
// omap_rmkeyrange calls, loop increments, dup encode statements).
659 void PGLog::_write_log_and_missing_wo_missing(
660 ObjectStore::Transaction& t,
661 map<string,bufferlist> *km,
663 const coll_t& coll, const ghobject_t &log_oid,
664 map<eversion_t, hobject_t> &divergent_priors,
666 eversion_t dirty_from,
667 eversion_t writeout_from,
668 const set<eversion_t> &trimmed,
669 const set<string> &trimmed_dups,
670 bool dirty_divergent_priors,
672 bool require_rollback,
673 eversion_t dirty_to_dups,
674 eversion_t dirty_from_dups,
675 eversion_t write_from_dups,
676 set<string> *log_keys_debug
// Keys to delete: trimmed dups plus trimmed entries (also removed from the
// debug set when tracking is on).
679 set<string> to_remove(trimmed_dups);
680 for (set<eversion_t>::const_iterator i = trimmed.begin();
683 to_remove.insert(i->get_key_name());
684 if (log_keys_debug) {
685 assert(log_keys_debug->count(i->get_key_name()));
686 log_keys_debug->erase(i->get_key_name());
690 // dout(10) << "write_log_and_missing, clearing up to " << dirty_to << dendl;
692 t.touch(coll, log_oid);
// Clear the dirty prefix [0, dirty_to) of the entry key range.
693 if (dirty_to != eversion_t()) {
696 eversion_t().get_key_name(), dirty_to.get_key_name());
697 clear_up_to(log_keys_debug, dirty_to.get_key_name());
// Clear the dirty suffix [dirty_from, max) of the entry key range.
699 if (dirty_to != eversion_t::max() && dirty_from != eversion_t::max()) {
700 // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
703 dirty_from.get_key_name(), eversion_t::max().get_key_name());
704 clear_after(log_keys_debug, dirty_from.get_key_name());
// Re-encode entries in the cleared prefix...
707 for (list<pg_log_entry_t>::iterator p = log.log.begin();
708 p != log.log.end() && p->version <= dirty_to;
710 bufferlist bl(sizeof(*p) * 2);
711 p->encode_with_checksum(bl);
712 (*km)[p->get_key_name()].claim(bl);
// ...and in the cleared/written-out suffix (walked from the back).
715 for (list<pg_log_entry_t>::reverse_iterator p = log.log.rbegin();
716 p != log.log.rend() &&
717 (p->version >= dirty_from || p->version >= writeout_from) &&
718 p->version >= dirty_to;
720 bufferlist bl(sizeof(*p) * 2);
721 p->encode_with_checksum(bl);
722 (*km)[p->get_key_name()].claim(bl);
// Record the written entry keys in the debug set ('_'-prefixed meta keys
// are excluded).
725 if (log_keys_debug) {
726 for (map<string, bufferlist>::iterator i = (*km).begin();
729 if (i->first[0] == '_')
731 assert(!log_keys_debug->count(i->first));
732 log_keys_debug->insert(i->first);
736 // process dups after log_keys_debug is filled, so dups do not
737 // end up in that set
// Same prefix/suffix clear-then-rewrite dance for the dup key range.
738 if (dirty_to_dups != eversion_t()) {
739 pg_log_dup_t min, dirty_to_dup;
740 dirty_to_dup.version = dirty_to_dups;
743 min.get_key_name(), dirty_to_dup.get_key_name());
745 if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) {
746 pg_log_dup_t max, dirty_from_dup;
747 max.version = eversion_t::max();
748 dirty_from_dup.version = dirty_from_dups;
751 dirty_from_dup.get_key_name(), max.get_key_name());
754 for (const auto& entry : log.dups) {
755 if (entry.version > dirty_to_dups)
759 (*km)[entry.get_key_name()].claim(bl);
762 for (list<pg_log_dup_t>::reverse_iterator p = log.dups.rbegin();
763 p != log.dups.rend() &&
764 (p->version >= dirty_from_dups || p->version >= write_from_dups) &&
765 p->version >= dirty_to_dups;
769 (*km)[p->get_key_name()].claim(bl);
// Persist divergent priors under a fixed key when they changed.
772 if (dirty_divergent_priors) {
773 //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl;
774 ::encode(divergent_priors, (*km)["divergent_priors"]);
// Rollback bounds only matter for EC pools (require_rollback).
776 if (require_rollback) {
778 log.get_can_rollback_to(),
779 (*km)["can_rollback_to"]);
781 log.get_rollback_info_trimmed_to(),
782 (*km)["rollback_info_trimmed_to"]);
// Stage the deletions last.
785 if (!to_remove.empty())
786 t.omap_rmkeys(coll, log_oid, to_remove);
// Core persistence routine (with missing tracker): identical entry/dup
// key handling to _write_log_and_missing_wo_missing, plus per-object
// "missing/<obj>" keys for the missing set, the
// "may_include_deletes_in_missing" marker, and removal (rather than write)
// of "divergent_priors" when clear_divergent_priors is set.
// NOTE(review): this excerpt elides some original lines (several parameters,
// omap_rmkeyrange calls, loop increments, the missing-tracker iteration
// wrapper around the lambda below).
790 void PGLog::_write_log_and_missing(
791 ObjectStore::Transaction& t,
792 map<string,bufferlist>* km,
794 const coll_t& coll, const ghobject_t &log_oid,
796 eversion_t dirty_from,
797 eversion_t writeout_from,
798 const set<eversion_t> &trimmed,
799 const set<string> &trimmed_dups,
800 const pg_missing_tracker_t &missing,
802 bool require_rollback,
803 bool clear_divergent_priors,
804 eversion_t dirty_to_dups,
805 eversion_t dirty_from_dups,
806 eversion_t write_from_dups,
807 bool *rebuilt_missing_with_deletes, // in/out param
808 set<string> *log_keys_debug
// Keys to delete: trimmed dups plus trimmed entries.
810 set<string> to_remove(trimmed_dups);
811 for (set<eversion_t>::const_iterator i = trimmed.begin();
814 to_remove.insert(i->get_key_name());
815 if (log_keys_debug) {
816 assert(log_keys_debug->count(i->get_key_name()));
817 log_keys_debug->erase(i->get_key_name());
822 t.touch(coll, log_oid);
// Clear dirty prefix, then dirty suffix, of the entry key range.
823 if (dirty_to != eversion_t()) {
826 eversion_t().get_key_name(), dirty_to.get_key_name());
827 clear_up_to(log_keys_debug, dirty_to.get_key_name());
829 if (dirty_to != eversion_t::max() && dirty_from != eversion_t::max()) {
830 // dout(10) << "write_log_and_missing, clearing from " << dirty_from << dendl;
833 dirty_from.get_key_name(), eversion_t::max().get_key_name());
834 clear_after(log_keys_debug, dirty_from.get_key_name());
// Re-encode entries in the cleared prefix and suffix.
837 for (list<pg_log_entry_t>::iterator p = log.log.begin();
838 p != log.log.end() && p->version <= dirty_to;
840 bufferlist bl(sizeof(*p) * 2);
841 p->encode_with_checksum(bl);
842 (*km)[p->get_key_name()].claim(bl);
845 for (list<pg_log_entry_t>::reverse_iterator p = log.log.rbegin();
846 p != log.log.rend() &&
847 (p->version >= dirty_from || p->version >= writeout_from) &&
848 p->version >= dirty_to;
850 bufferlist bl(sizeof(*p) * 2);
851 p->encode_with_checksum(bl);
852 (*km)[p->get_key_name()].claim(bl);
// Track written entry keys in the debug set ('_'-prefixed keys excluded).
855 if (log_keys_debug) {
856 for (map<string, bufferlist>::iterator i = (*km).begin();
859 if (i->first[0] == '_')
861 assert(!log_keys_debug->count(i->first));
862 log_keys_debug->insert(i->first);
866 // process dups after log_keys_debug is filled, so dups do not
867 // end up in that set
// Same clear-then-rewrite handling for the dup key range.
868 if (dirty_to_dups != eversion_t()) {
869 pg_log_dup_t min, dirty_to_dup;
870 dirty_to_dup.version = dirty_to_dups;
873 min.get_key_name(), dirty_to_dup.get_key_name());
875 if (dirty_to_dups != eversion_t::max() && dirty_from_dups != eversion_t::max()) {
876 pg_log_dup_t max, dirty_from_dup;
877 max.version = eversion_t::max();
878 dirty_from_dup.version = dirty_from_dups;
881 dirty_from_dup.get_key_name(), max.get_key_name());
884 for (const auto& entry : log.dups) {
885 if (entry.version > dirty_to_dups)
889 (*km)[entry.get_key_name()].claim(bl);
892 for (list<pg_log_dup_t>::reverse_iterator p = log.dups.rbegin();
893 p != log.dups.rend() &&
894 (p->version >= dirty_from_dups || p->version >= write_from_dups) &&
895 p->version >= dirty_to_dups;
899 (*km)[p->get_key_name()].claim(bl);
// Remove (not rewrite) the old divergent_priors key when asked to clear it.
902 if (clear_divergent_priors) {
903 //dout(10) << "write_log_and_missing: writing divergent_priors" << dendl;
904 to_remove.insert("divergent_priors");
906 // since we encode individual missing items instead of a whole
907 // missing set, we need another key to store this bit of state
908 if (*rebuilt_missing_with_deletes) {
909 (*km)["may_include_deletes_in_missing"] = bufferlist();
910 *rebuilt_missing_with_deletes = false;
// Per-object missing keys: write current items, remove resolved ones.
// (The iteration driving this lambda is on elided lines.)
913 [&](const hobject_t &obj) {
914 string key = string("missing/") + obj.to_str();
915 pg_missing_item item;
916 if (!missing.is_missing(obj, &item)) {
917 to_remove.insert(key);
// Encode with the deletes feature bit only when the tracker uses it.
919 uint64_t features = missing.may_include_deletes ? CEPH_FEATURE_OSD_RECOVERY_DELETES : 0;
920 ::encode(make_pair(obj, item), (*km)[key], features);
// Rollback bounds only matter for EC pools (require_rollback).
923 if (require_rollback) {
925 log.get_can_rollback_to(),
926 (*km)["can_rollback_to"]);
928 log.get_rollback_info_trimmed_to(),
929 (*km)["rollback_info_trimmed_to"]);
// Stage the deletions last.
932 if (!to_remove.empty())
933 t.omap_rmkeys(coll, log_oid, to_remove);
936 void PGLog::rebuild_missing_set_with_deletes(ObjectStore *store,
938 const pg_info_t &info)
940 // save entries not generated from the current log (e.g. added due
941 // to repair, EIO handling, or divergent_priors).
942 map<hobject_t, pg_missing_item> extra_missing;
943 for (const auto& p : missing.get_items()) {
944 if (!log.logged_object(p.first)) {
945 dout(20) << __func__ << " extra missing entry: " << p.first
946 << " " << p.second << dendl;
947 extra_missing[p.first] = p.second;
951 missing.may_include_deletes = true;
953 // go through the log and add items that are not present or older
954 // versions on disk, just as if we were reading the log + metadata
955 // off disk originally
957 for (list<pg_log_entry_t>::reverse_iterator i = log.log.rbegin();
960 if (i->version <= info.last_complete)
962 if (i->soid > info.last_backfill ||
964 did.find(i->soid) != did.end())
969 int r = store->getattr(
971 ghobject_t(i->soid, ghobject_t::NO_GEN, info.pgid.shard),
974 dout(20) << __func__ << " check for log entry: " << *i << " = " << r << dendl;
977 object_info_t oi(bv);
978 dout(20) << __func__ << " store version = " << oi.version << dendl;
979 if (oi.version < i->version) {
980 missing.add(i->soid, i->version, oi.version, i->is_delete());
983 missing.add(i->soid, i->version, eversion_t(), i->is_delete());
987 for (const auto& p : extra_missing) {
988 missing.add(p.first, p.second.need, p.second.have, p.second.is_delete());
990 rebuilt_missing_with_deletes = true;