X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fosd%2FECBackend.cc;fp=src%2Fceph%2Fsrc%2Fosd%2FECBackend.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=40be6a31d4d929f62a30dc5f652a5cfecb33115e;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/osd/ECBackend.cc b/src/ceph/src/osd/ECBackend.cc deleted file mode 100644 index 40be6a3..0000000 --- a/src/ceph/src/osd/ECBackend.cc +++ /dev/null @@ -1,2468 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2013 Inktank Storage, Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include -#include - -#include "ECBackend.h" -#include "messages/MOSDPGPush.h" -#include "messages/MOSDPGPushReply.h" -#include "messages/MOSDECSubOpWrite.h" -#include "messages/MOSDECSubOpWriteReply.h" -#include "messages/MOSDECSubOpRead.h" -#include "messages/MOSDECSubOpReadReply.h" -#include "ECMsgTypes.h" - -#include "PrimaryLogPG.h" - -#define dout_context cct -#define dout_subsys ceph_subsys_osd -#define DOUT_PREFIX_ARGS this -#undef dout_prefix -#define dout_prefix _prefix(_dout, this) -static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) { - return *_dout << pgb->get_parent()->gen_dbg_prefix(); -} - -struct ECRecoveryHandle : public PGBackend::RecoveryHandle { - list ops; -}; - -ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) { - switch (rhs.pipeline_state) { - case ECBackend::pipeline_state_t::CACHE_VALID: - return lhs << "CACHE_VALID"; - case ECBackend::pipeline_state_t::CACHE_INVALID: - return lhs << "CACHE_INVALID"; - default: - assert(0 == "invalid pipeline state"); - } - return lhs; // unreachable -} - -static ostream &operator<<(ostream &lhs, const map &rhs) -{ - lhs << "["; - for (map::const_iterator i = rhs.begin(); - i != rhs.end(); - ++i) { - if (i != rhs.begin()) - lhs << ", "; - lhs << make_pair(i->first, i->second.length()); - } - return lhs << "]"; -} - -static ostream &operator<<(ostream &lhs, const map &rhs) -{ - lhs << "["; - for (map::const_iterator i = rhs.begin(); - i != rhs.end(); - ++i) { - if (i != rhs.begin()) - lhs << ", "; - lhs << make_pair(i->first, i->second.length()); - } - return lhs << "]"; -} - -static ostream &operator<<( - ostream &lhs, - const boost::tuple > &rhs) -{ - return lhs << "(" << rhs.get<0>() << ", " - << rhs.get<1>() << ", " << rhs.get<2>() << ")"; -} - -ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs) -{ - return lhs << "read_request_t(to_read=[" << rhs.to_read << "]" - << ", need=" << rhs.need - << ", want_attrs=" << rhs.want_attrs - << ")"; -} - -ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs) -{ - lhs << "read_result_t(r=" << rhs.r - << ", errors=" << rhs.errors; - if (rhs.attrs) { - lhs << ", attrs=" << rhs.attrs.get(); - } else { - lhs << ", noattrs"; - } - return lhs << ", returned=" << rhs.returned << ")"; -} - -ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs) -{ - lhs << "ReadOp(tid=" << rhs.tid; - if (rhs.op && rhs.op->get_req()) { - lhs << ", op="; - rhs.op->get_req()->print(lhs); - } - return lhs << ", to_read=" << rhs.to_read - << ", complete=" << rhs.complete - << ", priority=" << rhs.priority - << ", obj_to_source=" << rhs.obj_to_source - << ", source_to_obj=" << rhs.source_to_obj - << ", in_progress=" << rhs.in_progress << ")"; -} - -void ECBackend::ReadOp::dump(Formatter *f) const -{ - f->dump_unsigned("tid", tid); - if (op && op->get_req()) { - f->dump_stream("op") << *(op->get_req()); - } - f->dump_stream("to_read") << to_read; - f->dump_stream("complete") << complete; - f->dump_int("priority", priority); - f->dump_stream("obj_to_source") << obj_to_source; - f->dump_stream("source_to_obj") << source_to_obj; - f->dump_stream("in_progress") << in_progress; -} - -ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs) -{ - lhs << "Op(" << rhs.hoid - << " v=" << rhs.version - << " tt=" << rhs.trim_to - << " tid=" << rhs.tid - << " reqid=" << rhs.reqid; - if (rhs.client_op && rhs.client_op->get_req()) { - lhs << " client_op="; - rhs.client_op->get_req()->print(lhs); - } - lhs << " roll_forward_to=" << rhs.roll_forward_to - << " temp_added=" << rhs.temp_added - << " temp_cleared=" << rhs.temp_cleared - << " pending_read=" << rhs.pending_read - << " remote_read=" << rhs.remote_read - << " remote_read_result=" << rhs.remote_read_result - << " pending_apply=" << rhs.pending_apply - << " pending_commit=" << rhs.pending_commit - << " plan.to_read=" << rhs.plan.to_read - << " plan.will_write=" << rhs.plan.will_write - << ")"; - return lhs; -} - -ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs) -{ - return lhs << "RecoveryOp(" - << "hoid=" << rhs.hoid - << " v=" << rhs.v - << " missing_on=" << rhs.missing_on - << " missing_on_shards=" << rhs.missing_on_shards - << " recovery_info=" << rhs.recovery_info - << " recovery_progress=" << rhs.recovery_progress - << " obc refcount=" << rhs.obc.use_count() - << " state=" << ECBackend::RecoveryOp::tostr(rhs.state) - << " waiting_on_pushes=" << rhs.waiting_on_pushes - << " extent_requested=" << rhs.extent_requested - << ")"; -} - -void ECBackend::RecoveryOp::dump(Formatter *f) const -{ - f->dump_stream("hoid") << hoid; - f->dump_stream("v") << v; - f->dump_stream("missing_on") << missing_on; - f->dump_stream("missing_on_shards") << missing_on_shards; - f->dump_stream("recovery_info") << recovery_info; - f->dump_stream("recovery_progress") << recovery_progress; - f->dump_stream("state") << tostr(state); - f->dump_stream("waiting_on_pushes") << waiting_on_pushes; - f->dump_stream("extent_requested") << extent_requested; -} - -ECBackend::ECBackend( - PGBackend::Listener *pg, - coll_t coll, - ObjectStore::CollectionHandle &ch, - ObjectStore *store, - CephContext *cct, - ErasureCodeInterfaceRef ec_impl, - uint64_t stripe_width) - : PGBackend(cct, pg, store, coll, ch), - ec_impl(ec_impl), - sinfo(ec_impl->get_data_chunk_count(), stripe_width) { - assert((ec_impl->get_data_chunk_count() * - ec_impl->get_chunk_size(stripe_width)) == stripe_width); -} - -PGBackend::RecoveryHandle *ECBackend::open_recovery_op() -{ - return new ECRecoveryHandle; -} - -void ECBackend::_failed_push(const hobject_t &hoid, - pair &in) -{ - ECBackend::read_result_t &res = in.second; - dout(10) << __func__ << ": Read error " << hoid << " r=" - << res.r << " errors=" << res.errors << dendl; - dout(10) << __func__ << ": canceling recovery op for obj " << hoid - << dendl; - assert(recovery_ops.count(hoid)); - recovery_ops.erase(hoid); - - list fl; - for (auto&& i : res.errors) { - fl.push_back(i.first); - } - get_parent()->failed_push(fl, hoid); -} - -struct OnRecoveryReadComplete : - public GenContext &> { - ECBackend *pg; - hobject_t hoid; - set want; - OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid) - : pg(pg), hoid(hoid) {} - void finish(pair &in) override { - ECBackend::read_result_t &res = in.second; - if (!(res.r == 0 && res.errors.empty())) { - pg->_failed_push(hoid, in); - return; - } - assert(res.returned.size() == 1); - pg->handle_recovery_read_complete( - hoid, - res.returned.back(), - res.attrs, - in.first); - } -}; - -struct RecoveryMessages { - map reads; - void read( - ECBackend *ec, - const hobject_t &hoid, uint64_t off, uint64_t len, - const set &need, - bool attrs) { - list > to_read; - to_read.push_back(boost::make_tuple(off, len, 0)); - assert(!reads.count(hoid)); - reads.insert( - make_pair( - hoid, - ECBackend::read_request_t( - to_read, - need, - attrs, - new OnRecoveryReadComplete( - ec, - hoid)))); - } - - map > pushes; - map > push_replies; - ObjectStore::Transaction t; - RecoveryMessages() {} - ~RecoveryMessages(){} -}; - -void ECBackend::handle_recovery_push( - const PushOp &op, - RecoveryMessages *m) -{ - ostringstream ss; - if (get_parent()->check_failsafe_full(ss)) { - dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl; - ceph_abort(); - } - - bool oneshot = op.before_progress.first && op.after_progress.data_complete; - ghobject_t tobj; - if (oneshot) { - tobj = ghobject_t(op.soid, ghobject_t::NO_GEN, - get_parent()->whoami_shard().shard); - } else { - tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid, - op.version), - ghobject_t::NO_GEN, - get_parent()->whoami_shard().shard); - if (op.before_progress.first) { - dout(10) << __func__ << ": Adding oid " - << tobj.hobj << " in the temp collection" << dendl; - add_temp_obj(tobj.hobj); - } - } - - if (op.before_progress.first) { - m->t.remove(coll, tobj); - m->t.touch(coll, tobj); - } - - if (!op.data_included.empty()) { - uint64_t start = op.data_included.range_start(); - uint64_t end = op.data_included.range_end(); - assert(op.data.length() == (end - start)); - - m->t.write( - coll, - tobj, - start, - op.data.length(), - op.data); - } else { - assert(op.data.length() == 0); - } - - if (op.before_progress.first) { - assert(op.attrset.count(string("_"))); - m->t.setattrs( - coll, - tobj, - op.attrset); - } - - if (op.after_progress.data_complete && !oneshot) { - dout(10) << __func__ << ": Removing oid " - << tobj.hobj << " from the temp collection" << dendl; - clear_temp_obj(tobj.hobj); - m->t.remove(coll, ghobject_t( - op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); - m->t.collection_move_rename( - coll, tobj, - coll, ghobject_t( - op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); - } - if (op.after_progress.data_complete) { - if ((get_parent()->pgb_is_primary())) { - assert(recovery_ops.count(op.soid)); - assert(recovery_ops[op.soid].obc); - get_parent()->on_local_recover( - op.soid, - op.recovery_info, - recovery_ops[op.soid].obc, - false, - &m->t); - } else { - get_parent()->on_local_recover( - op.soid, - op.recovery_info, - ObjectContextRef(), - false, - &m->t); - } - } - m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp()); - m->push_replies[get_parent()->primary_shard()].back().soid = op.soid; -} - -void ECBackend::handle_recovery_push_reply( - const PushReplyOp &op, - pg_shard_t from, - RecoveryMessages *m) -{ - if (!recovery_ops.count(op.soid)) - return; - RecoveryOp &rop = recovery_ops[op.soid]; - assert(rop.waiting_on_pushes.count(from)); - rop.waiting_on_pushes.erase(from); - continue_recovery_op(rop, m); -} - -void ECBackend::handle_recovery_read_complete( - const hobject_t &hoid, - boost::tuple > &to_read, - boost::optional > attrs, - RecoveryMessages *m) -{ - dout(10) << __func__ << ": returned " << hoid << " " - << "(" << to_read.get<0>() - << ", " << to_read.get<1>() - << ", " << to_read.get<2>() - << ")" - << dendl; - assert(recovery_ops.count(hoid)); - RecoveryOp &op = recovery_ops[hoid]; - assert(op.returned_data.empty()); - map target; - for (set::iterator i = op.missing_on_shards.begin(); - i != op.missing_on_shards.end(); - ++i) { - target[*i] = &(op.returned_data[*i]); - } - map from; - for(map::iterator i = to_read.get<2>().begin(); - i != to_read.get<2>().end(); - ++i) { - from[i->first.shard].claim(i->second); - } - dout(10) << __func__ << ": " << from << dendl; - int r = ECUtil::decode(sinfo, ec_impl, from, target); - assert(r == 0); - if (attrs) { - op.xattrs.swap(*attrs); - - if (!op.obc) { - // attrs only reference the origin bufferlist (decode from - // ECSubReadReply message) whose size is much greater than attrs - // in recovery. If obc cache it (get_obc maybe cache the attr), - // this causes the whole origin bufferlist would not be free - // until obc is evicted from obc cache. So rebuild the - // bufferlist before cache it. - for (map::iterator it = op.xattrs.begin(); - it != op.xattrs.end(); - ++it) { - it->second.rebuild(); - } - // Need to remove ECUtil::get_hinfo_key() since it should not leak out - // of the backend (see bug #12983) - map sanitized_attrs(op.xattrs); - sanitized_attrs.erase(ECUtil::get_hinfo_key()); - op.obc = get_parent()->get_obc(hoid, sanitized_attrs); - assert(op.obc); - op.recovery_info.size = op.obc->obs.oi.size; - op.recovery_info.oi = op.obc->obs.oi; - } - - ECUtil::HashInfo hinfo(ec_impl->get_chunk_count()); - if (op.obc->obs.oi.size > 0) { - assert(op.xattrs.count(ECUtil::get_hinfo_key())); - bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin(); - ::decode(hinfo, bp); - } - op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo); - } - assert(op.xattrs.size()); - assert(op.obc); - continue_recovery_op(op, m); -} - -struct SendPushReplies : public Context { - PGBackend::Listener *l; - epoch_t epoch; - map replies; - SendPushReplies( - PGBackend::Listener *l, - epoch_t epoch, - map &in) : l(l), epoch(epoch) { - replies.swap(in); - } - void finish(int) override { - for (map::iterator i = replies.begin(); - i != replies.end(); - ++i) { - l->send_message_osd_cluster(i->first, i->second, epoch); - } - replies.clear(); - } - ~SendPushReplies() override { - for (map::iterator i = replies.begin(); - i != replies.end(); - ++i) { - i->second->put(); - } - replies.clear(); - } -}; - -void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority) -{ - for (map >::iterator i = m.pushes.begin(); - i != m.pushes.end(); - m.pushes.erase(i++)) { - MOSDPGPush *msg = new MOSDPGPush(); - msg->set_priority(priority); - msg->map_epoch = get_parent()->get_epoch(); - msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); - msg->from = get_parent()->whoami_shard(); - msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard); - msg->pushes.swap(i->second); - msg->compute_cost(cct); - get_parent()->send_message( - i->first.osd, - msg); - } - map replies; - for (map >::iterator i = - m.push_replies.begin(); - i != m.push_replies.end(); - m.push_replies.erase(i++)) { - MOSDPGPushReply *msg = new MOSDPGPushReply(); - msg->set_priority(priority); - msg->map_epoch = get_parent()->get_epoch(); - msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); - msg->from = get_parent()->whoami_shard(); - msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard); - msg->replies.swap(i->second); - msg->compute_cost(cct); - replies.insert(make_pair(i->first.osd, msg)); - } - - if (!replies.empty()) { - (m.t).register_on_complete( - get_parent()->bless_context( - new SendPushReplies( - get_parent(), - get_parent()->get_epoch(), - replies))); - get_parent()->queue_transaction(std::move(m.t)); - } - - if (m.reads.empty()) - return; - start_read_op( - priority, - m.reads, - OpRequestRef(), - false, true); -} - -void ECBackend::continue_recovery_op( - RecoveryOp &op, - RecoveryMessages *m) -{ - dout(10) << __func__ << ": continuing " << op << dendl; - while (1) { - switch (op.state) { - case RecoveryOp::IDLE: { - // start read - op.state = RecoveryOp::READING; - assert(!op.recovery_progress.data_complete); - set want(op.missing_on_shards.begin(), op.missing_on_shards.end()); - uint64_t from = op.recovery_progress.data_recovered_to; - uint64_t amount = get_recovery_chunk_size(); - - if (op.recovery_progress.first && op.obc) { - /* We've got the attrs and the hinfo, might as well use them */ - op.hinfo = get_hash_info(op.hoid); - assert(op.hinfo); - op.xattrs = op.obc->attr_cache; - ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]); - } - - set to_read; - int r = get_min_avail_to_read_shards( - op.hoid, want, true, false, &to_read); - if (r != 0) { - // we must have lost a recovery source - assert(!op.recovery_progress.first); - dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid - << dendl; - get_parent()->cancel_pull(op.hoid); - recovery_ops.erase(op.hoid); - return; - } - m->read( - this, - op.hoid, - op.recovery_progress.data_recovered_to, - amount, - to_read, - op.recovery_progress.first && !op.obc); - op.extent_requested = make_pair( - from, - amount); - dout(10) << __func__ << ": IDLE return " << op << dendl; - return; - } - case RecoveryOp::READING: { - // read completed, start write - assert(op.xattrs.size()); - assert(op.returned_data.size()); - op.state = RecoveryOp::WRITING; - ObjectRecoveryProgress after_progress = op.recovery_progress; - after_progress.data_recovered_to += op.extent_requested.second; - after_progress.first = false; - if (after_progress.data_recovered_to >= op.obc->obs.oi.size) { - after_progress.data_recovered_to = - sinfo.logical_to_next_stripe_offset( - op.obc->obs.oi.size); - after_progress.data_complete = true; - } - for (set::iterator mi = op.missing_on.begin(); - mi != op.missing_on.end(); - ++mi) { - assert(op.returned_data.count(mi->shard)); - m->pushes[*mi].push_back(PushOp()); - PushOp &pop = m->pushes[*mi].back(); - pop.soid = op.hoid; - pop.version = op.v; - pop.data = op.returned_data[mi->shard]; - dout(10) << __func__ << ": before_progress=" << op.recovery_progress - << ", after_progress=" << after_progress - << ", pop.data.length()=" << pop.data.length() - << ", size=" << op.obc->obs.oi.size << dendl; - assert( - pop.data.length() == - sinfo.aligned_logical_offset_to_chunk_offset( - after_progress.data_recovered_to - - op.recovery_progress.data_recovered_to) - ); - if (pop.data.length()) - pop.data_included.insert( - sinfo.aligned_logical_offset_to_chunk_offset( - op.recovery_progress.data_recovered_to), - pop.data.length() - ); - if (op.recovery_progress.first) { - pop.attrset = op.xattrs; - } - pop.recovery_info = op.recovery_info; - pop.before_progress = op.recovery_progress; - pop.after_progress = after_progress; - if (*mi != get_parent()->primary_shard()) - get_parent()->begin_peer_recover( - *mi, - op.hoid); - } - op.returned_data.clear(); - op.waiting_on_pushes = op.missing_on; - op.recovery_progress = after_progress; - dout(10) << __func__ << ": READING return " << op << dendl; - return; - } - case RecoveryOp::WRITING: { - if (op.waiting_on_pushes.empty()) { - if (op.recovery_progress.data_complete) { - op.state = RecoveryOp::COMPLETE; - for (set::iterator i = op.missing_on.begin(); - i != op.missing_on.end(); - ++i) { - if (*i != get_parent()->primary_shard()) { - dout(10) << __func__ << ": on_peer_recover on " << *i - << ", obj " << op.hoid << dendl; - get_parent()->on_peer_recover( - *i, - op.hoid, - op.recovery_info); - } - } - object_stat_sum_t stat; - stat.num_bytes_recovered = op.recovery_info.size; - stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ? - stat.num_objects_recovered = 1; - get_parent()->on_global_recover(op.hoid, stat, false); - dout(10) << __func__ << ": WRITING return " << op << dendl; - recovery_ops.erase(op.hoid); - return; - } else { - op.state = RecoveryOp::IDLE; - dout(10) << __func__ << ": WRITING continue " << op << dendl; - continue; - } - } - return; - } - // should never be called once complete - case RecoveryOp::COMPLETE: - default: { - ceph_abort(); - }; - } - } -} - -void ECBackend::run_recovery_op( - RecoveryHandle *_h, - int priority) -{ - ECRecoveryHandle *h = static_cast(_h); - RecoveryMessages m; - for (list::iterator i = h->ops.begin(); - i != h->ops.end(); - ++i) { - dout(10) << __func__ << ": starting " << *i << dendl; - assert(!recovery_ops.count(i->hoid)); - RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second; - continue_recovery_op(op, &m); - } - - dispatch_recovery_messages(m, priority); - send_recovery_deletes(priority, h->deletes); - delete _h; -} - -int ECBackend::recover_object( - const hobject_t &hoid, - eversion_t v, - ObjectContextRef head, - ObjectContextRef obc, - RecoveryHandle *_h) -{ - ECRecoveryHandle *h = static_cast(_h); - h->ops.push_back(RecoveryOp()); - h->ops.back().v = v; - h->ops.back().hoid = hoid; - h->ops.back().obc = obc; - h->ops.back().recovery_info.soid = hoid; - h->ops.back().recovery_info.version = v; - if (obc) { - h->ops.back().recovery_info.size = obc->obs.oi.size; - h->ops.back().recovery_info.oi = obc->obs.oi; - } - if (hoid.is_snap()) { - if (obc) { - assert(obc->ssc); - h->ops.back().recovery_info.ss = obc->ssc->snapset; - } else if (head) { - assert(head->ssc); - h->ops.back().recovery_info.ss = head->ssc->snapset; - } else { - assert(0 == "neither obc nor head set for a snap object"); - } - } - h->ops.back().recovery_progress.omap_complete = true; - for (set::const_iterator i = - get_parent()->get_actingbackfill_shards().begin(); - i != get_parent()->get_actingbackfill_shards().end(); - ++i) { - dout(10) << "checking " << *i << dendl; - if (get_parent()->get_shard_missing(*i).is_missing(hoid)) { - h->ops.back().missing_on.insert(*i); - h->ops.back().missing_on_shards.insert(i->shard); - } - } - dout(10) << __func__ << ": built op " << h->ops.back() << dendl; - return 0; -} - -bool ECBackend::can_handle_while_inactive( - OpRequestRef _op) -{ - return false; -} - -bool ECBackend::_handle_message( - OpRequestRef _op) -{ - dout(10) << __func__ << ": " << *_op->get_req() << dendl; - int priority = _op->get_req()->get_priority(); - switch (_op->get_req()->get_type()) { - case MSG_OSD_EC_WRITE: { - // NOTE: this is non-const because handle_sub_write modifies the embedded - // ObjectStore::Transaction in place (and then std::move's it). It does - // not conflict with ECSubWrite's operator<<. - MOSDECSubOpWrite *op = static_cast( - _op->get_nonconst_req()); - handle_sub_write(op->op.from, _op, op->op, _op->pg_trace); - return true; - } - case MSG_OSD_EC_WRITE_REPLY: { - const MOSDECSubOpWriteReply *op = static_cast( - _op->get_req()); - handle_sub_write_reply(op->op.from, op->op, _op->pg_trace); - return true; - } - case MSG_OSD_EC_READ: { - const MOSDECSubOpRead *op = static_cast(_op->get_req()); - MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply; - reply->pgid = get_parent()->primary_spg_t(); - reply->map_epoch = get_parent()->get_epoch(); - reply->min_epoch = get_parent()->get_interval_start_epoch(); - handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace); - reply->trace = _op->pg_trace; - get_parent()->send_message_osd_cluster( - op->op.from.osd, reply, get_parent()->get_epoch()); - return true; - } - case MSG_OSD_EC_READ_REPLY: { - // NOTE: this is non-const because handle_sub_read_reply steals resulting - // buffers. It does not conflict with ECSubReadReply operator<<. - MOSDECSubOpReadReply *op = static_cast( - _op->get_nonconst_req()); - RecoveryMessages rm; - handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace); - dispatch_recovery_messages(rm, priority); - return true; - } - case MSG_OSD_PG_PUSH: { - const MOSDPGPush *op = static_cast(_op->get_req()); - RecoveryMessages rm; - for (vector::const_iterator i = op->pushes.begin(); - i != op->pushes.end(); - ++i) { - handle_recovery_push(*i, &rm); - } - dispatch_recovery_messages(rm, priority); - return true; - } - case MSG_OSD_PG_PUSH_REPLY: { - const MOSDPGPushReply *op = static_cast( - _op->get_req()); - RecoveryMessages rm; - for (vector::const_iterator i = op->replies.begin(); - i != op->replies.end(); - ++i) { - handle_recovery_push_reply(*i, op->from, &rm); - } - dispatch_recovery_messages(rm, priority); - return true; - } - default: - return false; - } - return false; -} - -struct SubWriteCommitted : public Context { - ECBackend *pg; - OpRequestRef msg; - ceph_tid_t tid; - eversion_t version; - eversion_t last_complete; - const ZTracer::Trace trace; - SubWriteCommitted( - ECBackend *pg, - OpRequestRef msg, - ceph_tid_t tid, - eversion_t version, - eversion_t last_complete, - const ZTracer::Trace &trace) - : pg(pg), msg(msg), tid(tid), - version(version), last_complete(last_complete), trace(trace) {} - void finish(int) override { - if (msg) - msg->mark_event("sub_op_committed"); - pg->sub_write_committed(tid, version, last_complete, trace); - } -}; -void ECBackend::sub_write_committed( - ceph_tid_t tid, eversion_t version, eversion_t last_complete, - const ZTracer::Trace &trace) { - if (get_parent()->pgb_is_primary()) { - ECSubWriteReply reply; - reply.tid = tid; - reply.last_complete = last_complete; - reply.committed = true; - reply.from = get_parent()->whoami_shard(); - handle_sub_write_reply( - get_parent()->whoami_shard(), - reply, trace); - } else { - get_parent()->update_last_complete_ondisk(last_complete); - MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply; - r->pgid = get_parent()->primary_spg_t(); - r->map_epoch = get_parent()->get_epoch(); - r->min_epoch = get_parent()->get_interval_start_epoch(); - r->op.tid = tid; - r->op.last_complete = last_complete; - r->op.committed = true; - r->op.from = get_parent()->whoami_shard(); - r->set_priority(CEPH_MSG_PRIO_HIGH); - r->trace = trace; - r->trace.event("sending sub op commit"); - get_parent()->send_message_osd_cluster( - get_parent()->primary_shard().osd, r, get_parent()->get_epoch()); - } -} - -struct SubWriteApplied : public Context { - ECBackend *pg; - OpRequestRef msg; - ceph_tid_t tid; - eversion_t version; - const ZTracer::Trace trace; - SubWriteApplied( - ECBackend *pg, - OpRequestRef msg, - ceph_tid_t tid, - eversion_t version, - const ZTracer::Trace &trace) - : pg(pg), msg(msg), tid(tid), version(version), trace(trace) {} - void finish(int) override { - if (msg) - msg->mark_event("sub_op_applied"); - pg->sub_write_applied(tid, version, trace); - } -}; -void ECBackend::sub_write_applied( - ceph_tid_t tid, eversion_t version, - const ZTracer::Trace &trace) { - parent->op_applied(version); - if (get_parent()->pgb_is_primary()) { - ECSubWriteReply reply; - reply.from = get_parent()->whoami_shard(); - reply.tid = tid; - reply.applied = true; - handle_sub_write_reply( - get_parent()->whoami_shard(), - reply, trace); - } else { - MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply; - r->pgid = get_parent()->primary_spg_t(); - r->map_epoch = get_parent()->get_epoch(); - r->min_epoch = get_parent()->get_interval_start_epoch(); - r->op.from = get_parent()->whoami_shard(); - r->op.tid = tid; - r->op.applied = true; - r->set_priority(CEPH_MSG_PRIO_HIGH); - r->trace = trace; - r->trace.event("sending sub op apply"); - get_parent()->send_message_osd_cluster( - get_parent()->primary_shard().osd, r, get_parent()->get_epoch()); - } -} - -void ECBackend::handle_sub_write( - pg_shard_t from, - OpRequestRef msg, - ECSubWrite &op, - const ZTracer::Trace &trace, - Context *on_local_applied_sync) -{ - if (msg) - msg->mark_started(); - trace.event("handle_sub_write"); - assert(!get_parent()->get_log().get_missing().is_missing(op.soid)); - if (!get_parent()->pgb_is_primary()) - get_parent()->update_stats(op.stats); - ObjectStore::Transaction localt; - if (!op.temp_added.empty()) { - add_temp_objs(op.temp_added); - } - if (op.backfill) { - for (set::iterator i = op.temp_removed.begin(); - i != op.temp_removed.end(); - ++i) { - dout(10) << __func__ << ": removing object " << *i - << " since we won't get the transaction" << dendl; - localt.remove( - coll, - ghobject_t( - *i, - ghobject_t::NO_GEN, - get_parent()->whoami_shard().shard)); - } - } - clear_temp_objs(op.temp_removed); - get_parent()->log_operation( - op.log_entries, - op.updated_hit_set_history, - op.trim_to, - op.roll_forward_to, - !op.backfill, - localt); - - PrimaryLogPG *_rPG = dynamic_cast(get_parent()); - if (_rPG && !_rPG->is_undersized() && - (unsigned)get_parent()->whoami_shard().shard >= ec_impl->get_data_chunk_count()) - op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); - - if (on_local_applied_sync) { - dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl; - localt.register_on_applied_sync(on_local_applied_sync); - } - localt.register_on_commit( - get_parent()->bless_context( - new SubWriteCommitted( - this, msg, op.tid, - op.at_version, - get_parent()->get_info().last_complete, trace))); - localt.register_on_applied( - get_parent()->bless_context( - new SubWriteApplied(this, msg, op.tid, op.at_version, trace))); - vector tls; - tls.reserve(2); - tls.push_back(std::move(op.t)); - tls.push_back(std::move(localt)); - get_parent()->queue_transactions(tls, msg); -} - -void ECBackend::handle_sub_read( - pg_shard_t from, - const ECSubRead &op, - ECSubReadReply *reply, - const ZTracer::Trace &trace) -{ - trace.event("handle sub read"); - shard_id_t shard = get_parent()->whoami_shard().shard; - for(auto i = op.to_read.begin(); - i != op.to_read.end(); - ++i) { - int r = 0; - ECUtil::HashInfoRef hinfo; - if (!get_parent()->get_pool().allows_ecoverwrites()) { - hinfo = get_hash_info(i->first); - if (!hinfo) { - r = -EIO; - get_parent()->clog_error() << "Corruption detected: object " << i->first - << " is missing hash_info"; - dout(5) << __func__ << ": No hinfo for " << i->first << dendl; - goto error; - } - } - for (auto j = i->second.begin(); j != i->second.end(); ++j) { - bufferlist bl; - r = store->read( - ch, - ghobject_t(i->first, ghobject_t::NO_GEN, shard), - j->get<0>(), - j->get<1>(), - bl, j->get<2>()); - if (r < 0) { - get_parent()->clog_error() << "Error " << r - << " reading object " - << i->first; - dout(5) << __func__ << ": Error " << r - << " reading " << i->first << dendl; - goto error; - } else { - dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl; - reply->buffers_read[i->first].push_back( - make_pair( - j->get<0>(), - bl) - ); - } - - if (!get_parent()->get_pool().allows_ecoverwrites()) { - // This shows that we still need deep scrub because large enough files - // are read in sections, so the digest check here won't be done here. - // Do NOT check osd_read_eio_on_bad_digest here. We need to report - // the state of our chunk in case other chunks could substitute. - assert(hinfo->has_chunk_hash()); - if ((bl.length() == hinfo->get_total_chunk_size()) && - (j->get<0>() == 0)) { - dout(20) << __func__ << ": Checking hash of " << i->first << dendl; - bufferhash h(-1); - h << bl; - if (h.digest() != hinfo->get_chunk_hash(shard)) { - get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x" - << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec; - dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x" - << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl; - r = -EIO; - goto error; - } - } - } - } - continue; -error: - // Do NOT check osd_read_eio_on_bad_digest here. We need to report - // the state of our chunk in case other chunks could substitute. - reply->buffers_read.erase(i->first); - reply->errors[i->first] = r; - } - for (set::iterator i = op.attrs_to_read.begin(); - i != op.attrs_to_read.end(); - ++i) { - dout(10) << __func__ << ": fulfilling attr request on " - << *i << dendl; - if (reply->errors.count(*i)) - continue; - int r = store->getattrs( - ch, - ghobject_t( - *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), - reply->attrs_read[*i]); - if (r < 0) { - reply->buffers_read.erase(*i); - reply->errors[*i] = r; - } - } - reply->from = get_parent()->whoami_shard(); - reply->tid = op.tid; -} - -void ECBackend::handle_sub_write_reply( - pg_shard_t from, - const ECSubWriteReply &op, - const ZTracer::Trace &trace) -{ - map::iterator i = tid_to_op_map.find(op.tid); - assert(i != tid_to_op_map.end()); - if (op.committed) { - trace.event("sub write committed"); - assert(i->second.pending_commit.count(from)); - i->second.pending_commit.erase(from); - if (from != get_parent()->whoami_shard()) { - get_parent()->update_peer_last_complete_ondisk(from, op.last_complete); - } - } - if (op.applied) { - trace.event("sub write applied"); - assert(i->second.pending_apply.count(from)); - i->second.pending_apply.erase(from); - } - - if (i->second.pending_apply.empty() && i->second.on_all_applied) { - dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl; - i->second.on_all_applied->complete(0); - i->second.on_all_applied = 0; - i->second.trace.event("ec write all applied"); - } - if (i->second.pending_commit.empty() && i->second.on_all_commit) { - dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl; - i->second.on_all_commit->complete(0); - i->second.on_all_commit = 0; - i->second.trace.event("ec write all committed"); - } - check_ops(); -} - -void ECBackend::handle_sub_read_reply( - pg_shard_t from, - ECSubReadReply &op, - RecoveryMessages *m, - const ZTracer::Trace &trace) -{ - trace.event("ec sub read reply"); - dout(10) << __func__ << ": reply " << op << dendl; - map::iterator iter = tid_to_read_map.find(op.tid); - if (iter == tid_to_read_map.end()) { - //canceled - dout(20) << __func__ << ": dropped " << op << dendl; - return; - } - ReadOp &rop = iter->second; - for (auto i = op.buffers_read.begin(); - i != op.buffers_read.end(); - ++i) { - assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer - if (!rop.to_read.count(i->first)) { - // We canceled this read! @see filter_read_op - dout(20) << __func__ << " to_read skipping" << dendl; - continue; - } - list >::const_iterator req_iter = - rop.to_read.find(i->first)->second.to_read.begin(); - list< - boost::tuple< - uint64_t, uint64_t, map > >::iterator riter = - rop.complete[i->first].returned.begin(); - for (list >::iterator j = i->second.begin(); - j != i->second.end(); - ++j, ++req_iter, ++riter) { - assert(req_iter != rop.to_read.find(i->first)->second.to_read.end()); - assert(riter != rop.complete[i->first].returned.end()); - pair adjusted = - sinfo.aligned_offset_len_to_chunk( - make_pair(req_iter->get<0>(), req_iter->get<1>())); - assert(adjusted.first == j->first); - riter->get<2>()[from].claim(j->second); - } - } - for (auto i = op.attrs_read.begin(); - i != op.attrs_read.end(); - ++i) { - assert(!op.errors.count(i->first)); // if read error better not have sent an attribute - if (!rop.to_read.count(i->first)) { - // We canceled this read! @see filter_read_op - dout(20) << __func__ << " to_read skipping" << dendl; - continue; - } - rop.complete[i->first].attrs = map(); - (*(rop.complete[i->first].attrs)).swap(i->second); - } - for (auto i = op.errors.begin(); - i != op.errors.end(); - ++i) { - rop.complete[i->first].errors.insert( - make_pair( - from, - i->second)); - dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl; - } - - map >::iterator siter = - shard_to_read_map.find(from); - assert(siter != shard_to_read_map.end()); - assert(siter->second.count(op.tid)); - siter->second.erase(op.tid); - - assert(rop.in_progress.count(from)); - rop.in_progress.erase(from); - unsigned is_complete = 0; - // For redundant reads check for completion as each shard comes in, - // or in a non-recovery read check for completion once all the shards read. - // TODO: It would be nice if recovery could send more reads too - if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) { - for (map::const_iterator iter = - rop.complete.begin(); - iter != rop.complete.end(); - ++iter) { - set have; - for (map::const_iterator j = - iter->second.returned.front().get<2>().begin(); - j != iter->second.returned.front().get<2>().end(); - ++j) { - have.insert(j->first.shard); - dout(20) << __func__ << " have shard=" << j->first.shard << dendl; - } - set want_to_read, dummy_minimum; - get_want_to_read_shards(&want_to_read); - int err; - if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) { - dout(20) << __func__ << " minimum_to_decode failed" << dendl; - if (rop.in_progress.empty()) { - // If we don't have enough copies and we haven't sent reads for all shards - // we can send the rest of the reads, if any. - if (!rop.do_redundant_reads) { - int r = send_all_remaining_reads(iter->first, rop); - if (r == 0) { - // We added to in_progress and not incrementing is_complete - continue; - } - // Couldn't read any additional shards so handle as completed with errors - } - // We don't want to confuse clients / RBD with objectstore error - // values in particular ENOENT. We may have different error returns - // from different shards, so we'll return minimum_to_decode() error - // (usually EIO) to reader. It is likely an error here is due to a - // damaged pg. - rop.complete[iter->first].r = err; - ++is_complete; - } - } else { - assert(rop.complete[iter->first].r == 0); - if (!rop.complete[iter->first].errors.empty()) { - if (cct->_conf->osd_read_ec_check_for_errors) { - dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl; - err = rop.complete[iter->first].errors.begin()->second; - rop.complete[iter->first].r = err; - } else { - get_parent()->clog_warn() << "Error(s) ignored for " - << iter->first << " enough copies available"; - dout(10) << __func__ << " Error(s) ignored for " << iter->first - << " enough copies available" << dendl; - rop.complete[iter->first].errors.clear(); - } - } - ++is_complete; - } - } - } - if (rop.in_progress.empty() || is_complete == rop.complete.size()) { - dout(20) << __func__ << " Complete: " << rop << dendl; - rop.trace.event("ec read complete"); - complete_read_op(rop, m); - } else { - dout(10) << __func__ << " readop not complete: " << rop << dendl; - } -} - -void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m) -{ - map::iterator reqiter = - rop.to_read.begin(); - map::iterator resiter = - rop.complete.begin(); - assert(rop.to_read.size() == rop.complete.size()); - for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) { - if (reqiter->second.cb) { - pair arg( - m, resiter->second); - reqiter->second.cb->complete(arg); - reqiter->second.cb = NULL; - } - } - tid_to_read_map.erase(rop.tid); -} - -struct FinishReadOp : public GenContext { - ECBackend *ec; - ceph_tid_t tid; - FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {} - void finish(ThreadPool::TPHandle &handle) override { - auto ropiter = ec->tid_to_read_map.find(tid); - assert(ropiter != ec->tid_to_read_map.end()); - int priority = ropiter->second.priority; - RecoveryMessages rm; - ec->complete_read_op(ropiter->second, &rm); - ec->dispatch_recovery_messages(rm, priority); - } -}; - -void ECBackend::filter_read_op( - const OSDMapRef& osdmap, - ReadOp &op) -{ - set to_cancel; - for (map >::iterator i = op.source_to_obj.begin(); - i != op.source_to_obj.end(); - ++i) { - if (osdmap->is_down(i->first.osd)) { - to_cancel.insert(i->second.begin(), i->second.end()); - op.in_progress.erase(i->first); - continue; - } - } - - if (to_cancel.empty()) - return; - - for (map >::iterator i = op.source_to_obj.begin(); - i != op.source_to_obj.end(); - ) { - for (set::iterator j = i->second.begin(); - j != i->second.end(); - ) { - if (to_cancel.count(*j)) - i->second.erase(j++); - else - ++j; - } - if (i->second.empty()) { - op.source_to_obj.erase(i++); - } else { - assert(!osdmap->is_down(i->first.osd)); - ++i; - } - } - - for (set::iterator i = to_cancel.begin(); - i != to_cancel.end(); - ++i) { - get_parent()->cancel_pull(*i); - - assert(op.to_read.count(*i)); - read_request_t &req = op.to_read.find(*i)->second; - dout(10) << __func__ << ": canceling " << req - << " for obj " << *i << dendl; - assert(req.cb); - delete req.cb; - req.cb = NULL; - - op.to_read.erase(*i); - op.complete.erase(*i); - recovery_ops.erase(*i); - } - - if (op.in_progress.empty()) { - get_parent()->schedule_recovery_work( - get_parent()->bless_gencontext( - new FinishReadOp(this, op.tid))); - } -} - -void ECBackend::check_recovery_sources(const OSDMapRef& osdmap) -{ - set tids_to_filter; - for (map >::iterator - i = shard_to_read_map.begin(); - i != shard_to_read_map.end(); - ) { - if (osdmap->is_down(i->first.osd)) { - tids_to_filter.insert(i->second.begin(), i->second.end()); - shard_to_read_map.erase(i++); - } else { - ++i; - } - } - for (set::iterator i = tids_to_filter.begin(); - i != tids_to_filter.end(); - ++i) { - map::iterator j = tid_to_read_map.find(*i); - assert(j != tid_to_read_map.end()); - filter_read_op(osdmap, j->second); - } -} - -void ECBackend::on_change() -{ - dout(10) << __func__ << dendl; - - completed_to = eversion_t(); - committed_to = eversion_t(); - pipeline_state.clear(); - waiting_reads.clear(); - waiting_state.clear(); - waiting_commit.clear(); - for (auto &&op: tid_to_op_map) { - cache.release_write_pin(op.second.pin); - } - tid_to_op_map.clear(); - - for (map::iterator i = tid_to_read_map.begin(); - i != tid_to_read_map.end(); - ++i) { - dout(10) << __func__ << ": cancelling " << i->second << dendl; - for (map::iterator j = - i->second.to_read.begin(); - j != i->second.to_read.end(); - ++j) { - delete j->second.cb; - j->second.cb = 0; - } - } - tid_to_read_map.clear(); - in_progress_client_reads.clear(); - shard_to_read_map.clear(); - clear_recovery_state(); -} - -void ECBackend::clear_recovery_state() -{ - recovery_ops.clear(); -} - -void ECBackend::on_flushed() -{ -} - -void ECBackend::dump_recovery_info(Formatter *f) const -{ - f->open_array_section("recovery_ops"); - for (map::const_iterator i = recovery_ops.begin(); - i != recovery_ops.end(); - ++i) { - f->open_object_section("op"); - i->second.dump(f); - f->close_section(); - } - f->close_section(); - f->open_array_section("read_ops"); - for (map::const_iterator i = tid_to_read_map.begin(); - i != tid_to_read_map.end(); - ++i) { - f->open_object_section("read_op"); - i->second.dump(f); - f->close_section(); - } - f->close_section(); -} - -void ECBackend::submit_transaction( - const hobject_t &hoid, - const object_stat_sum_t &delta_stats, - const eversion_t &at_version, - PGTransactionUPtr &&t, - const eversion_t &trim_to, - const eversion_t &roll_forward_to, - const vector &log_entries, - boost::optional &hset_history, - Context *on_local_applied_sync, - Context *on_all_applied, - Context *on_all_commit, - ceph_tid_t tid, - osd_reqid_t reqid, - OpRequestRef client_op - ) -{ - assert(!tid_to_op_map.count(tid)); - Op *op = &(tid_to_op_map[tid]); - op->hoid = hoid; - op->delta_stats = delta_stats; - op->version = at_version; - op->trim_to = trim_to; - op->roll_forward_to = MAX(roll_forward_to, committed_to); - op->log_entries = log_entries; - std::swap(op->updated_hit_set_history, hset_history); - op->on_local_applied_sync = on_local_applied_sync; - op->on_all_applied = on_all_applied; - op->on_all_commit = on_all_commit; - op->tid = tid; - op->reqid = reqid; - op->client_op = client_op; - if (client_op) - op->trace = client_op->pg_trace; - - dout(10) << __func__ << ": op " << *op << " starting" << dendl; - start_rmw(op, std::move(t)); - dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl; -} - -void ECBackend::call_write_ordered(std::function &&cb) { - if (!waiting_state.empty()) { - waiting_state.back().on_write.emplace_back(std::move(cb)); - } else if (!waiting_reads.empty()) { - waiting_reads.back().on_write.emplace_back(std::move(cb)); - } else { - // Nothing earlier in the pipeline, just call it - cb(); - } -} - -int ECBackend::get_min_avail_to_read_shards( - const hobject_t &hoid, - const set &want, - bool for_recovery, - bool do_redundant_reads, - set *to_read) -{ - // Make sure we don't do redundant reads for recovery - assert(!for_recovery || !do_redundant_reads); - - set have; - map shards; - - for (set::const_iterator i = - get_parent()->get_acting_shards().begin(); - i != get_parent()->get_acting_shards().end(); - ++i) { - dout(10) << __func__ << ": checking acting " << *i << dendl; - const pg_missing_t &missing = get_parent()->get_shard_missing(*i); - if (!missing.is_missing(hoid)) { - assert(!have.count(i->shard)); - have.insert(i->shard); - assert(!shards.count(i->shard)); - shards.insert(make_pair(i->shard, *i)); - } - } - - if (for_recovery) { - for (set::const_iterator i = - get_parent()->get_backfill_shards().begin(); - i != get_parent()->get_backfill_shards().end(); - ++i) { - if (have.count(i->shard)) { - assert(shards.count(i->shard)); - continue; - } - dout(10) << __func__ << ": checking backfill " << *i << dendl; - assert(!shards.count(i->shard)); - const pg_info_t &info = get_parent()->get_shard_info(*i); - const pg_missing_t &missing = get_parent()->get_shard_missing(*i); - if (hoid < info.last_backfill && - !missing.is_missing(hoid)) { - have.insert(i->shard); - shards.insert(make_pair(i->shard, *i)); - } - } - - map>::const_iterator miter = - get_parent()->get_missing_loc_shards().find(hoid); - if (miter != get_parent()->get_missing_loc_shards().end()) { - for (set::iterator i = miter->second.begin(); - i != miter->second.end(); - ++i) { - dout(10) << __func__ << ": checking missing_loc " << *i << dendl; - auto m = get_parent()->maybe_get_shard_missing(*i); - if (m) { - assert(!(*m).is_missing(hoid)); - } - have.insert(i->shard); - shards.insert(make_pair(i->shard, *i)); - } - } - } - - set need; - int r = ec_impl->minimum_to_decode(want, have, &need); - if (r < 0) - return r; - - if (do_redundant_reads) { - need.swap(have); - } - - if (!to_read) - return 0; - - for (set::iterator i = need.begin(); - i != need.end(); - ++i) { - assert(shards.count(shard_id_t(*i))); - to_read->insert(shards[shard_id_t(*i)]); - } - return 0; -} - -int ECBackend::get_remaining_shards( - const hobject_t &hoid, - const set &avail, - set *to_read) -{ - set need; - map shards; - - for (set::const_iterator i = - get_parent()->get_acting_shards().begin(); - i != get_parent()->get_acting_shards().end(); - ++i) { - dout(10) << __func__ << ": checking acting " << *i << dendl; - const pg_missing_t &missing = get_parent()->get_shard_missing(*i); - if (!missing.is_missing(hoid)) { - assert(!need.count(i->shard)); - need.insert(i->shard); - assert(!shards.count(i->shard)); - shards.insert(make_pair(i->shard, *i)); - } - } - - if (!to_read) - return 0; - - for (set::iterator i = need.begin(); - i != need.end(); - ++i) { - assert(shards.count(shard_id_t(*i))); - if (avail.find(*i) == avail.end()) - to_read->insert(shards[shard_id_t(*i)]); - } - return 0; -} - -void ECBackend::start_read_op( - int priority, - map &to_read, - OpRequestRef _op, - bool do_redundant_reads, - bool for_recovery) -{ - ceph_tid_t tid = get_parent()->get_tid(); - assert(!tid_to_read_map.count(tid)); - auto &op = tid_to_read_map.emplace( - tid, - ReadOp( - priority, - tid, - do_redundant_reads, - for_recovery, - _op, - std::move(to_read))).first->second; - dout(10) << __func__ << ": starting " << op << dendl; - if (_op) { - op.trace = _op->pg_trace; - op.trace.event("start ec read"); - } - do_read_op(op); -} - -void ECBackend::do_read_op(ReadOp &op) -{ - int priority = op.priority; - ceph_tid_t tid = op.tid; - - dout(10) << __func__ << ": starting read " << op << dendl; - - map messages; - for (map::iterator i = op.to_read.begin(); - i != op.to_read.end(); - ++i) { - bool need_attrs = i->second.want_attrs; - for (set::const_iterator j = i->second.need.begin(); - j != i->second.need.end(); - ++j) { - if (need_attrs) { - messages[*j].attrs_to_read.insert(i->first); - need_attrs = false; - } - op.obj_to_source[i->first].insert(*j); - op.source_to_obj[*j].insert(i->first); - } - for (list >::const_iterator j = - i->second.to_read.begin(); - j != i->second.to_read.end(); - ++j) { - pair chunk_off_len = - sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>())); - for (set::const_iterator k = i->second.need.begin(); - k != i->second.need.end(); - ++k) { - messages[*k].to_read[i->first].push_back( - boost::make_tuple( - chunk_off_len.first, - chunk_off_len.second, - j->get<2>())); - } - assert(!need_attrs); - } - } - - for (map::iterator i = messages.begin(); - i != messages.end(); - ++i) { - op.in_progress.insert(i->first); - shard_to_read_map[i->first].insert(op.tid); - i->second.tid = tid; - MOSDECSubOpRead *msg = new MOSDECSubOpRead; - msg->set_priority(priority); - msg->pgid = spg_t( - get_parent()->whoami_spg_t().pgid, - i->first.shard); - msg->map_epoch = get_parent()->get_epoch(); - msg->min_epoch = get_parent()->get_interval_start_epoch(); - msg->op = i->second; - msg->op.from = get_parent()->whoami_shard(); - msg->op.tid = tid; - if (op.trace) { - // initialize a child span for this shard - msg->trace.init("ec sub read", nullptr, &op.trace); - msg->trace.keyval("shard", i->first.shard.id); - } - get_parent()->send_message_osd_cluster( - i->first.osd, - msg, - get_parent()->get_epoch()); - } - dout(10) << __func__ << ": started " << op << dendl; -} - -ECUtil::HashInfoRef ECBackend::get_hash_info( - const hobject_t &hoid, bool checks, const map *attrs) -{ - dout(10) << __func__ << ": Getting attr on " << hoid << dendl; - ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid); - if (!ref) { - dout(10) << __func__ << ": not in cache " << hoid << dendl; - struct stat st; - int r = store->stat( - ch, - ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), - &st); - ECUtil::HashInfo hinfo(ec_impl->get_chunk_count()); - // XXX: What does it mean if there is no object on disk? - if (r >= 0) { - dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl; - bufferlist bl; - if (attrs) { - map::const_iterator k = attrs->find(ECUtil::get_hinfo_key()); - if (k == attrs->end()) { - dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl; - } else { - bl.push_back(k->second); - } - } else { - r = store->getattr( - ch, - ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), - ECUtil::get_hinfo_key(), - bl); - if (r < 0) { - dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl; - bl.clear(); // just in case - } - } - if (bl.length() > 0) { - bufferlist::iterator bp = bl.begin(); - ::decode(hinfo, bp); - if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) { - dout(0) << __func__ << ": Mismatch of total_chunk_size " - << hinfo.get_total_chunk_size() << dendl; - return ECUtil::HashInfoRef(); - } - } else if (st.st_size > 0) { // If empty object and no hinfo, create it - return ECUtil::HashInfoRef(); - } - } - ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo); - } - return ref; -} - -void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t) -{ - assert(op); - - op->plan = ECTransaction::get_write_plan( - sinfo, - std::move(t), - [&](const hobject_t &i) { - ECUtil::HashInfoRef ref = get_hash_info(i, false); - if (!ref) { - derr << __func__ << ": get_hash_info(" << i << ")" - << " returned a null pointer and there is no " - << " way to recover from such an error in this " - << " context" << dendl; - ceph_abort(); - } - return ref; - }, - get_parent()->get_dpp()); - - dout(10) << __func__ << ": " << *op << dendl; - - waiting_state.push_back(*op); - check_ops(); -} - -bool ECBackend::try_state_to_reads() -{ - if (waiting_state.empty()) - return false; - - Op *op = &(waiting_state.front()); - if (op->requires_rmw() && pipeline_state.cache_invalid()) { - assert(get_parent()->get_pool().allows_ecoverwrites()); - dout(20) << __func__ << ": blocking " << *op - << " because it requires an rmw and the cache is invalid " - << pipeline_state - << dendl; - return false; - } - - if (op->invalidates_cache()) { - dout(20) << __func__ << ": invalidating cache after this op" - << dendl; - pipeline_state.invalidate(); - op->using_cache = false; - } else { - op->using_cache = pipeline_state.caching_enabled(); - } - - waiting_state.pop_front(); - waiting_reads.push_back(*op); - - if (op->using_cache) { - cache.open_write_pin(op->pin); - - extent_set empty; - for (auto &&hpair: op->plan.will_write) { - auto to_read_plan_iter = op->plan.to_read.find(hpair.first); - const extent_set &to_read_plan = - to_read_plan_iter == op->plan.to_read.end() ? - empty : - to_read_plan_iter->second; - - extent_set remote_read = cache.reserve_extents_for_rmw( - hpair.first, - op->pin, - hpair.second, - to_read_plan); - - extent_set pending_read = to_read_plan; - pending_read.subtract(remote_read); - - if (!remote_read.empty()) { - op->remote_read[hpair.first] = std::move(remote_read); - } - if (!pending_read.empty()) { - op->pending_read[hpair.first] = std::move(pending_read); - } - } - } else { - op->remote_read = op->plan.to_read; - } - - dout(10) << __func__ << ": " << *op << dendl; - - if (!op->remote_read.empty()) { - assert(get_parent()->get_pool().allows_ecoverwrites()); - objects_read_async_no_cache( - op->remote_read, - [this, op](map > &&results) { - for (auto &&i: results) { - op->remote_read_result.emplace(i.first, i.second.second); - } - check_ops(); - }); - } - - return true; -} - -bool ECBackend::try_reads_to_commit() -{ - if (waiting_reads.empty()) - return false; - Op *op = &(waiting_reads.front()); - if (op->read_in_progress()) - return false; - waiting_reads.pop_front(); - waiting_commit.push_back(*op); - - dout(10) << __func__ << ": starting commit on " << *op << dendl; - dout(20) << __func__ << ": " << cache << dendl; - - get_parent()->apply_stats( - op->hoid, - op->delta_stats); - - if (op->using_cache) { - for (auto &&hpair: op->pending_read) { - op->remote_read_result[hpair.first].insert( - cache.get_remaining_extents_for_rmw( - hpair.first, - op->pin, - hpair.second)); - } - op->pending_read.clear(); - } else { - assert(op->pending_read.empty()); - } - - map trans; - for (set::const_iterator i = - get_parent()->get_actingbackfill_shards().begin(); - i != get_parent()->get_actingbackfill_shards().end(); - ++i) { - trans[i->shard]; - } - - op->trace.event("start ec write"); - - map written; - if (op->plan.t) { - ECTransaction::generate_transactions( - op->plan, - ec_impl, - get_parent()->get_info().pgid.pgid, - (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN), - sinfo, - op->remote_read_result, - op->log_entries, - &written, - &trans, - &(op->temp_added), - &(op->temp_cleared), - get_parent()->get_dpp()); - } - - dout(20) << __func__ << ": " << cache << dendl; - dout(20) << __func__ << ": written: " << written << dendl; - dout(20) << __func__ << ": op: " << *op << dendl; - - if (!get_parent()->get_pool().allows_ecoverwrites()) { - for (auto &&i: op->log_entries) { - if (i.requires_kraken()) { - derr << __func__ << ": log entry " << i << " requires kraken" - << " but overwrites are not enabled!" << dendl; - ceph_abort(); - } - } - } - - map written_set; - for (auto &&i: written) { - written_set[i.first] = i.second.get_interval_set(); - } - dout(20) << __func__ << ": written_set: " << written_set << dendl; - assert(written_set == op->plan.will_write); - - if (op->using_cache) { - for (auto &&hpair: written) { - dout(20) << __func__ << ": " << hpair << dendl; - cache.present_rmw_update(hpair.first, op->pin, hpair.second); - } - } - op->remote_read.clear(); - op->remote_read_result.clear(); - - dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl; - ObjectStore::Transaction empty; - bool should_write_local = false; - ECSubWrite local_write_op; - for (set::const_iterator i = - get_parent()->get_actingbackfill_shards().begin(); - i != get_parent()->get_actingbackfill_shards().end(); - ++i) { - op->pending_apply.insert(*i); - op->pending_commit.insert(*i); - map::iterator iter = - trans.find(i->shard); - assert(iter != trans.end()); - bool should_send = get_parent()->should_send_op(*i, op->hoid); - const pg_stat_t &stats = - should_send ? - get_info().stats : - parent->get_shard_info().find(*i)->second.stats; - - ECSubWrite sop( - get_parent()->whoami_shard(), - op->tid, - op->reqid, - op->hoid, - stats, - should_send ? iter->second : empty, - op->version, - op->trim_to, - op->roll_forward_to, - op->log_entries, - op->updated_hit_set_history, - op->temp_added, - op->temp_cleared, - !should_send); - - ZTracer::Trace trace; - if (op->trace) { - // initialize a child span for this shard - trace.init("ec sub write", nullptr, &op->trace); - trace.keyval("shard", i->shard.id); - } - - if (*i == get_parent()->whoami_shard()) { - should_write_local = true; - local_write_op.claim(sop); - } else { - MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop); - r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard); - r->map_epoch = get_parent()->get_epoch(); - r->min_epoch = get_parent()->get_interval_start_epoch(); - r->trace = trace; - get_parent()->send_message_osd_cluster( - i->osd, r, get_parent()->get_epoch()); - } - } - if (should_write_local) { - handle_sub_write( - get_parent()->whoami_shard(), - op->client_op, - local_write_op, - op->trace, - op->on_local_applied_sync); - op->on_local_applied_sync = 0; - } - - for (auto i = op->on_write.begin(); - i != op->on_write.end(); - op->on_write.erase(i++)) { - (*i)(); - } - - return true; -} - -bool ECBackend::try_finish_rmw() -{ - if (waiting_commit.empty()) - return false; - Op *op = &(waiting_commit.front()); - if (op->write_in_progress()) - return false; - waiting_commit.pop_front(); - - dout(10) << __func__ << ": " << *op << dendl; - dout(20) << __func__ << ": " << cache << dendl; - - if (op->roll_forward_to > completed_to) - completed_to = op->roll_forward_to; - if (op->version > committed_to) - committed_to = op->version; - - if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) { - if (op->version > get_parent()->get_log().get_can_rollback_to() && - waiting_reads.empty() && - waiting_commit.empty()) { - // submit a dummy transaction to kick the rollforward - auto tid = get_parent()->get_tid(); - Op *nop = &(tid_to_op_map[tid]); - nop->hoid = op->hoid; - nop->trim_to = op->trim_to; - nop->roll_forward_to = op->version; - nop->tid = tid; - nop->reqid = op->reqid; - waiting_reads.push_back(*nop); - } - } - - if (op->using_cache) { - cache.release_write_pin(op->pin); - } - tid_to_op_map.erase(op->tid); - - if (waiting_reads.empty() && - waiting_commit.empty()) { - pipeline_state.clear(); - dout(20) << __func__ << ": clearing pipeline_state " - << pipeline_state - << dendl; - } - return true; -} - -void ECBackend::check_ops() -{ - while (try_state_to_reads() || - try_reads_to_commit() || - try_finish_rmw()); -} - -int ECBackend::objects_read_sync( - const hobject_t &hoid, - uint64_t off, - uint64_t len, - uint32_t op_flags, - bufferlist *bl) -{ - return -EOPNOTSUPP; -} - -void ECBackend::objects_read_async( - const hobject_t &hoid, - const list, - pair > > &to_read, - Context *on_complete, - bool fast_read) -{ - map > > - reads; - - uint32_t flags = 0; - extent_set es; - for (list, - pair > >::const_iterator i = - to_read.begin(); - i != to_read.end(); - ++i) { - pair tmp = - sinfo.offset_len_to_stripe_bounds( - make_pair(i->first.get<0>(), i->first.get<1>())); - - extent_set esnew; - esnew.insert(tmp.first, tmp.second); - es.union_of(esnew); - flags |= i->first.get<2>(); - } - - if (!es.empty()) { - auto &offsets = reads[hoid]; - for (auto j = es.begin(); - j != es.end(); - ++j) { - offsets.push_back( - boost::make_tuple( - j.get_start(), - j.get_len(), - flags)); - } - } - - struct cb { - ECBackend *ec; - hobject_t hoid; - list, - pair > > to_read; - unique_ptr on_complete; - cb(const cb&) = delete; - cb(cb &&) = default; - cb(ECBackend *ec, - const hobject_t &hoid, - const list, - pair > > &to_read, - Context *on_complete) - : ec(ec), - hoid(hoid), - to_read(to_read), - on_complete(on_complete) {} - void operator()(map > &&results) { - auto dpp = ec->get_parent()->get_dpp(); - ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results - << dendl; - ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache - << dendl; - - auto &got = results[hoid]; - - int r = 0; - for (auto &&read: to_read) { - if (got.first < 0) { - if (read.second.second) { - read.second.second->complete(got.first); - } - if (r == 0) - r = got.first; - } else { - assert(read.second.first); - uint64_t offset = read.first.get<0>(); - uint64_t length = read.first.get<1>(); - auto range = got.second.get_containing_range(offset, length); - assert(range.first != range.second); - assert(range.first.get_off() <= offset); - assert( - (offset + length) <= - (range.first.get_off() + range.first.get_len())); - read.second.first->substr_of( - range.first.get_val(), - offset - range.first.get_off(), - length); - if (read.second.second) { - read.second.second->complete(length); - read.second.second = nullptr; - } - } - } - to_read.clear(); - if (on_complete) { - on_complete.release()->complete(r); - } - } - ~cb() { - for (auto &&i: to_read) { - delete i.second.second; - } - to_read.clear(); - } - }; - objects_read_and_reconstruct( - reads, - fast_read, - make_gen_lambda_context< - map > &&, cb>( - cb(this, - hoid, - to_read, - on_complete))); -} - -struct CallClientContexts : - public GenContext &> { - hobject_t hoid; - ECBackend *ec; - ECBackend::ClientAsyncReadStatus *status; - list > to_read; - CallClientContexts( - hobject_t hoid, - ECBackend *ec, - ECBackend::ClientAsyncReadStatus *status, - const list > &to_read) - : hoid(hoid), ec(ec), status(status), to_read(to_read) {} - void finish(pair &in) override { - ECBackend::read_result_t &res = in.second; - extent_map result; - if (res.r != 0) - goto out; - assert(res.returned.size() == to_read.size()); - assert(res.r == 0); - assert(res.errors.empty()); - for (auto &&read: to_read) { - pair adjusted = - ec->sinfo.offset_len_to_stripe_bounds( - make_pair(read.get<0>(), read.get<1>())); - assert(res.returned.front().get<0>() == adjusted.first && - res.returned.front().get<1>() == adjusted.second); - map to_decode; - bufferlist bl; - for (map::iterator j = - res.returned.front().get<2>().begin(); - j != res.returned.front().get<2>().end(); - ++j) { - to_decode[j->first.shard].claim(j->second); - } - int r = ECUtil::decode( - ec->sinfo, - ec->ec_impl, - to_decode, - &bl); - if (r < 0) { - res.r = r; - goto out; - } - bufferlist trimmed; - trimmed.substr_of( - bl, - read.get<0>() - adjusted.first, - MIN(read.get<1>(), - bl.length() - (read.get<0>() - adjusted.first))); - result.insert( - read.get<0>(), trimmed.length(), std::move(trimmed)); - res.returned.pop_front(); - } -out: - status->complete_object(hoid, res.r, std::move(result)); - ec->kick_reads(); - } -}; - -void ECBackend::objects_read_and_reconstruct( - const map > - > &reads, - bool fast_read, - GenContextURef > &&> &&func) -{ - in_progress_client_reads.emplace_back( - reads.size(), std::move(func)); - if (!reads.size()) { - kick_reads(); - return; - } - - set want_to_read; - get_want_to_read_shards(&want_to_read); - - map for_read_op; - for (auto &&to_read: reads) { - set shards; - int r = get_min_avail_to_read_shards( - to_read.first, - want_to_read, - false, - fast_read, - &shards); - assert(r == 0); - - CallClientContexts *c = new CallClientContexts( - to_read.first, - this, - &(in_progress_client_reads.back()), - to_read.second); - for_read_op.insert( - make_pair( - to_read.first, - read_request_t( - to_read.second, - shards, - false, - c))); - } - - start_read_op( - CEPH_MSG_PRIO_DEFAULT, - for_read_op, - OpRequestRef(), - fast_read, false); - return; -} - - -int ECBackend::send_all_remaining_reads( - const hobject_t &hoid, - ReadOp &rop) -{ - set already_read; - const set& ots = rop.obj_to_source[hoid]; - for (set::iterator i = ots.begin(); i != ots.end(); ++i) - already_read.insert(i->shard); - dout(10) << __func__ << " have/error shards=" << already_read << dendl; - set shards; - int r = get_remaining_shards(hoid, already_read, &shards); - if (r) - return r; - if (shards.empty()) - return -EIO; - - dout(10) << __func__ << " Read remaining shards " << shards << dendl; - - // TODOSAM: this doesn't seem right - list > offsets = - rop.to_read.find(hoid)->second.to_read; - GenContext &> *c = - rop.to_read.find(hoid)->second.cb; - - map for_read_op; - for_read_op.insert( - make_pair( - hoid, - read_request_t( - offsets, - shards, - false, - c))); - - rop.to_read.swap(for_read_op); - do_read_op(rop); - return 0; -} - -int ECBackend::objects_get_attrs( - const hobject_t &hoid, - map *out) -{ - int r = store->getattrs( - ch, - ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), - *out); - if (r < 0) - return r; - - for (map::iterator i = out->begin(); - i != out->end(); - ) { - if (ECUtil::is_hinfo_key_string(i->first)) - out->erase(i++); - else - ++i; - } - return r; -} - -void ECBackend::rollback_append( - const hobject_t &hoid, - uint64_t old_size, - ObjectStore::Transaction *t) -{ - assert(old_size % sinfo.get_stripe_width() == 0); - t->truncate( - coll, - ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), - sinfo.aligned_logical_offset_to_chunk_offset( - old_size)); -} - -void ECBackend::be_deep_scrub( - const hobject_t &poid, - uint32_t seed, - ScrubMap::object &o, - ThreadPool::TPHandle &handle) { - bufferhash h(-1); // we always used -1 - int r; - uint64_t stride = cct->_conf->osd_deep_scrub_stride; - if (stride % sinfo.get_chunk_size()) - stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size()); - uint64_t pos = 0; - - uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED; - - while (true) { - bufferlist bl; - handle.reset_tp_timeout(); - r = store->read( - ch, - ghobject_t( - poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), - pos, - stride, bl, - fadvise_flags); - if (r < 0) - break; - if (bl.length() % sinfo.get_chunk_size()) { - r = -EIO; - break; - } - pos += r; - h << bl; - if ((unsigned)r < stride) - break; - } - - if (r == -EIO) { - dout(0) << "_scan_list " << poid << " got " - << r << " on read, read_error" << dendl; - o.read_error = true; - return; - } - - ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs); - if (!hinfo) { - dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl; - o.read_error = true; - o.digest_present = false; - return; - } else { - if (!get_parent()->get_pool().allows_ecoverwrites()) { - assert(hinfo->has_chunk_hash()); - if (hinfo->get_total_chunk_size() != pos) { - dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl; - o.ec_size_mismatch = true; - return; - } - - if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) { - dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl; - o.ec_hash_mismatch = true; - return; - } - - /* We checked above that we match our own stored hash. We cannot - * send a hash of the actual object, so instead we simply send - * our locally stored hash of shard 0 on the assumption that if - * we match our chunk hash and our recollection of the hash for - * chunk 0 matches that of our peers, there is likely no corruption. - */ - o.digest = hinfo->get_chunk_hash(0); - o.digest_present = true; - } else { - /* Hack! We must be using partial overwrites, and partial overwrites - * don't support deep-scrub yet - */ - o.digest = 0; - o.digest_present = true; - } - } - - o.omap_digest = seed; - o.omap_digest_present = true; -}