// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2013 Inktank Storage, Inc. * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include #include #include "ECBackend.h" #include "messages/MOSDPGPush.h" #include "messages/MOSDPGPushReply.h" #include "messages/MOSDECSubOpWrite.h" #include "messages/MOSDECSubOpWriteReply.h" #include "messages/MOSDECSubOpRead.h" #include "messages/MOSDECSubOpReadReply.h" #include "ECMsgTypes.h" #include "PrimaryLogPG.h" #define dout_context cct #define dout_subsys ceph_subsys_osd #define DOUT_PREFIX_ARGS this #undef dout_prefix #define dout_prefix _prefix(_dout, this) static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) { return *_dout << pgb->get_parent()->gen_dbg_prefix(); } struct ECRecoveryHandle : public PGBackend::RecoveryHandle { list ops; }; ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) { switch (rhs.pipeline_state) { case ECBackend::pipeline_state_t::CACHE_VALID: return lhs << "CACHE_VALID"; case ECBackend::pipeline_state_t::CACHE_INVALID: return lhs << "CACHE_INVALID"; default: assert(0 == "invalid pipeline state"); } return lhs; // unreachable } static ostream &operator<<(ostream &lhs, const map &rhs) { lhs << "["; for (map::const_iterator i = rhs.begin(); i != rhs.end(); ++i) { if (i != rhs.begin()) lhs << ", "; lhs << make_pair(i->first, i->second.length()); } return lhs << "]"; } static ostream &operator<<(ostream &lhs, const map &rhs) { lhs << "["; for (map::const_iterator i = rhs.begin(); i != rhs.end(); ++i) { if (i != rhs.begin()) lhs << ", "; lhs << make_pair(i->first, i->second.length()); } return lhs << "]"; } static ostream &operator<<( ostream &lhs, const boost::tuple > &rhs) { return lhs << "(" << rhs.get<0>() << ", " << rhs.get<1>() << ", " << rhs.get<2>() << ")"; } ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs) { return lhs << "read_request_t(to_read=[" << rhs.to_read << "]" << ", need=" << rhs.need << ", want_attrs=" << rhs.want_attrs << ")"; } ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs) { lhs << "read_result_t(r=" << rhs.r << ", errors=" << rhs.errors; if (rhs.attrs) { lhs << ", attrs=" << rhs.attrs.get(); } else { lhs << ", noattrs"; } return lhs << ", returned=" << rhs.returned << ")"; } ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs) { lhs << "ReadOp(tid=" << rhs.tid; if (rhs.op && rhs.op->get_req()) { lhs << ", op="; rhs.op->get_req()->print(lhs); } return lhs << ", to_read=" << rhs.to_read << ", complete=" << rhs.complete << ", priority=" << rhs.priority << ", obj_to_source=" << rhs.obj_to_source << ", source_to_obj=" << rhs.source_to_obj << ", in_progress=" << rhs.in_progress << ")"; } void ECBackend::ReadOp::dump(Formatter *f) const { f->dump_unsigned("tid", tid); if (op && op->get_req()) { f->dump_stream("op") << *(op->get_req()); } f->dump_stream("to_read") << to_read; f->dump_stream("complete") << complete; f->dump_int("priority", priority); f->dump_stream("obj_to_source") << obj_to_source; f->dump_stream("source_to_obj") << source_to_obj; f->dump_stream("in_progress") << in_progress; } ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs) { lhs << "Op(" << rhs.hoid << " v=" << rhs.version << " tt=" << rhs.trim_to << " tid=" << rhs.tid << " reqid=" << rhs.reqid; if (rhs.client_op && rhs.client_op->get_req()) { lhs << " client_op="; rhs.client_op->get_req()->print(lhs); } lhs << " roll_forward_to=" << rhs.roll_forward_to << " temp_added=" << rhs.temp_added << " temp_cleared=" << rhs.temp_cleared << " pending_read=" << rhs.pending_read << " remote_read=" << rhs.remote_read << " remote_read_result=" << rhs.remote_read_result << " pending_apply=" << rhs.pending_apply << " pending_commit=" << rhs.pending_commit << " plan.to_read=" << rhs.plan.to_read << " plan.will_write=" << rhs.plan.will_write << ")"; return lhs; } ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs) { return lhs << "RecoveryOp(" << "hoid=" << rhs.hoid << " v=" << rhs.v << " missing_on=" << rhs.missing_on << " missing_on_shards=" << rhs.missing_on_shards << " recovery_info=" << rhs.recovery_info << " recovery_progress=" << rhs.recovery_progress << " obc refcount=" << rhs.obc.use_count() << " state=" << ECBackend::RecoveryOp::tostr(rhs.state) << " waiting_on_pushes=" << rhs.waiting_on_pushes << " extent_requested=" << rhs.extent_requested << ")"; } void ECBackend::RecoveryOp::dump(Formatter *f) const { f->dump_stream("hoid") << hoid; f->dump_stream("v") << v; f->dump_stream("missing_on") << missing_on; f->dump_stream("missing_on_shards") << missing_on_shards; f->dump_stream("recovery_info") << recovery_info; f->dump_stream("recovery_progress") << recovery_progress; f->dump_stream("state") << tostr(state); f->dump_stream("waiting_on_pushes") << waiting_on_pushes; f->dump_stream("extent_requested") << extent_requested; } ECBackend::ECBackend( PGBackend::Listener *pg, coll_t coll, ObjectStore::CollectionHandle &ch, ObjectStore *store, CephContext *cct, ErasureCodeInterfaceRef ec_impl, uint64_t stripe_width) : PGBackend(cct, pg, store, coll, ch), ec_impl(ec_impl), sinfo(ec_impl->get_data_chunk_count(), stripe_width) { assert((ec_impl->get_data_chunk_count() * ec_impl->get_chunk_size(stripe_width)) == stripe_width); } PGBackend::RecoveryHandle *ECBackend::open_recovery_op() { return new ECRecoveryHandle; } void ECBackend::_failed_push(const hobject_t &hoid, pair &in) { ECBackend::read_result_t &res = in.second; dout(10) << __func__ << ": Read error " << hoid << " r=" << res.r << " errors=" << res.errors << dendl; dout(10) << __func__ << ": canceling recovery op for obj " << hoid << dendl; assert(recovery_ops.count(hoid)); recovery_ops.erase(hoid); list fl; for (auto&& i : res.errors) { fl.push_back(i.first); } get_parent()->failed_push(fl, hoid); } struct OnRecoveryReadComplete : public GenContext &> { ECBackend *pg; hobject_t hoid; set want; OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid) : pg(pg), hoid(hoid) {} void finish(pair &in) override { ECBackend::read_result_t &res = in.second; if (!(res.r == 0 && res.errors.empty())) { pg->_failed_push(hoid, in); return; } assert(res.returned.size() == 1); pg->handle_recovery_read_complete( hoid, res.returned.back(), res.attrs, in.first); } }; struct RecoveryMessages { map reads; void read( ECBackend *ec, const hobject_t &hoid, uint64_t off, uint64_t len, const set &need, bool attrs) { list > to_read; to_read.push_back(boost::make_tuple(off, len, 0)); assert(!reads.count(hoid)); reads.insert( make_pair( hoid, ECBackend::read_request_t( to_read, need, attrs, new OnRecoveryReadComplete( ec, hoid)))); } map > pushes; map > push_replies; ObjectStore::Transaction t; RecoveryMessages() {} ~RecoveryMessages(){} }; void ECBackend::handle_recovery_push( const PushOp &op, RecoveryMessages *m) { ostringstream ss; if (get_parent()->check_failsafe_full(ss)) { dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl; ceph_abort(); } bool oneshot = op.before_progress.first && op.after_progress.data_complete; ghobject_t tobj; if (oneshot) { tobj = ghobject_t(op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard); } else { tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid, op.version), ghobject_t::NO_GEN, get_parent()->whoami_shard().shard); if (op.before_progress.first) { dout(10) << __func__ << ": Adding oid " << tobj.hobj << " in the temp collection" << dendl; add_temp_obj(tobj.hobj); } } if (op.before_progress.first) { m->t.remove(coll, tobj); m->t.touch(coll, tobj); } if (!op.data_included.empty()) { uint64_t start = op.data_included.range_start(); uint64_t end = op.data_included.range_end(); assert(op.data.length() == (end - start)); m->t.write( coll, tobj, start, op.data.length(), op.data); } else { assert(op.data.length() == 0); } if (op.before_progress.first) { assert(op.attrset.count(string("_"))); m->t.setattrs( coll, tobj, op.attrset); } if (op.after_progress.data_complete && !oneshot) { dout(10) << __func__ << ": Removing oid " << tobj.hobj << " from the temp collection" << dendl; clear_temp_obj(tobj.hobj); m->t.remove(coll, ghobject_t( op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); m->t.collection_move_rename( coll, tobj, coll, ghobject_t( op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); } if (op.after_progress.data_complete) { if ((get_parent()->pgb_is_primary())) { assert(recovery_ops.count(op.soid)); assert(recovery_ops[op.soid].obc); get_parent()->on_local_recover( op.soid, op.recovery_info, recovery_ops[op.soid].obc, false, &m->t); } else { get_parent()->on_local_recover( op.soid, op.recovery_info, ObjectContextRef(), false, &m->t); } } m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp()); m->push_replies[get_parent()->primary_shard()].back().soid = op.soid; } void ECBackend::handle_recovery_push_reply( const PushReplyOp &op, pg_shard_t from, RecoveryMessages *m) { if (!recovery_ops.count(op.soid)) return; RecoveryOp &rop = recovery_ops[op.soid]; assert(rop.waiting_on_pushes.count(from)); rop.waiting_on_pushes.erase(from); continue_recovery_op(rop, m); } void ECBackend::handle_recovery_read_complete( const hobject_t &hoid, boost::tuple > &to_read, boost::optional > attrs, RecoveryMessages *m) { dout(10) << __func__ << ": returned " << hoid << " " << "(" << to_read.get<0>() << ", " << to_read.get<1>() << ", " << to_read.get<2>() << ")" << dendl; assert(recovery_ops.count(hoid)); RecoveryOp &op = recovery_ops[hoid]; assert(op.returned_data.empty()); map target; for (set::iterator i = op.missing_on_shards.begin(); i != op.missing_on_shards.end(); ++i) { target[*i] = &(op.returned_data[*i]); } map from; for(map::iterator i = to_read.get<2>().begin(); i != to_read.get<2>().end(); ++i) { from[i->first.shard].claim(i->second); } dout(10) << __func__ << ": " << from << dendl; int r = ECUtil::decode(sinfo, ec_impl, from, target); assert(r == 0); if (attrs) { op.xattrs.swap(*attrs); if (!op.obc) { // attrs only reference the origin bufferlist (decode from // ECSubReadReply message) whose size is much greater than attrs // in recovery. If obc cache it (get_obc maybe cache the attr), // this causes the whole origin bufferlist would not be free // until obc is evicted from obc cache. So rebuild the // bufferlist before cache it. for (map::iterator it = op.xattrs.begin(); it != op.xattrs.end(); ++it) { it->second.rebuild(); } // Need to remove ECUtil::get_hinfo_key() since it should not leak out // of the backend (see bug #12983) map sanitized_attrs(op.xattrs); sanitized_attrs.erase(ECUtil::get_hinfo_key()); op.obc = get_parent()->get_obc(hoid, sanitized_attrs); assert(op.obc); op.recovery_info.size = op.obc->obs.oi.size; op.recovery_info.oi = op.obc->obs.oi; } ECUtil::HashInfo hinfo(ec_impl->get_chunk_count()); if (op.obc->obs.oi.size > 0) { assert(op.xattrs.count(ECUtil::get_hinfo_key())); bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin(); ::decode(hinfo, bp); } op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo); } assert(op.xattrs.size()); assert(op.obc); continue_recovery_op(op, m); } struct SendPushReplies : public Context { PGBackend::Listener *l; epoch_t epoch; map replies; SendPushReplies( PGBackend::Listener *l, epoch_t epoch, map &in) : l(l), epoch(epoch) { replies.swap(in); } void finish(int) override { for (map::iterator i = replies.begin(); i != replies.end(); ++i) { l->send_message_osd_cluster(i->first, i->second, epoch); } replies.clear(); } ~SendPushReplies() override { for (map::iterator i = replies.begin(); i != replies.end(); ++i) { i->second->put(); } replies.clear(); } }; void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority) { for (map >::iterator i = m.pushes.begin(); i != m.pushes.end(); m.pushes.erase(i++)) { MOSDPGPush *msg = new MOSDPGPush(); msg->set_priority(priority); msg->map_epoch = get_parent()->get_epoch(); msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); msg->from = get_parent()->whoami_shard(); msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard); msg->pushes.swap(i->second); msg->compute_cost(cct); get_parent()->send_message( i->first.osd, msg); } map replies; for (map >::iterator i = m.push_replies.begin(); i != m.push_replies.end(); m.push_replies.erase(i++)) { MOSDPGPushReply *msg = new MOSDPGPushReply(); msg->set_priority(priority); msg->map_epoch = get_parent()->get_epoch(); msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); msg->from = get_parent()->whoami_shard(); msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard); msg->replies.swap(i->second); msg->compute_cost(cct); replies.insert(make_pair(i->first.osd, msg)); } if (!replies.empty()) { (m.t).register_on_complete( get_parent()->bless_context( new SendPushReplies( get_parent(), get_parent()->get_epoch(), replies))); get_parent()->queue_transaction(std::move(m.t)); } if (m.reads.empty()) return; start_read_op( priority, m.reads, OpRequestRef(), false, true); } void ECBackend::continue_recovery_op( RecoveryOp &op, RecoveryMessages *m) { dout(10) << __func__ << ": continuing " << op << dendl; while (1) { switch (op.state) { case RecoveryOp::IDLE: { // start read op.state = RecoveryOp::READING; assert(!op.recovery_progress.data_complete); set want(op.missing_on_shards.begin(), op.missing_on_shards.end()); uint64_t from = op.recovery_progress.data_recovered_to; uint64_t amount = get_recovery_chunk_size(); if (op.recovery_progress.first && op.obc) { /* We've got the attrs and the hinfo, might as well use them */ op.hinfo = get_hash_info(op.hoid); assert(op.hinfo); op.xattrs = op.obc->attr_cache; ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]); } set to_read; int r = get_min_avail_to_read_shards( op.hoid, want, true, false, &to_read); if (r != 0) { // we must have lost a recovery source assert(!op.recovery_progress.first); dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid << dendl; get_parent()->cancel_pull(op.hoid); recovery_ops.erase(op.hoid); return; } m->read( this, op.hoid, op.recovery_progress.data_recovered_to, amount, to_read, op.recovery_progress.first && !op.obc); op.extent_requested = make_pair( from, amount); dout(10) << __func__ << ": IDLE return " << op << dendl; return; } case RecoveryOp::READING: { // read completed, start write assert(op.xattrs.size()); assert(op.returned_data.size()); op.state = RecoveryOp::WRITING; ObjectRecoveryProgress after_progress = op.recovery_progress; after_progress.data_recovered_to += op.extent_requested.second; after_progress.first = false; if (after_progress.data_recovered_to >= op.obc->obs.oi.size) { after_progress.data_recovered_to = sinfo.logical_to_next_stripe_offset( op.obc->obs.oi.size); after_progress.data_complete = true; } for (set::iterator mi = op.missing_on.begin(); mi != op.missing_on.end(); ++mi) { assert(op.returned_data.count(mi->shard)); m->pushes[*mi].push_back(PushOp()); PushOp &pop = m->pushes[*mi].back(); pop.soid = op.hoid; pop.version = op.v; pop.data = op.returned_data[mi->shard]; dout(10) << __func__ << ": before_progress=" << op.recovery_progress << ", after_progress=" << after_progress << ", pop.data.length()=" << pop.data.length() << ", size=" << op.obc->obs.oi.size << dendl; assert( pop.data.length() == sinfo.aligned_logical_offset_to_chunk_offset( after_progress.data_recovered_to - op.recovery_progress.data_recovered_to) ); if (pop.data.length()) pop.data_included.insert( sinfo.aligned_logical_offset_to_chunk_offset( op.recovery_progress.data_recovered_to), pop.data.length() ); if (op.recovery_progress.first) { pop.attrset = op.xattrs; } pop.recovery_info = op.recovery_info; pop.before_progress = op.recovery_progress; pop.after_progress = after_progress; if (*mi != get_parent()->primary_shard()) get_parent()->begin_peer_recover( *mi, op.hoid); } op.returned_data.clear(); op.waiting_on_pushes = op.missing_on; op.recovery_progress = after_progress; dout(10) << __func__ << ": READING return " << op << dendl; return; } case RecoveryOp::WRITING: { if (op.waiting_on_pushes.empty()) { if (op.recovery_progress.data_complete) { op.state = RecoveryOp::COMPLETE; for (set::iterator i = op.missing_on.begin(); i != op.missing_on.end(); ++i) { if (*i != get_parent()->primary_shard()) { dout(10) << __func__ << ": on_peer_recover on " << *i << ", obj " << op.hoid << dendl; get_parent()->on_peer_recover( *i, op.hoid, op.recovery_info); } } object_stat_sum_t stat; stat.num_bytes_recovered = op.recovery_info.size; stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ? stat.num_objects_recovered = 1; get_parent()->on_global_recover(op.hoid, stat, false); dout(10) << __func__ << ": WRITING return " << op << dendl; recovery_ops.erase(op.hoid); return; } else { op.state = RecoveryOp::IDLE; dout(10) << __func__ << ": WRITING continue " << op << dendl; continue; } } return; } // should never be called once complete case RecoveryOp::COMPLETE: default: { ceph_abort(); }; } } } void ECBackend::run_recovery_op( RecoveryHandle *_h, int priority) { ECRecoveryHandle *h = static_cast(_h); RecoveryMessages m; for (list::iterator i = h->ops.begin(); i != h->ops.end(); ++i) { dout(10) << __func__ << ": starting " << *i << dendl; assert(!recovery_ops.count(i->hoid)); RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second; continue_recovery_op(op, &m); } dispatch_recovery_messages(m, priority); send_recovery_deletes(priority, h->deletes); delete _h; } int ECBackend::recover_object( const hobject_t &hoid, eversion_t v, ObjectContextRef head, ObjectContextRef obc, RecoveryHandle *_h) { ECRecoveryHandle *h = static_cast(_h); h->ops.push_back(RecoveryOp()); h->ops.back().v = v; h->ops.back().hoid = hoid; h->ops.back().obc = obc; h->ops.back().recovery_info.soid = hoid; h->ops.back().recovery_info.version = v; if (obc) { h->ops.back().recovery_info.size = obc->obs.oi.size; h->ops.back().recovery_info.oi = obc->obs.oi; } if (hoid.is_snap()) { if (obc) { assert(obc->ssc); h->ops.back().recovery_info.ss = obc->ssc->snapset; } else if (head) { assert(head->ssc); h->ops.back().recovery_info.ss = head->ssc->snapset; } else { assert(0 == "neither obc nor head set for a snap object"); } } h->ops.back().recovery_progress.omap_complete = true; for (set::const_iterator i = get_parent()->get_actingbackfill_shards().begin(); i != get_parent()->get_actingbackfill_shards().end(); ++i) { dout(10) << "checking " << *i << dendl; if (get_parent()->get_shard_missing(*i).is_missing(hoid)) { h->ops.back().missing_on.insert(*i); h->ops.back().missing_on_shards.insert(i->shard); } } dout(10) << __func__ << ": built op " << h->ops.back() << dendl; return 0; } bool ECBackend::can_handle_while_inactive( OpRequestRef _op) { return false; } bool ECBackend::_handle_message( OpRequestRef _op) { dout(10) << __func__ << ": " << *_op->get_req() << dendl; int priority = _op->get_req()->get_priority(); switch (_op->get_req()->get_type()) { case MSG_OSD_EC_WRITE: { // NOTE: this is non-const because handle_sub_write modifies the embedded // ObjectStore::Transaction in place (and then std::move's it). It does // not conflict with ECSubWrite's operator<<. MOSDECSubOpWrite *op = static_cast( _op->get_nonconst_req()); handle_sub_write(op->op.from, _op, op->op, _op->pg_trace); return true; } case MSG_OSD_EC_WRITE_REPLY: { const MOSDECSubOpWriteReply *op = static_cast( _op->get_req()); handle_sub_write_reply(op->op.from, op->op, _op->pg_trace); return true; } case MSG_OSD_EC_READ: { const MOSDECSubOpRead *op = static_cast(_op->get_req()); MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply; reply->pgid = get_parent()->primary_spg_t(); reply->map_epoch = get_parent()->get_epoch(); reply->min_epoch = get_parent()->get_interval_start_epoch(); handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace); reply->trace = _op->pg_trace; get_parent()->send_message_osd_cluster( op->op.from.osd, reply, get_parent()->get_epoch()); return true; } case MSG_OSD_EC_READ_REPLY: { // NOTE: this is non-const because handle_sub_read_reply steals resulting // buffers. It does not conflict with ECSubReadReply operator<<. MOSDECSubOpReadReply *op = static_cast( _op->get_nonconst_req()); RecoveryMessages rm; handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace); dispatch_recovery_messages(rm, priority); return true; } case MSG_OSD_PG_PUSH: { const MOSDPGPush *op = static_cast(_op->get_req()); RecoveryMessages rm; for (vector::const_iterator i = op->pushes.begin(); i != op->pushes.end(); ++i) { handle_recovery_push(*i, &rm); } dispatch_recovery_messages(rm, priority); return true; } case MSG_OSD_PG_PUSH_REPLY: { const MOSDPGPushReply *op = static_cast( _op->get_req()); RecoveryMessages rm; for (vector::const_iterator i = op->replies.begin(); i != op->replies.end(); ++i) { handle_recovery_push_reply(*i, op->from, &rm); } dispatch_recovery_messages(rm, priority); return true; } default: return false; } return false; } struct SubWriteCommitted : public Context { ECBackend *pg; OpRequestRef msg; ceph_tid_t tid; eversion_t version; eversion_t last_complete; const ZTracer::Trace trace; SubWriteCommitted( ECBackend *pg, OpRequestRef msg, ceph_tid_t tid, eversion_t version, eversion_t last_complete, const ZTracer::Trace &trace) : pg(pg), msg(msg), tid(tid), version(version), last_complete(last_complete), trace(trace) {} void finish(int) override { if (msg) msg->mark_event("sub_op_committed"); pg->sub_write_committed(tid, version, last_complete, trace); } }; void ECBackend::sub_write_committed( ceph_tid_t tid, eversion_t version, eversion_t last_complete, const ZTracer::Trace &trace) { if (get_parent()->pgb_is_primary()) { ECSubWriteReply reply; reply.tid = tid; reply.last_complete = last_complete; reply.committed = true; reply.from = get_parent()->whoami_shard(); handle_sub_write_reply( get_parent()->whoami_shard(), reply, trace); } else { get_parent()->update_last_complete_ondisk(last_complete); MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply; r->pgid = get_parent()->primary_spg_t(); r->map_epoch = get_parent()->get_epoch(); r->min_epoch = get_parent()->get_interval_start_epoch(); r->op.tid = tid; r->op.last_complete = last_complete; r->op.committed = true; r->op.from = get_parent()->whoami_shard(); r->set_priority(CEPH_MSG_PRIO_HIGH); r->trace = trace; r->trace.event("sending sub op commit"); get_parent()->send_message_osd_cluster( get_parent()->primary_shard().osd, r, get_parent()->get_epoch()); } } struct SubWriteApplied : public Context { ECBackend *pg; OpRequestRef msg; ceph_tid_t tid; eversion_t version; const ZTracer::Trace trace; SubWriteApplied( ECBackend *pg, OpRequestRef msg, ceph_tid_t tid, eversion_t version, const ZTracer::Trace &trace) : pg(pg), msg(msg), tid(tid), version(version), trace(trace) {} void finish(int) override { if (msg) msg->mark_event("sub_op_applied"); pg->sub_write_applied(tid, version, trace); } }; void ECBackend::sub_write_applied( ceph_tid_t tid, eversion_t version, const ZTracer::Trace &trace) { parent->op_applied(version); if (get_parent()->pgb_is_primary()) { ECSubWriteReply reply; reply.from = get_parent()->whoami_shard(); reply.tid = tid; reply.applied = true; handle_sub_write_reply( get_parent()->whoami_shard(), reply, trace); } else { MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply; r->pgid = get_parent()->primary_spg_t(); r->map_epoch = get_parent()->get_epoch(); r->min_epoch = get_parent()->get_interval_start_epoch(); r->op.from = get_parent()->whoami_shard(); r->op.tid = tid; r->op.applied = true; r->set_priority(CEPH_MSG_PRIO_HIGH); r->trace = trace; r->trace.event("sending sub op apply"); get_parent()->send_message_osd_cluster( get_parent()->primary_shard().osd, r, get_parent()->get_epoch()); } } void ECBackend::handle_sub_write( pg_shard_t from, OpRequestRef msg, ECSubWrite &op, const ZTracer::Trace &trace, Context *on_local_applied_sync) { if (msg) msg->mark_started(); trace.event("handle_sub_write"); assert(!get_parent()->get_log().get_missing().is_missing(op.soid)); if (!get_parent()->pgb_is_primary()) get_parent()->update_stats(op.stats); ObjectStore::Transaction localt; if (!op.temp_added.empty()) { add_temp_objs(op.temp_added); } if (op.backfill) { for (set::iterator i = op.temp_removed.begin(); i != op.temp_removed.end(); ++i) { dout(10) << __func__ << ": removing object " << *i << " since we won't get the transaction" << dendl; localt.remove( coll, ghobject_t( *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); } } clear_temp_objs(op.temp_removed); get_parent()->log_operation( op.log_entries, op.updated_hit_set_history, op.trim_to, op.roll_forward_to, !op.backfill, localt); PrimaryLogPG *_rPG = dynamic_cast(get_parent()); if (_rPG && !_rPG->is_undersized() && (unsigned)get_parent()->whoami_shard().shard >= ec_impl->get_data_chunk_count()) op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); if (on_local_applied_sync) { dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl; localt.register_on_applied_sync(on_local_applied_sync); } localt.register_on_commit( get_parent()->bless_context( new SubWriteCommitted( this, msg, op.tid, op.at_version, get_parent()->get_info().last_complete, trace))); localt.register_on_applied( get_parent()->bless_context( new SubWriteApplied(this, msg, op.tid, op.at_version, trace))); vector tls; tls.reserve(2); tls.push_back(std::move(op.t)); tls.push_back(std::move(localt)); get_parent()->queue_transactions(tls, msg); } void ECBackend::handle_sub_read( pg_shard_t from, const ECSubRead &op, ECSubReadReply *reply, const ZTracer::Trace &trace) { trace.event("handle sub read"); shard_id_t shard = get_parent()->whoami_shard().shard; for(auto i = op.to_read.begin(); i != op.to_read.end(); ++i) { int r = 0; ECUtil::HashInfoRef hinfo; if (!get_parent()->get_pool().allows_ecoverwrites()) { hinfo = get_hash_info(i->first); if (!hinfo) { r = -EIO; get_parent()->clog_error() << "Corruption detected: object " << i->first << " is missing hash_info"; dout(5) << __func__ << ": No hinfo for " << i->first << dendl; goto error; } } for (auto j = i->second.begin(); j != i->second.end(); ++j) { bufferlist bl; r = store->read( ch, ghobject_t(i->first, ghobject_t::NO_GEN, shard), j->get<0>(), j->get<1>(), bl, j->get<2>()); if (r < 0) { get_parent()->clog_error() << "Error " << r << " reading object " << i->first; dout(5) << __func__ << ": Error " << r << " reading " << i->first << dendl; goto error; } else { dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl; reply->buffers_read[i->first].push_back( make_pair( j->get<0>(), bl) ); } if (!get_parent()->get_pool().allows_ecoverwrites()) { // This shows that we still need deep scrub because large enough files // are read in sections, so the digest check here won't be done here. // Do NOT check osd_read_eio_on_bad_digest here. We need to report // the state of our chunk in case other chunks could substitute. assert(hinfo->has_chunk_hash()); if ((bl.length() == hinfo->get_total_chunk_size()) && (j->get<0>() == 0)) { dout(20) << __func__ << ": Checking hash of " << i->first << dendl; bufferhash h(-1); h << bl; if (h.digest() != hinfo->get_chunk_hash(shard)) { get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x" << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec; dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x" << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl; r = -EIO; goto error; } } } } continue; error: // Do NOT check osd_read_eio_on_bad_digest here. We need to report // the state of our chunk in case other chunks could substitute. reply->buffers_read.erase(i->first); reply->errors[i->first] = r; } for (set::iterator i = op.attrs_to_read.begin(); i != op.attrs_to_read.end(); ++i) { dout(10) << __func__ << ": fulfilling attr request on " << *i << dendl; if (reply->errors.count(*i)) continue; int r = store->getattrs( ch, ghobject_t( *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), reply->attrs_read[*i]); if (r < 0) { reply->buffers_read.erase(*i); reply->errors[*i] = r; } } reply->from = get_parent()->whoami_shard(); reply->tid = op.tid; } void ECBackend::handle_sub_write_reply( pg_shard_t from, const ECSubWriteReply &op, const ZTracer::Trace &trace) { map::iterator i = tid_to_op_map.find(op.tid); assert(i != tid_to_op_map.end()); if (op.committed) { trace.event("sub write committed"); assert(i->second.pending_commit.count(from)); i->second.pending_commit.erase(from); if (from != get_parent()->whoami_shard()) { get_parent()->update_peer_last_complete_ondisk(from, op.last_complete); } } if (op.applied) { trace.event("sub write applied"); assert(i->second.pending_apply.count(from)); i->second.pending_apply.erase(from); } if (i->second.pending_apply.empty() && i->second.on_all_applied) { dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl; i->second.on_all_applied->complete(0); i->second.on_all_applied = 0; i->second.trace.event("ec write all applied"); } if (i->second.pending_commit.empty() && i->second.on_all_commit) { dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl; i->second.on_all_commit->complete(0); i->second.on_all_commit = 0; i->second.trace.event("ec write all committed"); } check_ops(); } void ECBackend::handle_sub_read_reply( pg_shard_t from, ECSubReadReply &op, RecoveryMessages *m, const ZTracer::Trace &trace) { trace.event("ec sub read reply"); dout(10) << __func__ << ": reply " << op << dendl; map::iterator iter = tid_to_read_map.find(op.tid); if (iter == tid_to_read_map.end()) { //canceled dout(20) << __func__ << ": dropped " << op << dendl; return; } ReadOp &rop = iter->second; for (auto i = op.buffers_read.begin(); i != op.buffers_read.end(); ++i) { assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer if (!rop.to_read.count(i->first)) { // We canceled this read! @see filter_read_op dout(20) << __func__ << " to_read skipping" << dendl; continue; } list >::const_iterator req_iter = rop.to_read.find(i->first)->second.to_read.begin(); list< boost::tuple< uint64_t, uint64_t, map > >::iterator riter = rop.complete[i->first].returned.begin(); for (list >::iterator j = i->second.begin(); j != i->second.end(); ++j, ++req_iter, ++riter) { assert(req_iter != rop.to_read.find(i->first)->second.to_read.end()); assert(riter != rop.complete[i->first].returned.end()); pair adjusted = sinfo.aligned_offset_len_to_chunk( make_pair(req_iter->get<0>(), req_iter->get<1>())); assert(adjusted.first == j->first); riter->get<2>()[from].claim(j->second); } } for (auto i = op.attrs_read.begin(); i != op.attrs_read.end(); ++i) { assert(!op.errors.count(i->first)); // if read error better not have sent an attribute if (!rop.to_read.count(i->first)) { // We canceled this read! @see filter_read_op dout(20) << __func__ << " to_read skipping" << dendl; continue; } rop.complete[i->first].attrs = map(); (*(rop.complete[i->first].attrs)).swap(i->second); } for (auto i = op.errors.begin(); i != op.errors.end(); ++i) { rop.complete[i->first].errors.insert( make_pair( from, i->second)); dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl; } map >::iterator siter = shard_to_read_map.find(from); assert(siter != shard_to_read_map.end()); assert(siter->second.count(op.tid)); siter->second.erase(op.tid); assert(rop.in_progress.count(from)); rop.in_progress.erase(from); unsigned is_complete = 0; // For redundant reads check for completion as each shard comes in, // or in a non-recovery read check for completion once all the shards read. // TODO: It would be nice if recovery could send more reads too if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) { for (map::const_iterator iter = rop.complete.begin(); iter != rop.complete.end(); ++iter) { set have; for (map::const_iterator j = iter->second.returned.front().get<2>().begin(); j != iter->second.returned.front().get<2>().end(); ++j) { have.insert(j->first.shard); dout(20) << __func__ << " have shard=" << j->first.shard << dendl; } set want_to_read, dummy_minimum; get_want_to_read_shards(&want_to_read); int err; if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) { dout(20) << __func__ << " minimum_to_decode failed" << dendl; if (rop.in_progress.empty()) { // If we don't have enough copies and we haven't sent reads for all shards // we can send the rest of the reads, if any. if (!rop.do_redundant_reads) { int r = send_all_remaining_reads(iter->first, rop); if (r == 0) { // We added to in_progress and not incrementing is_complete continue; } // Couldn't read any additional shards so handle as completed with errors } // We don't want to confuse clients / RBD with objectstore error // values in particular ENOENT. We may have different error returns // from different shards, so we'll return minimum_to_decode() error // (usually EIO) to reader. It is likely an error here is due to a // damaged pg. rop.complete[iter->first].r = err; ++is_complete; } } else { assert(rop.complete[iter->first].r == 0); if (!rop.complete[iter->first].errors.empty()) { if (cct->_conf->osd_read_ec_check_for_errors) { dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl; err = rop.complete[iter->first].errors.begin()->second; rop.complete[iter->first].r = err; } else { get_parent()->clog_warn() << "Error(s) ignored for " << iter->first << " enough copies available"; dout(10) << __func__ << " Error(s) ignored for " << iter->first << " enough copies available" << dendl; rop.complete[iter->first].errors.clear(); } } ++is_complete; } } } if (rop.in_progress.empty() || is_complete == rop.complete.size()) { dout(20) << __func__ << " Complete: " << rop << dendl; rop.trace.event("ec read complete"); complete_read_op(rop, m); } else { dout(10) << __func__ << " readop not complete: " << rop << dendl; } } void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m) { map::iterator reqiter = rop.to_read.begin(); map::iterator resiter = rop.complete.begin(); assert(rop.to_read.size() == rop.complete.size()); for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) { if (reqiter->second.cb) { pair arg( m, resiter->second); reqiter->second.cb->complete(arg); reqiter->second.cb = NULL; } } tid_to_read_map.erase(rop.tid); } struct FinishReadOp : public GenContext { ECBackend *ec; ceph_tid_t tid; FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {} void finish(ThreadPool::TPHandle &handle) override { auto ropiter = ec->tid_to_read_map.find(tid); assert(ropiter != ec->tid_to_read_map.end()); int priority = ropiter->second.priority; RecoveryMessages rm; ec->complete_read_op(ropiter->second, &rm); ec->dispatch_recovery_messages(rm, priority); } }; void ECBackend::filter_read_op( const OSDMapRef& osdmap, ReadOp &op) { set to_cancel; for (map >::iterator i = op.source_to_obj.begin(); i != op.source_to_obj.end(); ++i) { if (osdmap->is_down(i->first.osd)) { to_cancel.insert(i->second.begin(), i->second.end()); op.in_progress.erase(i->first); continue; } } if (to_cancel.empty()) return; for (map >::iterator i = op.source_to_obj.begin(); i != op.source_to_obj.end(); ) { for (set::iterator j = i->second.begin(); j != i->second.end(); ) { if (to_cancel.count(*j)) i->second.erase(j++); else ++j; } if (i->second.empty()) { op.source_to_obj.erase(i++); } else { assert(!osdmap->is_down(i->first.osd)); ++i; } } for (set::iterator i = to_cancel.begin(); i != to_cancel.end(); ++i) { get_parent()->cancel_pull(*i); assert(op.to_read.count(*i)); read_request_t &req = op.to_read.find(*i)->second; dout(10) << __func__ << ": canceling " << req << " for obj " << *i << dendl; assert(req.cb); delete req.cb; req.cb = NULL; op.to_read.erase(*i); op.complete.erase(*i); recovery_ops.erase(*i); } if (op.in_progress.empty()) { get_parent()->schedule_recovery_work( get_parent()->bless_gencontext( new FinishReadOp(this, op.tid))); } } void ECBackend::check_recovery_sources(const OSDMapRef& osdmap) { set tids_to_filter; for (map >::iterator i = shard_to_read_map.begin(); i != shard_to_read_map.end(); ) { if (osdmap->is_down(i->first.osd)) { tids_to_filter.insert(i->second.begin(), i->second.end()); shard_to_read_map.erase(i++); } else { ++i; } } for (set::iterator i = tids_to_filter.begin(); i != tids_to_filter.end(); ++i) { map::iterator j = tid_to_read_map.find(*i); assert(j != tid_to_read_map.end()); filter_read_op(osdmap, j->second); } } void ECBackend::on_change() { dout(10) << __func__ << dendl; completed_to = eversion_t(); committed_to = eversion_t(); pipeline_state.clear(); waiting_reads.clear(); waiting_state.clear(); waiting_commit.clear(); for (auto &&op: tid_to_op_map) { cache.release_write_pin(op.second.pin); } tid_to_op_map.clear(); for (map::iterator i = tid_to_read_map.begin(); i != tid_to_read_map.end(); ++i) { dout(10) << __func__ << ": cancelling " << i->second << dendl; for (map::iterator j = i->second.to_read.begin(); j != i->second.to_read.end(); ++j) { delete j->second.cb; j->second.cb = 0; } } tid_to_read_map.clear(); in_progress_client_reads.clear(); shard_to_read_map.clear(); clear_recovery_state(); } void ECBackend::clear_recovery_state() { recovery_ops.clear(); } void ECBackend::on_flushed() { } void ECBackend::dump_recovery_info(Formatter *f) const { f->open_array_section("recovery_ops"); for (map::const_iterator i = recovery_ops.begin(); i != recovery_ops.end(); ++i) { f->open_object_section("op"); i->second.dump(f); f->close_section(); } f->close_section(); f->open_array_section("read_ops"); for (map::const_iterator i = tid_to_read_map.begin(); i != tid_to_read_map.end(); ++i) { f->open_object_section("read_op"); i->second.dump(f); f->close_section(); } f->close_section(); } void ECBackend::submit_transaction( const hobject_t &hoid, const object_stat_sum_t &delta_stats, const eversion_t &at_version, PGTransactionUPtr &&t, const eversion_t &trim_to, const eversion_t &roll_forward_to, const vector &log_entries, boost::optional &hset_history, Context *on_local_applied_sync, Context *on_all_applied, Context *on_all_commit, ceph_tid_t tid, osd_reqid_t reqid, OpRequestRef client_op ) { assert(!tid_to_op_map.count(tid)); Op *op = &(tid_to_op_map[tid]); op->hoid = hoid; op->delta_stats = delta_stats; op->version = at_version; op->trim_to = trim_to; op->roll_forward_to = MAX(roll_forward_to, committed_to); op->log_entries = log_entries; std::swap(op->updated_hit_set_history, hset_history); op->on_local_applied_sync = on_local_applied_sync; op->on_all_applied = on_all_applied; op->on_all_commit = on_all_commit; op->tid = tid; op->reqid = reqid; op->client_op = client_op; if (client_op) op->trace = client_op->pg_trace; dout(10) << __func__ << ": op " << *op << " starting" << dendl; start_rmw(op, std::move(t)); dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl; } void ECBackend::call_write_ordered(std::function &&cb) { if (!waiting_state.empty()) { waiting_state.back().on_write.emplace_back(std::move(cb)); } else if (!waiting_reads.empty()) { waiting_reads.back().on_write.emplace_back(std::move(cb)); } else { // Nothing earlier in the pipeline, just call it cb(); } } int ECBackend::get_min_avail_to_read_shards( const hobject_t &hoid, const set &want, bool for_recovery, bool do_redundant_reads, set *to_read) { // Make sure we don't do redundant reads for recovery assert(!for_recovery || !do_redundant_reads); set have; map shards; for (set::const_iterator i = get_parent()->get_acting_shards().begin(); i != get_parent()->get_acting_shards().end(); ++i) { dout(10) << __func__ << ": checking acting " << *i << dendl; const pg_missing_t &missing = get_parent()->get_shard_missing(*i); if (!missing.is_missing(hoid)) { assert(!have.count(i->shard)); have.insert(i->shard); assert(!shards.count(i->shard)); shards.insert(make_pair(i->shard, *i)); } } if (for_recovery) { for (set::const_iterator i = get_parent()->get_backfill_shards().begin(); i != get_parent()->get_backfill_shards().end(); ++i) { if (have.count(i->shard)) { assert(shards.count(i->shard)); continue; } dout(10) << __func__ << ": checking backfill " << *i << dendl; assert(!shards.count(i->shard)); const pg_info_t &info = get_parent()->get_shard_info(*i); const pg_missing_t &missing = get_parent()->get_shard_missing(*i); if (hoid < info.last_backfill && !missing.is_missing(hoid)) { have.insert(i->shard); shards.insert(make_pair(i->shard, *i)); } } map>::const_iterator miter = get_parent()->get_missing_loc_shards().find(hoid); if (miter != get_parent()->get_missing_loc_shards().end()) { for (set::iterator i = miter->second.begin(); i != miter->second.end(); ++i) { dout(10) << __func__ << ": checking missing_loc " << *i << dendl; auto m = get_parent()->maybe_get_shard_missing(*i); if (m) { assert(!(*m).is_missing(hoid)); } have.insert(i->shard); shards.insert(make_pair(i->shard, *i)); } } } set need; int r = ec_impl->minimum_to_decode(want, have, &need); if (r < 0) return r; if (do_redundant_reads) { need.swap(have); } if (!to_read) return 0; for (set::iterator i = need.begin(); i != need.end(); ++i) { assert(shards.count(shard_id_t(*i))); to_read->insert(shards[shard_id_t(*i)]); } return 0; } int ECBackend::get_remaining_shards( const hobject_t &hoid, const set &avail, set *to_read) { set need; map shards; for (set::const_iterator i = get_parent()->get_acting_shards().begin(); i != get_parent()->get_acting_shards().end(); ++i) { dout(10) << __func__ << ": checking acting " << *i << dendl; const pg_missing_t &missing = get_parent()->get_shard_missing(*i); if (!missing.is_missing(hoid)) { assert(!need.count(i->shard)); need.insert(i->shard); assert(!shards.count(i->shard)); shards.insert(make_pair(i->shard, *i)); } } if (!to_read) return 0; for (set::iterator i = need.begin(); i != need.end(); ++i) { assert(shards.count(shard_id_t(*i))); if (avail.find(*i) == avail.end()) to_read->insert(shards[shard_id_t(*i)]); } return 0; } void ECBackend::start_read_op( int priority, map &to_read, OpRequestRef _op, bool do_redundant_reads, bool for_recovery) { ceph_tid_t tid = get_parent()->get_tid(); assert(!tid_to_read_map.count(tid)); auto &op = tid_to_read_map.emplace( tid, ReadOp( priority, tid, do_redundant_reads, for_recovery, _op, std::move(to_read))).first->second; dout(10) << __func__ << ": starting " << op << dendl; if (_op) { op.trace = _op->pg_trace; op.trace.event("start ec read"); } do_read_op(op); } void ECBackend::do_read_op(ReadOp &op) { int priority = op.priority; ceph_tid_t tid = op.tid; dout(10) << __func__ << ": starting read " << op << dendl; map messages; for (map::iterator i = op.to_read.begin(); i != op.to_read.end(); ++i) { bool need_attrs = i->second.want_attrs; for (set::const_iterator j = i->second.need.begin(); j != i->second.need.end(); ++j) { if (need_attrs) { messages[*j].attrs_to_read.insert(i->first); need_attrs = false; } op.obj_to_source[i->first].insert(*j); op.source_to_obj[*j].insert(i->first); } for (list >::const_iterator j = i->second.to_read.begin(); j != i->second.to_read.end(); ++j) { pair chunk_off_len = sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>())); for (set::const_iterator k = i->second.need.begin(); k != i->second.need.end(); ++k) { messages[*k].to_read[i->first].push_back( boost::make_tuple( chunk_off_len.first, chunk_off_len.second, j->get<2>())); } assert(!need_attrs); } } for (map::iterator i = messages.begin(); i != messages.end(); ++i) { op.in_progress.insert(i->first); shard_to_read_map[i->first].insert(op.tid); i->second.tid = tid; MOSDECSubOpRead *msg = new MOSDECSubOpRead; msg->set_priority(priority); msg->pgid = spg_t( get_parent()->whoami_spg_t().pgid, i->first.shard); msg->map_epoch = get_parent()->get_epoch(); msg->min_epoch = get_parent()->get_interval_start_epoch(); msg->op = i->second; msg->op.from = get_parent()->whoami_shard(); msg->op.tid = tid; if (op.trace) { // initialize a child span for this shard msg->trace.init("ec sub read", nullptr, &op.trace); msg->trace.keyval("shard", i->first.shard.id); } get_parent()->send_message_osd_cluster( i->first.osd, msg, get_parent()->get_epoch()); } dout(10) << __func__ << ": started " << op << dendl; } ECUtil::HashInfoRef ECBackend::get_hash_info( const hobject_t &hoid, bool checks, const map *attrs) { dout(10) << __func__ << ": Getting attr on " << hoid << dendl; ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid); if (!ref) { dout(10) << __func__ << ": not in cache " << hoid << dendl; struct stat st; int r = store->stat( ch, ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), &st); ECUtil::HashInfo hinfo(ec_impl->get_chunk_count()); // XXX: What does it mean if there is no object on disk? if (r >= 0) { dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl; bufferlist bl; if (attrs) { map::const_iterator k = attrs->find(ECUtil::get_hinfo_key()); if (k == attrs->end()) { dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl; } else { bl.push_back(k->second); } } else { r = store->getattr( ch, ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), ECUtil::get_hinfo_key(), bl); if (r < 0) { dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl; bl.clear(); // just in case } } if (bl.length() > 0) { bufferlist::iterator bp = bl.begin(); ::decode(hinfo, bp); if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) { dout(0) << __func__ << ": Mismatch of total_chunk_size " << hinfo.get_total_chunk_size() << dendl; return ECUtil::HashInfoRef(); } } else if (st.st_size > 0) { // If empty object and no hinfo, create it return ECUtil::HashInfoRef(); } } ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo); } return ref; } void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t) { assert(op); op->plan = ECTransaction::get_write_plan( sinfo, std::move(t), [&](const hobject_t &i) { ECUtil::HashInfoRef ref = get_hash_info(i, false); if (!ref) { derr << __func__ << ": get_hash_info(" << i << ")" << " returned a null pointer and there is no " << " way to recover from such an error in this " << " context" << dendl; ceph_abort(); } return ref; }, get_parent()->get_dpp()); dout(10) << __func__ << ": " << *op << dendl; waiting_state.push_back(*op); check_ops(); } bool ECBackend::try_state_to_reads() { if (waiting_state.empty()) return false; Op *op = &(waiting_state.front()); if (op->requires_rmw() && pipeline_state.cache_invalid()) { assert(get_parent()->get_pool().allows_ecoverwrites()); dout(20) << __func__ << ": blocking " << *op << " because it requires an rmw and the cache is invalid " << pipeline_state << dendl; return false; } if (op->invalidates_cache()) { dout(20) << __func__ << ": invalidating cache after this op" << dendl; pipeline_state.invalidate(); op->using_cache = false; } else { op->using_cache = pipeline_state.caching_enabled(); } waiting_state.pop_front(); waiting_reads.push_back(*op); if (op->using_cache) { cache.open_write_pin(op->pin); extent_set empty; for (auto &&hpair: op->plan.will_write) { auto to_read_plan_iter = op->plan.to_read.find(hpair.first); const extent_set &to_read_plan = to_read_plan_iter == op->plan.to_read.end() ? empty : to_read_plan_iter->second; extent_set remote_read = cache.reserve_extents_for_rmw( hpair.first, op->pin, hpair.second, to_read_plan); extent_set pending_read = to_read_plan; pending_read.subtract(remote_read); if (!remote_read.empty()) { op->remote_read[hpair.first] = std::move(remote_read); } if (!pending_read.empty()) { op->pending_read[hpair.first] = std::move(pending_read); } } } else { op->remote_read = op->plan.to_read; } dout(10) << __func__ << ": " << *op << dendl; if (!op->remote_read.empty()) { assert(get_parent()->get_pool().allows_ecoverwrites()); objects_read_async_no_cache( op->remote_read, [this, op](map > &&results) { for (auto &&i: results) { op->remote_read_result.emplace(i.first, i.second.second); } check_ops(); }); } return true; } bool ECBackend::try_reads_to_commit() { if (waiting_reads.empty()) return false; Op *op = &(waiting_reads.front()); if (op->read_in_progress()) return false; waiting_reads.pop_front(); waiting_commit.push_back(*op); dout(10) << __func__ << ": starting commit on " << *op << dendl; dout(20) << __func__ << ": " << cache << dendl; get_parent()->apply_stats( op->hoid, op->delta_stats); if (op->using_cache) { for (auto &&hpair: op->pending_read) { op->remote_read_result[hpair.first].insert( cache.get_remaining_extents_for_rmw( hpair.first, op->pin, hpair.second)); } op->pending_read.clear(); } else { assert(op->pending_read.empty()); } map trans; for (set::const_iterator i = get_parent()->get_actingbackfill_shards().begin(); i != get_parent()->get_actingbackfill_shards().end(); ++i) { trans[i->shard]; } op->trace.event("start ec write"); map written; if (op->plan.t) { ECTransaction::generate_transactions( op->plan, ec_impl, get_parent()->get_info().pgid.pgid, (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN), sinfo, op->remote_read_result, op->log_entries, &written, &trans, &(op->temp_added), &(op->temp_cleared), get_parent()->get_dpp()); } dout(20) << __func__ << ": " << cache << dendl; dout(20) << __func__ << ": written: " << written << dendl; dout(20) << __func__ << ": op: " << *op << dendl; if (!get_parent()->get_pool().allows_ecoverwrites()) { for (auto &&i: op->log_entries) { if (i.requires_kraken()) { derr << __func__ << ": log entry " << i << " requires kraken" << " but overwrites are not enabled!" << dendl; ceph_abort(); } } } map written_set; for (auto &&i: written) { written_set[i.first] = i.second.get_interval_set(); } dout(20) << __func__ << ": written_set: " << written_set << dendl; assert(written_set == op->plan.will_write); if (op->using_cache) { for (auto &&hpair: written) { dout(20) << __func__ << ": " << hpair << dendl; cache.present_rmw_update(hpair.first, op->pin, hpair.second); } } op->remote_read.clear(); op->remote_read_result.clear(); dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl; ObjectStore::Transaction empty; bool should_write_local = false; ECSubWrite local_write_op; for (set::const_iterator i = get_parent()->get_actingbackfill_shards().begin(); i != get_parent()->get_actingbackfill_shards().end(); ++i) { op->pending_apply.insert(*i); op->pending_commit.insert(*i); map::iterator iter = trans.find(i->shard); assert(iter != trans.end()); bool should_send = get_parent()->should_send_op(*i, op->hoid); const pg_stat_t &stats = should_send ? get_info().stats : parent->get_shard_info().find(*i)->second.stats; ECSubWrite sop( get_parent()->whoami_shard(), op->tid, op->reqid, op->hoid, stats, should_send ? iter->second : empty, op->version, op->trim_to, op->roll_forward_to, op->log_entries, op->updated_hit_set_history, op->temp_added, op->temp_cleared, !should_send); ZTracer::Trace trace; if (op->trace) { // initialize a child span for this shard trace.init("ec sub write", nullptr, &op->trace); trace.keyval("shard", i->shard.id); } if (*i == get_parent()->whoami_shard()) { should_write_local = true; local_write_op.claim(sop); } else { MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop); r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard); r->map_epoch = get_parent()->get_epoch(); r->min_epoch = get_parent()->get_interval_start_epoch(); r->trace = trace; get_parent()->send_message_osd_cluster( i->osd, r, get_parent()->get_epoch()); } } if (should_write_local) { handle_sub_write( get_parent()->whoami_shard(), op->client_op, local_write_op, op->trace, op->on_local_applied_sync); op->on_local_applied_sync = 0; } for (auto i = op->on_write.begin(); i != op->on_write.end(); op->on_write.erase(i++)) { (*i)(); } return true; } bool ECBackend::try_finish_rmw() { if (waiting_commit.empty()) return false; Op *op = &(waiting_commit.front()); if (op->write_in_progress()) return false; waiting_commit.pop_front(); dout(10) << __func__ << ": " << *op << dendl; dout(20) << __func__ << ": " << cache << dendl; if (op->roll_forward_to > completed_to) completed_to = op->roll_forward_to; if (op->version > committed_to) committed_to = op->version; if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) { if (op->version > get_parent()->get_log().get_can_rollback_to() && waiting_reads.empty() && waiting_commit.empty()) { // submit a dummy transaction to kick the rollforward auto tid = get_parent()->get_tid(); Op *nop = &(tid_to_op_map[tid]); nop->hoid = op->hoid; nop->trim_to = op->trim_to; nop->roll_forward_to = op->version; nop->tid = tid; nop->reqid = op->reqid; waiting_reads.push_back(*nop); } } if (op->using_cache) { cache.release_write_pin(op->pin); } tid_to_op_map.erase(op->tid); if (waiting_reads.empty() && waiting_commit.empty()) { pipeline_state.clear(); dout(20) << __func__ << ": clearing pipeline_state " << pipeline_state << dendl; } return true; } void ECBackend::check_ops() { while (try_state_to_reads() || try_reads_to_commit() || try_finish_rmw()); } int ECBackend::objects_read_sync( const hobject_t &hoid, uint64_t off, uint64_t len, uint32_t op_flags, bufferlist *bl) { return -EOPNOTSUPP; } void ECBackend::objects_read_async( const hobject_t &hoid, const list, pair > > &to_read, Context *on_complete, bool fast_read) { map > > reads; uint32_t flags = 0; extent_set es; for (list, pair > >::const_iterator i = to_read.begin(); i != to_read.end(); ++i) { pair tmp = sinfo.offset_len_to_stripe_bounds( make_pair(i->first.get<0>(), i->first.get<1>())); extent_set esnew; esnew.insert(tmp.first, tmp.second); es.union_of(esnew); flags |= i->first.get<2>(); } if (!es.empty()) { auto &offsets = reads[hoid]; for (auto j = es.begin(); j != es.end(); ++j) { offsets.push_back( boost::make_tuple( j.get_start(), j.get_len(), flags)); } } struct cb { ECBackend *ec; hobject_t hoid; list, pair > > to_read; unique_ptr on_complete; cb(const cb&) = delete; cb(cb &&) = default; cb(ECBackend *ec, const hobject_t &hoid, const list, pair > > &to_read, Context *on_complete) : ec(ec), hoid(hoid), to_read(to_read), on_complete(on_complete) {} void operator()(map > &&results) { auto dpp = ec->get_parent()->get_dpp(); ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results << dendl; ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache << dendl; auto &got = results[hoid]; int r = 0; for (auto &&read: to_read) { if (got.first < 0) { if (read.second.second) { read.second.second->complete(got.first); } if (r == 0) r = got.first; } else { assert(read.second.first); uint64_t offset = read.first.get<0>(); uint64_t length = read.first.get<1>(); auto range = got.second.get_containing_range(offset, length); assert(range.first != range.second); assert(range.first.get_off() <= offset); assert( (offset + length) <= (range.first.get_off() + range.first.get_len())); read.second.first->substr_of( range.first.get_val(), offset - range.first.get_off(), length); if (read.second.second) { read.second.second->complete(length); read.second.second = nullptr; } } } to_read.clear(); if (on_complete) { on_complete.release()->complete(r); } } ~cb() { for (auto &&i: to_read) { delete i.second.second; } to_read.clear(); } }; objects_read_and_reconstruct( reads, fast_read, make_gen_lambda_context< map > &&, cb>( cb(this, hoid, to_read, on_complete))); } struct CallClientContexts : public GenContext &> { hobject_t hoid; ECBackend *ec; ECBackend::ClientAsyncReadStatus *status; list > to_read; CallClientContexts( hobject_t hoid, ECBackend *ec, ECBackend::ClientAsyncReadStatus *status, const list > &to_read) : hoid(hoid), ec(ec), status(status), to_read(to_read) {} void finish(pair &in) override { ECBackend::read_result_t &res = in.second; extent_map result; if (res.r != 0) goto out; assert(res.returned.size() == to_read.size()); assert(res.r == 0); assert(res.errors.empty()); for (auto &&read: to_read) { pair adjusted = ec->sinfo.offset_len_to_stripe_bounds( make_pair(read.get<0>(), read.get<1>())); assert(res.returned.front().get<0>() == adjusted.first && res.returned.front().get<1>() == adjusted.second); map to_decode; bufferlist bl; for (map::iterator j = res.returned.front().get<2>().begin(); j != res.returned.front().get<2>().end(); ++j) { to_decode[j->first.shard].claim(j->second); } int r = ECUtil::decode( ec->sinfo, ec->ec_impl, to_decode, &bl); if (r < 0) { res.r = r; goto out; } bufferlist trimmed; trimmed.substr_of( bl, read.get<0>() - adjusted.first, MIN(read.get<1>(), bl.length() - (read.get<0>() - adjusted.first))); result.insert( read.get<0>(), trimmed.length(), std::move(trimmed)); res.returned.pop_front(); } out: status->complete_object(hoid, res.r, std::move(result)); ec->kick_reads(); } }; void ECBackend::objects_read_and_reconstruct( const map > > &reads, bool fast_read, GenContextURef > &&> &&func) { in_progress_client_reads.emplace_back( reads.size(), std::move(func)); if (!reads.size()) { kick_reads(); return; } set want_to_read; get_want_to_read_shards(&want_to_read); map for_read_op; for (auto &&to_read: reads) { set shards; int r = get_min_avail_to_read_shards( to_read.first, want_to_read, false, fast_read, &shards); assert(r == 0); CallClientContexts *c = new CallClientContexts( to_read.first, this, &(in_progress_client_reads.back()), to_read.second); for_read_op.insert( make_pair( to_read.first, read_request_t( to_read.second, shards, false, c))); } start_read_op( CEPH_MSG_PRIO_DEFAULT, for_read_op, OpRequestRef(), fast_read, false); return; } int ECBackend::send_all_remaining_reads( const hobject_t &hoid, ReadOp &rop) { set already_read; const set& ots = rop.obj_to_source[hoid]; for (set::iterator i = ots.begin(); i != ots.end(); ++i) already_read.insert(i->shard); dout(10) << __func__ << " have/error shards=" << already_read << dendl; set shards; int r = get_remaining_shards(hoid, already_read, &shards); if (r) return r; if (shards.empty()) return -EIO; dout(10) << __func__ << " Read remaining shards " << shards << dendl; // TODOSAM: this doesn't seem right list > offsets = rop.to_read.find(hoid)->second.to_read; GenContext &> *c = rop.to_read.find(hoid)->second.cb; map for_read_op; for_read_op.insert( make_pair( hoid, read_request_t( offsets, shards, false, c))); rop.to_read.swap(for_read_op); do_read_op(rop); return 0; } int ECBackend::objects_get_attrs( const hobject_t &hoid, map *out) { int r = store->getattrs( ch, ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), *out); if (r < 0) return r; for (map::iterator i = out->begin(); i != out->end(); ) { if (ECUtil::is_hinfo_key_string(i->first)) out->erase(i++); else ++i; } return r; } void ECBackend::rollback_append( const hobject_t &hoid, uint64_t old_size, ObjectStore::Transaction *t) { assert(old_size % sinfo.get_stripe_width() == 0); t->truncate( coll, ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), sinfo.aligned_logical_offset_to_chunk_offset( old_size)); } void ECBackend::be_deep_scrub( const hobject_t &poid, uint32_t seed, ScrubMap::object &o, ThreadPool::TPHandle &handle) { bufferhash h(-1); // we always used -1 int r; uint64_t stride = cct->_conf->osd_deep_scrub_stride; if (stride % sinfo.get_chunk_size()) stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size()); uint64_t pos = 0; uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED; while (true) { bufferlist bl; handle.reset_tp_timeout(); r = store->read( ch, ghobject_t( poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), pos, stride, bl, fadvise_flags); if (r < 0) break; if (bl.length() % sinfo.get_chunk_size()) { r = -EIO; break; } pos += r; h << bl; if ((unsigned)r < stride) break; } if (r == -EIO) { dout(0) << "_scan_list " << poid << " got " << r << " on read, read_error" << dendl; o.read_error = true; return; } ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs); if (!hinfo) { dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl; o.read_error = true; o.digest_present = false; return; } else { if (!get_parent()->get_pool().allows_ecoverwrites()) { assert(hinfo->has_chunk_hash()); if (hinfo->get_total_chunk_size() != pos) { dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl; o.ec_size_mismatch = true; return; } if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) { dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl; o.ec_hash_mismatch = true; return; } /* We checked above that we match our own stored hash. We cannot * send a hash of the actual object, so instead we simply send * our locally stored hash of shard 0 on the assumption that if * we match our chunk hash and our recollection of the hash for * chunk 0 matches that of our peers, there is likely no corruption. */ o.digest = hinfo->get_chunk_hash(0); o.digest_present = true; } else { /* Hack! We must be using partial overwrites, and partial overwrites * don't support deep-scrub yet */ o.digest = 0; o.digest_present = true; } } o.omap_digest = seed; o.omap_digest_present = true; }