// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2013 Inktank Storage, Inc. * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include "common/errno.h" #include "ReplicatedBackend.h" #include "messages/MOSDOp.h" #include "messages/MOSDSubOp.h" #include "messages/MOSDRepOp.h" #include "messages/MOSDSubOpReply.h" #include "messages/MOSDRepOpReply.h" #include "messages/MOSDPGPush.h" #include "messages/MOSDPGPull.h" #include "messages/MOSDPGPushReply.h" #include "common/EventTrace.h" #define dout_context cct #define dout_subsys ceph_subsys_osd #define DOUT_PREFIX_ARGS this #undef dout_prefix #define dout_prefix _prefix(_dout, this) static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) { return *_dout << pgb->get_parent()->gen_dbg_prefix(); } namespace { class PG_SendMessageOnConn: public Context { PGBackend::Listener *pg; Message *reply; ConnectionRef conn; public: PG_SendMessageOnConn( PGBackend::Listener *pg, Message *reply, ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {} void finish(int) override { pg->send_message_osd_cluster(reply, conn.get()); } }; class PG_RecoveryQueueAsync : public Context { PGBackend::Listener *pg; unique_ptr> c; public: PG_RecoveryQueueAsync( PGBackend::Listener *pg, GenContext *c) : pg(pg), c(c) {} void finish(int) override { pg->schedule_recovery_work(c.release()); } }; } struct ReplicatedBackend::C_OSD_RepModifyApply : public Context { ReplicatedBackend *pg; RepModifyRef rm; C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r) : pg(pg), rm(r) {} void finish(int r) override { pg->repop_applied(rm); } }; struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context { ReplicatedBackend *pg; RepModifyRef rm; C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r) : pg(pg), rm(r) {} void finish(int r) override { pg->repop_commit(rm); } }; static void log_subop_stats( PerfCounters *logger, OpRequestRef op, int subop) { utime_t now = ceph_clock_now(); utime_t latency = now; latency -= op->get_req()->get_recv_stamp(); logger->inc(l_osd_sop); logger->tinc(l_osd_sop_lat, latency); logger->inc(subop); if (subop != l_osd_sop_pull) { uint64_t inb = op->get_req()->get_data().length(); logger->inc(l_osd_sop_inb, inb); if (subop == l_osd_sop_w) { logger->inc(l_osd_sop_w_inb, inb); logger->tinc(l_osd_sop_w_lat, latency); } else if (subop == l_osd_sop_push) { logger->inc(l_osd_sop_push_inb, inb); logger->tinc(l_osd_sop_push_lat, latency); } else assert("no support subop" == 0); } else { logger->tinc(l_osd_sop_pull_lat, latency); } } ReplicatedBackend::ReplicatedBackend( PGBackend::Listener *pg, coll_t coll, ObjectStore::CollectionHandle &c, ObjectStore *store, CephContext *cct) : PGBackend(cct, pg, store, coll, c) {} void ReplicatedBackend::run_recovery_op( PGBackend::RecoveryHandle *_h, int priority) { RPGHandle *h = static_cast(_h); send_pushes(priority, h->pushes); send_pulls(priority, h->pulls); send_recovery_deletes(priority, h->deletes); delete h; } int ReplicatedBackend::recover_object( const hobject_t &hoid, eversion_t v, ObjectContextRef head, ObjectContextRef obc, RecoveryHandle *_h ) { dout(10) << __func__ << ": " << hoid << dendl; RPGHandle *h = static_cast(_h); if (get_parent()->get_local_missing().is_missing(hoid)) { assert(!obc); // pull prepare_pull( v, hoid, head, h); } else { assert(obc); int started = start_pushes( hoid, obc, h); if (started < 0) { pushing[hoid].clear(); return started; } } return 0; } void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap) { for(map >::iterator i = pull_from_peer.begin(); i != pull_from_peer.end(); ) { if (osdmap->is_down(i->first.osd)) { dout(10) << "check_recovery_sources resetting pulls from osd." << i->first << ", osdmap has it marked down" << dendl; for (set::iterator j = i->second.begin(); j != i->second.end(); ++j) { get_parent()->cancel_pull(*j); clear_pull(pulling.find(*j), false); } pull_from_peer.erase(i++); } else { ++i; } } } bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op) { dout(10) << __func__ << ": " << op << dendl; switch (op->get_req()->get_type()) { case MSG_OSD_PG_PULL: return true; default: return false; } } bool ReplicatedBackend::_handle_message( OpRequestRef op ) { dout(10) << __func__ << ": " << op << dendl; switch (op->get_req()->get_type()) { case MSG_OSD_PG_PUSH: do_push(op); return true; case MSG_OSD_PG_PULL: do_pull(op); return true; case MSG_OSD_PG_PUSH_REPLY: do_push_reply(op); return true; case MSG_OSD_SUBOP: { const MOSDSubOp *m = static_cast(op->get_req()); if (m->ops.size() == 0) { assert(0); } break; } case MSG_OSD_REPOP: { do_repop(op); return true; } case MSG_OSD_REPOPREPLY: { do_repop_reply(op); return true; } default: break; } return false; } void ReplicatedBackend::clear_recovery_state() { // clear pushing/pulling maps for (auto &&i: pushing) { for (auto &&j: i.second) { get_parent()->release_locks(j.second.lock_manager); } } pushing.clear(); for (auto &&i: pulling) { get_parent()->release_locks(i.second.lock_manager); } pulling.clear(); pull_from_peer.clear(); } void ReplicatedBackend::on_change() { dout(10) << __func__ << dendl; for (map::iterator i = in_progress_ops.begin(); i != in_progress_ops.end(); in_progress_ops.erase(i++)) { if (i->second.on_commit) delete i->second.on_commit; if (i->second.on_applied) delete i->second.on_applied; } clear_recovery_state(); } void ReplicatedBackend::on_flushed() { } int ReplicatedBackend::objects_read_sync( const hobject_t &hoid, uint64_t off, uint64_t len, uint32_t op_flags, bufferlist *bl) { return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags); } struct AsyncReadCallback : public GenContext { int r; Context *c; AsyncReadCallback(int r, Context *c) : r(r), c(c) {} void finish(ThreadPool::TPHandle&) override { c->complete(r); c = NULL; } ~AsyncReadCallback() override { delete c; } }; void ReplicatedBackend::objects_read_async( const hobject_t &hoid, const list, pair > > &to_read, Context *on_complete, bool fast_read) { // There is no fast read implementation for replication backend yet assert(!fast_read); int r = 0; for (list, pair > >::const_iterator i = to_read.begin(); i != to_read.end() && r >= 0; ++i) { int _r = store->read(ch, ghobject_t(hoid), i->first.get<0>(), i->first.get<1>(), *(i->second.first), i->first.get<2>()); if (i->second.second) { get_parent()->schedule_recovery_work( get_parent()->bless_gencontext( new AsyncReadCallback(_r, i->second.second))); } if (_r < 0) r = _r; } get_parent()->schedule_recovery_work( get_parent()->bless_gencontext( new AsyncReadCallback(r, on_complete))); } class C_OSD_OnOpCommit : public Context { ReplicatedBackend *pg; ReplicatedBackend::InProgressOp *op; public: C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) : pg(pg), op(op) {} void finish(int) override { pg->op_commit(op); } }; class C_OSD_OnOpApplied : public Context { ReplicatedBackend *pg; ReplicatedBackend::InProgressOp *op; public: C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) : pg(pg), op(op) {} void finish(int) override { pg->op_applied(op); } }; void generate_transaction( PGTransactionUPtr &pgt, const coll_t &coll, bool legacy_log_entries, vector &log_entries, ObjectStore::Transaction *t, set *added, set *removed) { assert(t); assert(added); assert(removed); for (auto &&le: log_entries) { le.mark_unrollbackable(); auto oiter = pgt->op_map.find(le.soid); if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) { bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8); ::encode(oiter->second.updated_snaps->second, bl); le.snaps.swap(bl); le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog); } } pgt->safe_create_traverse( [&](pair &obj_op) { const hobject_t &oid = obj_op.first; const ghobject_t goid = ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD); const PGTransaction::ObjectOperation &op = obj_op.second; if (oid.is_temp()) { if (op.is_fresh_object()) { added->insert(oid); } else if (op.is_delete()) { removed->insert(oid); } } if (op.delete_first) { t->remove(coll, goid); } match( op.init_type, [&](const PGTransaction::ObjectOperation::Init::None &) { }, [&](const PGTransaction::ObjectOperation::Init::Create &op) { t->touch(coll, goid); }, [&](const PGTransaction::ObjectOperation::Init::Clone &op) { t->clone( coll, ghobject_t( op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD), goid); }, [&](const PGTransaction::ObjectOperation::Init::Rename &op) { assert(op.source.is_temp()); t->collection_move_rename( coll, ghobject_t( op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD), coll, goid); }); if (op.truncate) { t->truncate(coll, goid, op.truncate->first); if (op.truncate->first != op.truncate->second) t->truncate(coll, goid, op.truncate->second); } if (!op.attr_updates.empty()) { map attrs; for (auto &&p: op.attr_updates) { if (p.second) attrs[p.first] = *(p.second); else t->rmattr(coll, goid, p.first); } t->setattrs(coll, goid, attrs); } if (op.clear_omap) t->omap_clear(coll, goid); if (op.omap_header) t->omap_setheader(coll, goid, *(op.omap_header)); for (auto &&up: op.omap_updates) { using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType; switch (up.first) { case UpdateType::Remove: t->omap_rmkeys(coll, goid, up.second); break; case UpdateType::Insert: t->omap_setkeys(coll, goid, up.second); break; } } // updated_snaps doesn't matter since we marked unrollbackable if (op.alloc_hint) { auto &hint = *(op.alloc_hint); t->set_alloc_hint( coll, goid, hint.expected_object_size, hint.expected_write_size, hint.flags); } for (auto &&extent: op.buffer_updates) { using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate; match( extent.get_val(), [&](const BufferUpdate::Write &op) { t->write( coll, goid, extent.get_off(), extent.get_len(), op.buffer); }, [&](const BufferUpdate::Zero &op) { t->zero( coll, goid, extent.get_off(), extent.get_len()); }, [&](const BufferUpdate::CloneRange &op) { assert(op.len == extent.get_len()); t->clone_range( coll, ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD), goid, op.offset, extent.get_len(), extent.get_off()); }); } }); } void ReplicatedBackend::submit_transaction( const hobject_t &soid, const object_stat_sum_t &delta_stats, const eversion_t &at_version, PGTransactionUPtr &&_t, const eversion_t &trim_to, const eversion_t &roll_forward_to, const vector &_log_entries, boost::optional &hset_history, Context *on_local_applied_sync, Context *on_all_acked, Context *on_all_commit, ceph_tid_t tid, osd_reqid_t reqid, OpRequestRef orig_op) { parent->apply_stats( soid, delta_stats); vector log_entries(_log_entries); ObjectStore::Transaction op_t; PGTransactionUPtr t(std::move(_t)); set added, removed; generate_transaction( t, coll, (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN), log_entries, &op_t, &added, &removed); assert(added.size() <= 1); assert(removed.size() <= 1); assert(!in_progress_ops.count(tid)); InProgressOp &op = in_progress_ops.insert( make_pair( tid, InProgressOp( tid, on_all_commit, on_all_acked, orig_op, at_version) ) ).first->second; op.waiting_for_applied.insert( parent->get_actingbackfill_shards().begin(), parent->get_actingbackfill_shards().end()); op.waiting_for_commit.insert( parent->get_actingbackfill_shards().begin(), parent->get_actingbackfill_shards().end()); issue_op( soid, at_version, tid, reqid, trim_to, at_version, added.size() ? *(added.begin()) : hobject_t(), removed.size() ? *(removed.begin()) : hobject_t(), log_entries, hset_history, &op, op_t); add_temp_objs(added); clear_temp_objs(removed); parent->log_operation( log_entries, hset_history, trim_to, at_version, true, op_t); op_t.register_on_applied_sync(on_local_applied_sync); op_t.register_on_applied( parent->bless_context( new C_OSD_OnOpApplied(this, &op))); op_t.register_on_commit( parent->bless_context( new C_OSD_OnOpCommit(this, &op))); vector tls; tls.push_back(std::move(op_t)); parent->queue_transactions(tls, op.op); } void ReplicatedBackend::op_applied( InProgressOp *op) { FUNCTRACE(); OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_APPLIED_BEGIN", true); dout(10) << __func__ << ": " << op->tid << dendl; if (op->op) { op->op->mark_event("op_applied"); op->op->pg_trace.event("op applied"); } op->waiting_for_applied.erase(get_parent()->whoami_shard()); parent->op_applied(op->v); if (op->waiting_for_applied.empty()) { op->on_applied->complete(0); op->on_applied = 0; } if (op->done()) { assert(!op->on_commit && !op->on_applied); in_progress_ops.erase(op->tid); } } void ReplicatedBackend::op_commit( InProgressOp *op) { FUNCTRACE(); OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true); dout(10) << __func__ << ": " << op->tid << dendl; if (op->op) { op->op->mark_event("op_commit"); op->op->pg_trace.event("op commit"); } op->waiting_for_commit.erase(get_parent()->whoami_shard()); if (op->waiting_for_commit.empty()) { op->on_commit->complete(0); op->on_commit = 0; } if (op->done()) { assert(!op->on_commit && !op->on_applied); in_progress_ops.erase(op->tid); } } void ReplicatedBackend::do_repop_reply(OpRequestRef op) { static_cast(op->get_nonconst_req())->finish_decode(); const MOSDRepOpReply *r = static_cast(op->get_req()); assert(r->get_header().type == MSG_OSD_REPOPREPLY); op->mark_started(); // must be replication. ceph_tid_t rep_tid = r->get_tid(); pg_shard_t from = r->from; if (in_progress_ops.count(rep_tid)) { map::iterator iter = in_progress_ops.find(rep_tid); InProgressOp &ip_op = iter->second; const MOSDOp *m = NULL; if (ip_op.op) m = static_cast(ip_op.op->get_req()); if (m) dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m << " ack_type " << (int)r->ack_type << " from " << from << dendl; else dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) " << " ack_type " << (int)r->ack_type << " from " << from << dendl; // oh, good. if (r->ack_type & CEPH_OSD_FLAG_ONDISK) { assert(ip_op.waiting_for_commit.count(from)); ip_op.waiting_for_commit.erase(from); if (ip_op.op) { ostringstream ss; ss << "sub_op_commit_rec from " << from; ip_op.op->mark_event_string(ss.str()); ip_op.op->pg_trace.event("sub_op_commit_rec"); } } else { assert(ip_op.waiting_for_applied.count(from)); if (ip_op.op) { ostringstream ss; ss << "sub_op_applied_rec from " << from; ip_op.op->mark_event_string(ss.str()); ip_op.op->pg_trace.event("sub_op_applied_rec"); } } ip_op.waiting_for_applied.erase(from); parent->update_peer_last_complete_ondisk( from, r->get_last_complete_ondisk()); if (ip_op.waiting_for_applied.empty() && ip_op.on_applied) { ip_op.on_applied->complete(0); ip_op.on_applied = 0; } if (ip_op.waiting_for_commit.empty() && ip_op.on_commit) { ip_op.on_commit->complete(0); ip_op.on_commit= 0; } if (ip_op.done()) { assert(!ip_op.on_commit && !ip_op.on_applied); in_progress_ops.erase(iter); } } } void ReplicatedBackend::be_deep_scrub( const hobject_t &poid, uint32_t seed, ScrubMap::object &o, ThreadPool::TPHandle &handle) { dout(10) << __func__ << " " << poid << " seed " << std::hex << seed << std::dec << dendl; bufferhash h(seed), oh(seed); bufferlist bl, hdrbl; int r; __u64 pos = 0; uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED; while (true) { handle.reset_tp_timeout(); r = store->read( ch, ghobject_t( poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), pos, cct->_conf->osd_deep_scrub_stride, bl, fadvise_flags); if (r <= 0) break; h << bl; pos += bl.length(); bl.clear(); } if (r == -EIO) { dout(25) << __func__ << " " << poid << " got " << r << " on read, read_error" << dendl; o.read_error = true; return; } o.digest = h.digest(); o.digest_present = true; bl.clear(); r = store->omap_get_header( coll, ghobject_t( poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), &hdrbl, true); // NOTE: bobtail to giant, we would crc the head as (len, head). // that changes at the same time we start using a non-zero seed. if (r == 0 && hdrbl.length()) { dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length()) << dendl; if (seed == 0) { // legacy bufferlist bl; ::encode(hdrbl, bl); oh << bl; } else { oh << hdrbl; } } else if (r == -EIO) { dout(25) << __func__ << " " << poid << " got " << r << " on omap header read, read_error" << dendl; o.read_error = true; return; } ObjectMap::ObjectMapIterator iter = store->get_omap_iterator( coll, ghobject_t( poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); assert(iter); for (iter->seek_to_first(); iter->status() == 0 && iter->valid(); iter->next(false)) { handle.reset_tp_timeout(); dout(25) << "CRC key " << iter->key() << " value:\n"; iter->value().hexdump(*_dout); *_dout << dendl; ::encode(iter->key(), bl); ::encode(iter->value(), bl); oh << bl; bl.clear(); } if (iter->status() < 0) { dout(25) << __func__ << " " << poid << " on omap scan, db status error" << dendl; o.read_error = true; return; } //Store final calculated CRC32 of omap header & key/values o.omap_digest = oh.digest(); o.omap_digest_present = true; dout(20) << __func__ << " " << poid << " omap_digest " << std::hex << o.omap_digest << std::dec << dendl; } void ReplicatedBackend::_do_push(OpRequestRef op) { const MOSDPGPush *m = static_cast(op->get_req()); assert(m->get_type() == MSG_OSD_PG_PUSH); pg_shard_t from = m->from; op->mark_started(); vector replies; ObjectStore::Transaction t; ostringstream ss; if (get_parent()->check_failsafe_full(ss)) { dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl; ceph_abort(); } for (vector::const_iterator i = m->pushes.begin(); i != m->pushes.end(); ++i) { replies.push_back(PushReplyOp()); handle_push(from, *i, &(replies.back()), &t); } MOSDPGPushReply *reply = new MOSDPGPushReply; reply->from = get_parent()->whoami_shard(); reply->set_priority(m->get_priority()); reply->pgid = get_info().pgid; reply->map_epoch = m->map_epoch; reply->min_epoch = m->min_epoch; reply->replies.swap(replies); reply->compute_cost(cct); t.register_on_complete( new PG_SendMessageOnConn( get_parent(), reply, m->get_connection())); get_parent()->queue_transaction(std::move(t)); } struct C_ReplicatedBackend_OnPullComplete : GenContext { ReplicatedBackend *bc; list to_continue; int priority; C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority) : bc(bc), priority(priority) {} void finish(ThreadPool::TPHandle &handle) override { ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op(); for (auto &&i: to_continue) { auto j = bc->pulling.find(i.hoid); assert(j != bc->pulling.end()); ObjectContextRef obc = j->second.obc; bc->clear_pull(j, false /* already did it */); int started = bc->start_pushes(i.hoid, obc, h); if (started < 0) { bc->pushing[i.hoid].clear(); bc->get_parent()->primary_failed(i.hoid); bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version); } else if (!started) { bc->get_parent()->on_global_recover( i.hoid, i.stat, false); } handle.reset_tp_timeout(); } bc->run_recovery_op(h, priority); } }; void ReplicatedBackend::_do_pull_response(OpRequestRef op) { const MOSDPGPush *m = static_cast(op->get_req()); assert(m->get_type() == MSG_OSD_PG_PUSH); pg_shard_t from = m->from; op->mark_started(); vector replies(1); ostringstream ss; if (get_parent()->check_failsafe_full(ss)) { dout(10) << __func__ << " Out of space (failsafe) processing pull response (push): " << ss.str() << dendl; ceph_abort(); } ObjectStore::Transaction t; list to_continue; for (vector::const_iterator i = m->pushes.begin(); i != m->pushes.end(); ++i) { bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t); if (more) replies.push_back(PullOp()); } if (!to_continue.empty()) { C_ReplicatedBackend_OnPullComplete *c = new C_ReplicatedBackend_OnPullComplete( this, m->get_priority()); c->to_continue.swap(to_continue); t.register_on_complete( new PG_RecoveryQueueAsync( get_parent(), get_parent()->bless_gencontext(c))); } replies.erase(replies.end() - 1); if (replies.size()) { MOSDPGPull *reply = new MOSDPGPull; reply->from = parent->whoami_shard(); reply->set_priority(m->get_priority()); reply->pgid = get_info().pgid; reply->map_epoch = m->map_epoch; reply->min_epoch = m->min_epoch; reply->set_pulls(&replies); reply->compute_cost(cct); t.register_on_complete( new PG_SendMessageOnConn( get_parent(), reply, m->get_connection())); } get_parent()->queue_transaction(std::move(t)); } void ReplicatedBackend::do_pull(OpRequestRef op) { MOSDPGPull *m = static_cast(op->get_nonconst_req()); assert(m->get_type() == MSG_OSD_PG_PULL); pg_shard_t from = m->from; map > replies; vector pulls; m->take_pulls(&pulls); for (auto& i : pulls) { replies[from].push_back(PushOp()); handle_pull(from, i, &(replies[from].back())); } send_pushes(m->get_priority(), replies); } void ReplicatedBackend::do_push_reply(OpRequestRef op) { const MOSDPGPushReply *m = static_cast(op->get_req()); assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY); pg_shard_t from = m->from; vector replies(1); for (vector::const_iterator i = m->replies.begin(); i != m->replies.end(); ++i) { bool more = handle_push_reply(from, *i, &(replies.back())); if (more) replies.push_back(PushOp()); } replies.erase(replies.end() - 1); map > _replies; _replies[from].swap(replies); send_pushes(m->get_priority(), _replies); } Message * ReplicatedBackend::generate_subop( const hobject_t &soid, const eversion_t &at_version, ceph_tid_t tid, osd_reqid_t reqid, eversion_t pg_trim_to, eversion_t pg_roll_forward_to, hobject_t new_temp_oid, hobject_t discard_temp_oid, const vector &log_entries, boost::optional &hset_hist, ObjectStore::Transaction &op_t, pg_shard_t peer, const pg_info_t &pinfo) { int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; // forward the write/update/whatever MOSDRepOp *wr = new MOSDRepOp( reqid, parent->whoami_shard(), spg_t(get_info().pgid.pgid, peer.shard), soid, acks_wanted, get_osdmap()->get_epoch(), parent->get_last_peering_reset_epoch(), tid, at_version); // ship resulting transaction, log entries, and pg_stats if (!parent->should_send_op(peer, soid)) { dout(10) << "issue_repop shipping empty opt to osd." << peer <<", object " << soid << " beyond MAX(last_backfill_started " << ", pinfo.last_backfill " << pinfo.last_backfill << ")" << dendl; ObjectStore::Transaction t; ::encode(t, wr->get_data()); } else { ::encode(op_t, wr->get_data()); wr->get_header().data_off = op_t.get_data_alignment(); } ::encode(log_entries, wr->logbl); if (pinfo.is_incomplete()) wr->pg_stats = pinfo.stats; // reflects backfill progress else wr->pg_stats = get_info().stats; wr->pg_trim_to = pg_trim_to; wr->pg_roll_forward_to = pg_roll_forward_to; wr->new_temp_oid = new_temp_oid; wr->discard_temp_oid = discard_temp_oid; wr->updated_hit_set_history = hset_hist; return wr; } void ReplicatedBackend::issue_op( const hobject_t &soid, const eversion_t &at_version, ceph_tid_t tid, osd_reqid_t reqid, eversion_t pg_trim_to, eversion_t pg_roll_forward_to, hobject_t new_temp_oid, hobject_t discard_temp_oid, const vector &log_entries, boost::optional &hset_hist, InProgressOp *op, ObjectStore::Transaction &op_t) { if (op->op) op->op->pg_trace.event("issue replication ops"); if (parent->get_actingbackfill_shards().size() > 1) { ostringstream ss; set replicas = parent->get_actingbackfill_shards(); replicas.erase(parent->whoami_shard()); ss << "waiting for subops from " << replicas; if (op->op) op->op->mark_sub_op_sent(ss.str()); } for (set::const_iterator i = parent->get_actingbackfill_shards().begin(); i != parent->get_actingbackfill_shards().end(); ++i) { if (*i == parent->whoami_shard()) continue; pg_shard_t peer = *i; const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second; Message *wr; wr = generate_subop( soid, at_version, tid, reqid, pg_trim_to, pg_roll_forward_to, new_temp_oid, discard_temp_oid, log_entries, hset_hist, op_t, peer, pinfo); if (op->op) wr->trace.init("replicated op", nullptr, &op->op->pg_trace); get_parent()->send_message_osd_cluster( peer.osd, wr, get_osdmap()->get_epoch()); } } // sub op modify void ReplicatedBackend::do_repop(OpRequestRef op) { static_cast(op->get_nonconst_req())->finish_decode(); const MOSDRepOp *m = static_cast(op->get_req()); int msg_type = m->get_type(); assert(MSG_OSD_REPOP == msg_type); const hobject_t& soid = m->poid; dout(10) << __func__ << " " << soid << " v " << m->version << (m->logbl.length() ? " (transaction)" : " (parallel exec") << " " << m->logbl.length() << dendl; // sanity checks assert(m->map_epoch >= get_info().history.same_interval_since); // we better not be missing this. assert(!parent->get_log().get_missing().is_missing(soid)); int ackerosd = m->get_source().num(); op->mark_started(); RepModifyRef rm(std::make_shared()); rm->op = op; rm->ackerosd = ackerosd; rm->last_complete = get_info().last_complete; rm->epoch_started = get_osdmap()->get_epoch(); assert(m->logbl.length()); // shipped transaction and log entries vector log; bufferlist::iterator p = const_cast(m->get_data()).begin(); ::decode(rm->opt, p); if (m->new_temp_oid != hobject_t()) { dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl; add_temp_obj(m->new_temp_oid); } if (m->discard_temp_oid != hobject_t()) { dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl; if (rm->opt.empty()) { dout(10) << __func__ << ": removing object " << m->discard_temp_oid << " since we won't get the transaction" << dendl; rm->localt.remove(coll, ghobject_t(m->discard_temp_oid)); } clear_temp_obj(m->discard_temp_oid); } p = const_cast(m->logbl).begin(); ::decode(log, p); rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); bool update_snaps = false; if (!rm->opt.empty()) { // If the opt is non-empty, we infer we are before // last_backfill (according to the primary, not our // not-quite-accurate value), and should update the // collections now. Otherwise, we do it later on push. update_snaps = true; } parent->update_stats(m->pg_stats); parent->log_operation( log, m->updated_hit_set_history, m->pg_trim_to, m->pg_roll_forward_to, update_snaps, rm->localt); rm->opt.register_on_commit( parent->bless_context( new C_OSD_RepModifyCommit(this, rm))); rm->localt.register_on_applied( parent->bless_context( new C_OSD_RepModifyApply(this, rm))); vector tls; tls.reserve(2); tls.push_back(std::move(rm->localt)); tls.push_back(std::move(rm->opt)); parent->queue_transactions(tls, op); // op is cleaned up by oncommit/onapply when both are executed } void ReplicatedBackend::repop_applied(RepModifyRef rm) { rm->op->mark_event("sub_op_applied"); rm->applied = true; rm->op->pg_trace.event("sup_op_applied"); dout(10) << __func__ << " on " << rm << " op " << *rm->op->get_req() << dendl; const Message *m = rm->op->get_req(); const MOSDRepOp *req = static_cast(m); eversion_t version = req->version; // send ack to acker only if we haven't sent a commit already if (!rm->committed) { Message *ack = new MOSDRepOpReply( req, parent->whoami_shard(), 0, get_osdmap()->get_epoch(), req->min_epoch, CEPH_OSD_FLAG_ACK); ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! ack->trace = rm->op->pg_trace; get_parent()->send_message_osd_cluster( rm->ackerosd, ack, get_osdmap()->get_epoch()); } parent->op_applied(version); } void ReplicatedBackend::repop_commit(RepModifyRef rm) { rm->op->mark_commit_sent(); rm->op->pg_trace.event("sup_op_commit"); rm->committed = true; // send commit. const MOSDRepOp *m = static_cast(rm->op->get_req()); assert(m->get_type() == MSG_OSD_REPOP); dout(10) << __func__ << " on op " << *m << ", sending commit to osd." << rm->ackerosd << dendl; assert(get_osdmap()->is_up(rm->ackerosd)); get_parent()->update_last_complete_ondisk(rm->last_complete); MOSDRepOpReply *reply = new MOSDRepOpReply( m, get_parent()->whoami_shard(), 0, get_osdmap()->get_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK); reply->set_last_complete_ondisk(rm->last_complete); reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! reply->trace = rm->op->pg_trace; get_parent()->send_message_osd_cluster( rm->ackerosd, reply, get_osdmap()->get_epoch()); log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w); } // =========================================================== void ReplicatedBackend::calc_head_subsets( ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, const pg_missing_t& missing, const hobject_t &last_backfill, interval_set& data_subset, map>& clone_subsets, ObcLockManager &manager) { dout(10) << "calc_head_subsets " << head << " clone_overlap " << snapset.clone_overlap << dendl; uint64_t size = obc->obs.oi.size; if (size) data_subset.insert(0, size); if (get_parent()->get_pool().allow_incomplete_clones()) { dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; return; } if (!cct->_conf->osd_recover_clone_overlap) { dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl; return; } interval_set cloning; interval_set prev; if (size) prev.insert(0, size); for (int j=snapset.clones.size()-1; j>=0; j--) { hobject_t c = head; c.snap = snapset.clones[j]; prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); if (!missing.is_missing(c) && c < last_backfill && get_parent()->try_lock_for_read(c, manager)) { dout(10) << "calc_head_subsets " << head << " has prev " << c << " overlap " << prev << dendl; clone_subsets[c] = prev; cloning.union_of(prev); break; } dout(10) << "calc_head_subsets " << head << " does not have prev " << c << " overlap " << prev << dendl; } if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { dout(10) << "skipping clone, too many holes" << dendl; get_parent()->release_locks(manager); clone_subsets.clear(); cloning.clear(); } // what's left for us to push? data_subset.subtract(cloning); dout(10) << "calc_head_subsets " << head << " data_subset " << data_subset << " clone_subsets " << clone_subsets << dendl; } void ReplicatedBackend::calc_clone_subsets( SnapSet& snapset, const hobject_t& soid, const pg_missing_t& missing, const hobject_t &last_backfill, interval_set& data_subset, map>& clone_subsets, ObcLockManager &manager) { dout(10) << "calc_clone_subsets " << soid << " clone_overlap " << snapset.clone_overlap << dendl; uint64_t size = snapset.clone_size[soid.snap]; if (size) data_subset.insert(0, size); if (get_parent()->get_pool().allow_incomplete_clones()) { dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; return; } if (!cct->_conf->osd_recover_clone_overlap) { dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl; return; } unsigned i; for (i=0; i < snapset.clones.size(); i++) if (snapset.clones[i] == soid.snap) break; // any overlap with next older clone? interval_set cloning; interval_set prev; if (size) prev.insert(0, size); for (int j=i-1; j>=0; j--) { hobject_t c = soid; c.snap = snapset.clones[j]; prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); if (!missing.is_missing(c) && c < last_backfill && get_parent()->try_lock_for_read(c, manager)) { dout(10) << "calc_clone_subsets " << soid << " has prev " << c << " overlap " << prev << dendl; clone_subsets[c] = prev; cloning.union_of(prev); break; } dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c << " overlap " << prev << dendl; } // overlap with next newest? interval_set next; if (size) next.insert(0, size); for (unsigned j=i+1; jtry_lock_for_read(c, manager)) { dout(10) << "calc_clone_subsets " << soid << " has next " << c << " overlap " << next << dendl; clone_subsets[c] = next; cloning.union_of(next); break; } dout(10) << "calc_clone_subsets " << soid << " does not have next " << c << " overlap " << next << dendl; } if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { dout(10) << "skipping clone, too many holes" << dendl; get_parent()->release_locks(manager); clone_subsets.clear(); cloning.clear(); } // what's left for us to push? data_subset.subtract(cloning); dout(10) << "calc_clone_subsets " << soid << " data_subset " << data_subset << " clone_subsets " << clone_subsets << dendl; } void ReplicatedBackend::prepare_pull( eversion_t v, const hobject_t& soid, ObjectContextRef headctx, RPGHandle *h) { assert(get_parent()->get_local_missing().get_items().count(soid)); eversion_t _v = get_parent()->get_local_missing().get_items().find( soid)->second.need; assert(_v == v); const map> &missing_loc( get_parent()->get_missing_loc_shards()); const map &peer_missing( get_parent()->get_shard_missing()); map>::const_iterator q = missing_loc.find(soid); assert(q != missing_loc.end()); assert(!q->second.empty()); // pick a pullee vector shuffle(q->second.begin(), q->second.end()); random_shuffle(shuffle.begin(), shuffle.end()); vector::iterator p = shuffle.begin(); assert(get_osdmap()->is_up(p->osd)); pg_shard_t fromshard = *p; dout(7) << "pull " << soid << " v " << v << " on osds " << q->second << " from osd." << fromshard << dendl; assert(peer_missing.count(fromshard)); const pg_missing_t &pmissing = peer_missing.find(fromshard)->second; if (pmissing.is_missing(soid, v)) { assert(pmissing.get_items().find(soid)->second.have != v); dout(10) << "pulling soid " << soid << " from osd " << fromshard << " at version " << pmissing.get_items().find(soid)->second.have << " rather than at version " << v << dendl; v = pmissing.get_items().find(soid)->second.have; assert(get_parent()->get_log().get_log().objects.count(soid) && (get_parent()->get_log().get_log().objects.find(soid)->second->op == pg_log_entry_t::LOST_REVERT) && (get_parent()->get_log().get_log().objects.find( soid)->second->reverting_to == v)); } ObjectRecoveryInfo recovery_info; ObcLockManager lock_manager; if (soid.is_snap()) { assert(!get_parent()->get_local_missing().is_missing( soid.get_head()) || !get_parent()->get_local_missing().is_missing( soid.get_snapdir())); assert(headctx); // check snapset SnapSetContext *ssc = headctx->ssc; assert(ssc); dout(10) << " snapset " << ssc->snapset << dendl; recovery_info.ss = ssc->snapset; calc_clone_subsets( ssc->snapset, soid, get_parent()->get_local_missing(), get_info().last_backfill, recovery_info.copy_subset, recovery_info.clone_subset, lock_manager); // FIXME: this may overestimate if we are pulling multiple clones in parallel... dout(10) << " pulling " << recovery_info << dendl; assert(ssc->snapset.clone_size.count(soid.snap)); recovery_info.size = ssc->snapset.clone_size[soid.snap]; } else { // pulling head or unversioned object. // always pull the whole thing. recovery_info.copy_subset.insert(0, (uint64_t)-1); recovery_info.size = ((uint64_t)-1); } h->pulls[fromshard].push_back(PullOp()); PullOp &op = h->pulls[fromshard].back(); op.soid = soid; op.recovery_info = recovery_info; op.recovery_info.soid = soid; op.recovery_info.version = v; op.recovery_progress.data_complete = false; op.recovery_progress.omap_complete = false; op.recovery_progress.data_recovered_to = 0; op.recovery_progress.first = true; assert(!pulling.count(soid)); pull_from_peer[fromshard].insert(soid); PullInfo &pi = pulling[soid]; pi.from = fromshard; pi.soid = soid; pi.head_ctx = headctx; pi.recovery_info = op.recovery_info; pi.recovery_progress = op.recovery_progress; pi.cache_dont_need = h->cache_dont_need; pi.lock_manager = std::move(lock_manager); } /* * intelligently push an object to a replica. make use of existing * clones/heads and dup data ranges where possible. */ int ReplicatedBackend::prep_push_to_replica( ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer, PushOp *pop, bool cache_dont_need) { const object_info_t& oi = obc->obs.oi; uint64_t size = obc->obs.oi.size; dout(10) << __func__ << ": " << soid << " v" << oi.version << " size " << size << " to osd." << peer << dendl; map> clone_subsets; interval_set data_subset; ObcLockManager lock_manager; // are we doing a clone on the replica? if (soid.snap && soid.snap < CEPH_NOSNAP) { hobject_t head = soid; head.snap = CEPH_NOSNAP; // try to base push off of clones that succeed/preceed poid // we need the head (and current SnapSet) locally to do that. if (get_parent()->get_local_missing().is_missing(head)) { dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl; return prep_push(obc, soid, peer, pop, cache_dont_need); } hobject_t snapdir = head; snapdir.snap = CEPH_SNAPDIR; if (get_parent()->get_local_missing().is_missing(snapdir)) { dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl; return prep_push(obc, soid, peer, pop, cache_dont_need); } SnapSetContext *ssc = obc->ssc; assert(ssc); dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; pop->recovery_info.ss = ssc->snapset; map::const_iterator pm = get_parent()->get_shard_missing().find(peer); assert(pm != get_parent()->get_shard_missing().end()); map::const_iterator pi = get_parent()->get_shard_info().find(peer); assert(pi != get_parent()->get_shard_info().end()); calc_clone_subsets( ssc->snapset, soid, pm->second, pi->second.last_backfill, data_subset, clone_subsets, lock_manager); } else if (soid.snap == CEPH_NOSNAP) { // pushing head or unversioned object. // base this on partially on replica's clones? SnapSetContext *ssc = obc->ssc; assert(ssc); dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; calc_head_subsets( obc, ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second, get_parent()->get_shard_info().find(peer)->second.last_backfill, data_subset, clone_subsets, lock_manager); } return prep_push( obc, soid, peer, oi.version, data_subset, clone_subsets, pop, cache_dont_need, std::move(lock_manager)); } int ReplicatedBackend::prep_push(ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer, PushOp *pop, bool cache_dont_need) { interval_set data_subset; if (obc->obs.oi.size) data_subset.insert(0, obc->obs.oi.size); map> clone_subsets; return prep_push(obc, soid, peer, obc->obs.oi.version, data_subset, clone_subsets, pop, cache_dont_need, ObcLockManager()); } int ReplicatedBackend::prep_push( ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer, eversion_t version, interval_set &data_subset, map>& clone_subsets, PushOp *pop, bool cache_dont_need, ObcLockManager &&lock_manager) { get_parent()->begin_peer_recover(peer, soid); // take note. PushInfo &pi = pushing[soid][peer]; pi.obc = obc; pi.recovery_info.size = obc->obs.oi.size; pi.recovery_info.copy_subset = data_subset; pi.recovery_info.clone_subset = clone_subsets; pi.recovery_info.soid = soid; pi.recovery_info.oi = obc->obs.oi; pi.recovery_info.ss = pop->recovery_info.ss; pi.recovery_info.version = version; pi.lock_manager = std::move(lock_manager); ObjectRecoveryProgress new_progress; int r = build_push_op(pi.recovery_info, pi.recovery_progress, &new_progress, pop, &(pi.stat), cache_dont_need); if (r < 0) return r; pi.recovery_progress = new_progress; return 0; } void ReplicatedBackend::submit_push_data( const ObjectRecoveryInfo &recovery_info, bool first, bool complete, bool cache_dont_need, const interval_set &intervals_included, bufferlist data_included, bufferlist omap_header, const map &attrs, const map &omap_entries, ObjectStore::Transaction *t) { hobject_t target_oid; if (first && complete) { target_oid = recovery_info.soid; } else { target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid, recovery_info.version); if (first) { dout(10) << __func__ << ": Adding oid " << target_oid << " in the temp collection" << dendl; add_temp_obj(target_oid); } } if (first) { t->remove(coll, ghobject_t(target_oid)); t->touch(coll, ghobject_t(target_oid)); t->truncate(coll, ghobject_t(target_oid), recovery_info.size); if (omap_header.length()) t->omap_setheader(coll, ghobject_t(target_oid), omap_header); bufferlist bv = attrs.at(OI_ATTR); object_info_t oi(bv); t->set_alloc_hint(coll, ghobject_t(target_oid), oi.expected_object_size, oi.expected_write_size, oi.alloc_hint_flags); } uint64_t off = 0; uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL; if (cache_dont_need) fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED; for (interval_set::const_iterator p = intervals_included.begin(); p != intervals_included.end(); ++p) { bufferlist bit; bit.substr_of(data_included, off, p.get_len()); t->write(coll, ghobject_t(target_oid), p.get_start(), p.get_len(), bit, fadvise_flags); off += p.get_len(); } if (!omap_entries.empty()) t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries); if (!attrs.empty()) t->setattrs(coll, ghobject_t(target_oid), attrs); if (complete) { if (!first) { dout(10) << __func__ << ": Removing oid " << target_oid << " from the temp collection" << dendl; clear_temp_obj(target_oid); t->remove(coll, ghobject_t(recovery_info.soid)); t->collection_move_rename(coll, ghobject_t(target_oid), coll, ghobject_t(recovery_info.soid)); } submit_push_complete(recovery_info, t); } } void ReplicatedBackend::submit_push_complete( const ObjectRecoveryInfo &recovery_info, ObjectStore::Transaction *t) { for (map>::const_iterator p = recovery_info.clone_subset.begin(); p != recovery_info.clone_subset.end(); ++p) { for (interval_set::const_iterator q = p->second.begin(); q != p->second.end(); ++q) { dout(15) << " clone_range " << p->first << " " << q.get_start() << "~" << q.get_len() << dendl; t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid), q.get_start(), q.get_len(), q.get_start()); } } } ObjectRecoveryInfo ReplicatedBackend::recalc_subsets( const ObjectRecoveryInfo& recovery_info, SnapSetContext *ssc, ObcLockManager &manager) { if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP) return recovery_info; ObjectRecoveryInfo new_info = recovery_info; new_info.copy_subset.clear(); new_info.clone_subset.clear(); assert(ssc); get_parent()->release_locks(manager); // might already have locks calc_clone_subsets( ssc->snapset, new_info.soid, get_parent()->get_local_missing(), get_info().last_backfill, new_info.copy_subset, new_info.clone_subset, manager); return new_info; } bool ReplicatedBackend::handle_pull_response( pg_shard_t from, const PushOp &pop, PullOp *response, list *to_continue, ObjectStore::Transaction *t) { interval_set data_included = pop.data_included; bufferlist data; data = pop.data; dout(10) << "handle_pull_response " << pop.recovery_info << pop.after_progress << " data.size() is " << data.length() << " data_included: " << data_included << dendl; if (pop.version == eversion_t()) { // replica doesn't have it! _failed_pull(from, pop.soid); return false; } const hobject_t &hoid = pop.soid; assert((data_included.empty() && data.length() == 0) || (!data_included.empty() && data.length() > 0)); auto piter = pulling.find(hoid); if (piter == pulling.end()) { return false; } PullInfo &pi = piter->second; if (pi.recovery_info.size == (uint64_t(-1))) { pi.recovery_info.size = pop.recovery_info.size; pi.recovery_info.copy_subset.intersection_of( pop.recovery_info.copy_subset); } // If primary doesn't have object info and didn't know version if (pi.recovery_info.version == eversion_t()) { pi.recovery_info.version = pop.version; } bool first = pi.recovery_progress.first; if (first) { // attrs only reference the origin bufferlist (decode from // MOSDPGPush message) whose size is much greater than attrs in // recovery. If obc cache it (get_obc maybe cache the attr), this // causes the whole origin bufferlist would not be free until obc // is evicted from obc cache. So rebuild the bufferlists before // cache it. auto attrset = pop.attrset; for (auto& a : attrset) { a.second.rebuild(); } pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset); pi.recovery_info.oi = pi.obc->obs.oi; pi.recovery_info = recalc_subsets( pi.recovery_info, pi.obc->ssc, pi.lock_manager); } interval_set usable_intervals; bufferlist usable_data; trim_pushed_data(pi.recovery_info.copy_subset, data_included, data, &usable_intervals, &usable_data); data_included = usable_intervals; data.claim(usable_data); pi.recovery_progress = pop.after_progress; dout(10) << "new recovery_info " << pi.recovery_info << ", new progress " << pi.recovery_progress << dendl; bool complete = pi.is_complete(); submit_push_data(pi.recovery_info, first, complete, pi.cache_dont_need, data_included, data, pop.omap_header, pop.attrset, pop.omap_entries, t); pi.stat.num_keys_recovered += pop.omap_entries.size(); pi.stat.num_bytes_recovered += data.length(); if (complete) { pi.stat.num_objects_recovered++; clear_pull_from(piter); to_continue->push_back({hoid, pi.stat}); get_parent()->on_local_recover( hoid, pi.recovery_info, pi.obc, false, t); return false; } else { response->soid = pop.soid; response->recovery_info = pi.recovery_info; response->recovery_progress = pi.recovery_progress; return true; } } void ReplicatedBackend::handle_push( pg_shard_t from, const PushOp &pop, PushReplyOp *response, ObjectStore::Transaction *t) { dout(10) << "handle_push " << pop.recovery_info << pop.after_progress << dendl; bufferlist data; data = pop.data; bool first = pop.before_progress.first; bool complete = pop.after_progress.data_complete && pop.after_progress.omap_complete; response->soid = pop.recovery_info.soid; submit_push_data(pop.recovery_info, first, complete, true, // must be replicate pop.data_included, data, pop.omap_header, pop.attrset, pop.omap_entries, t); if (complete) get_parent()->on_local_recover( pop.recovery_info.soid, pop.recovery_info, ObjectContextRef(), // ok, is replica false, t); } void ReplicatedBackend::send_pushes(int prio, map > &pushes) { for (map >::iterator i = pushes.begin(); i != pushes.end(); ++i) { ConnectionRef con = get_parent()->get_con_osd_cluster( i->first.osd, get_osdmap()->get_epoch()); if (!con) continue; vector::iterator j = i->second.begin(); while (j != i->second.end()) { uint64_t cost = 0; uint64_t pushes = 0; MOSDPGPush *msg = new MOSDPGPush(); msg->from = get_parent()->whoami_shard(); msg->pgid = get_parent()->primary_spg_t(); msg->map_epoch = get_osdmap()->get_epoch(); msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); msg->set_priority(prio); for (; (j != i->second.end() && cost < cct->_conf->osd_max_push_cost && pushes < cct->_conf->osd_max_push_objects) ; ++j) { dout(20) << __func__ << ": sending push " << *j << " to osd." << i->first << dendl; cost += j->cost(cct); pushes += 1; msg->pushes.push_back(*j); } msg->set_cost(cost); get_parent()->send_message_osd_cluster(msg, con); } } } void ReplicatedBackend::send_pulls(int prio, map > &pulls) { for (map >::iterator i = pulls.begin(); i != pulls.end(); ++i) { ConnectionRef con = get_parent()->get_con_osd_cluster( i->first.osd, get_osdmap()->get_epoch()); if (!con) continue; dout(20) << __func__ << ": sending pulls " << i->second << " to osd." << i->first << dendl; MOSDPGPull *msg = new MOSDPGPull(); msg->from = parent->whoami_shard(); msg->set_priority(prio); msg->pgid = get_parent()->primary_spg_t(); msg->map_epoch = get_osdmap()->get_epoch(); msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); msg->set_pulls(&i->second); msg->compute_cost(cct); get_parent()->send_message_osd_cluster(msg, con); } } int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, const ObjectRecoveryProgress &progress, ObjectRecoveryProgress *out_progress, PushOp *out_op, object_stat_sum_t *stat, bool cache_dont_need) { ObjectRecoveryProgress _new_progress; if (!out_progress) out_progress = &_new_progress; ObjectRecoveryProgress &new_progress = *out_progress; new_progress = progress; dout(7) << __func__ << " " << recovery_info.soid << " v " << recovery_info.version << " size " << recovery_info.size << " recovery_info: " << recovery_info << dendl; eversion_t v = recovery_info.version; if (progress.first) { int r = store->omap_get_header(coll, ghobject_t(recovery_info.soid), &out_op->omap_header); if(r < 0) { dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl; return r; } r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset); if(r < 0) { dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl; return r; } // Debug bufferlist bv = out_op->attrset[OI_ATTR]; object_info_t oi; try { bufferlist::iterator bliter = bv.begin(); ::decode(oi, bliter); } catch (...) { dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl; return -EINVAL; } // If requestor didn't know the version, use ours if (v == eversion_t()) { v = oi.version; } else if (oi.version != v) { get_parent()->clog_error() << get_info().pgid << " push " << recovery_info.soid << " v " << recovery_info.version << " failed because local copy is " << oi.version; return -EINVAL; } new_progress.first = false; } // Once we provide the version subsequent requests will have it, so // at this point it must be known. assert(v != eversion_t()); uint64_t available = cct->_conf->osd_recovery_max_chunk; if (!progress.omap_complete) { ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(coll, ghobject_t(recovery_info.soid)); assert(iter); for (iter->lower_bound(progress.omap_recovered_to); iter->valid(); iter->next(false)) { if (!out_op->omap_entries.empty() && ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 && out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) || available <= iter->key().size() + iter->value().length())) break; out_op->omap_entries.insert(make_pair(iter->key(), iter->value())); if ((iter->key().size() + iter->value().length()) <= available) available -= (iter->key().size() + iter->value().length()); else available = 0; } if (!iter->valid()) new_progress.omap_complete = true; else new_progress.omap_recovered_to = iter->key(); } if (available > 0) { if (!recovery_info.copy_subset.empty()) { interval_set copy_subset = recovery_info.copy_subset; map m; int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0, copy_subset.range_end(), m); if (r >= 0) { interval_set fiemap_included(m); copy_subset.intersection_of(fiemap_included); } else { // intersection of copy_subset and empty interval_set would be empty anyway copy_subset.clear(); } out_op->data_included.span_of(copy_subset, progress.data_recovered_to, available); if (out_op->data_included.empty()) // zero filled section, skip to end! new_progress.data_recovered_to = recovery_info.copy_subset.range_end(); else new_progress.data_recovered_to = out_op->data_included.range_end(); } } else { out_op->data_included.clear(); } for (interval_set::iterator p = out_op->data_included.begin(); p != out_op->data_included.end(); ++p) { bufferlist bit; int r = store->read(ch, ghobject_t(recovery_info.soid), p.get_start(), p.get_len(), bit, cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0); if (cct->_conf->osd_debug_random_push_read_error && (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) { dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl; r = -EIO; } if (r < 0) { return r; } if (p.get_len() != bit.length()) { dout(10) << " extent " << p.get_start() << "~" << p.get_len() << " is actually " << p.get_start() << "~" << bit.length() << dendl; interval_set::iterator save = p++; if (bit.length() == 0) out_op->data_included.erase(save); //Remove this empty interval else save.set_len(bit.length()); // Remove any other intervals present while (p != out_op->data_included.end()) { interval_set::iterator save = p++; out_op->data_included.erase(save); } new_progress.data_complete = true; out_op->data.claim_append(bit); break; } out_op->data.claim_append(bit); } if (new_progress.is_complete(recovery_info)) { new_progress.data_complete = true; if (stat) stat->num_objects_recovered++; } if (stat) { stat->num_keys_recovered += out_op->omap_entries.size(); stat->num_bytes_recovered += out_op->data.length(); } get_parent()->get_logger()->inc(l_osd_push); get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length()); // send out_op->version = v; out_op->soid = recovery_info.soid; out_op->recovery_info = recovery_info; out_op->after_progress = new_progress; out_op->before_progress = progress; return 0; } void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op) { op->recovery_info.version = eversion_t(); op->version = eversion_t(); op->soid = soid; } bool ReplicatedBackend::handle_push_reply( pg_shard_t peer, const PushReplyOp &op, PushOp *reply) { const hobject_t &soid = op.soid; if (pushing.count(soid) == 0) { dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer << ", or anybody else" << dendl; return false; } else if (pushing[soid].count(peer) == 0) { dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer << dendl; return false; } else { PushInfo *pi = &pushing[soid][peer]; bool error = pushing[soid].begin()->second.recovery_progress.error; if (!pi->recovery_progress.data_complete && !error) { dout(10) << " pushing more from, " << pi->recovery_progress.data_recovered_to << " of " << pi->recovery_info.copy_subset << dendl; ObjectRecoveryProgress new_progress; int r = build_push_op( pi->recovery_info, pi->recovery_progress, &new_progress, reply, &(pi->stat)); // Handle the case of a read error right after we wrote, which is // hopefuilly extremely rare. if (r < 0) { dout(5) << __func__ << ": oid " << soid << " error " << r << dendl; error = true; goto done; } pi->recovery_progress = new_progress; return true; } else { // done! done: if (!error) get_parent()->on_peer_recover( peer, soid, pi->recovery_info); get_parent()->release_locks(pi->lock_manager); object_stat_sum_t stat = pi->stat; eversion_t v = pi->recovery_info.version; pushing[soid].erase(peer); pi = NULL; if (pushing[soid].empty()) { if (!error) get_parent()->on_global_recover(soid, stat, false); else get_parent()->on_primary_error(soid, v); pushing.erase(soid); } else { // This looks weird, but we erased the current peer and need to remember // the error on any other one, while getting more acks. if (error) pushing[soid].begin()->second.recovery_progress.error = true; dout(10) << "pushed " << soid << ", still waiting for push ack from " << pushing[soid].size() << " others" << dendl; } return false; } } } void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply) { const hobject_t &soid = op.soid; struct stat st; int r = store->stat(ch, ghobject_t(soid), &st); if (r != 0) { get_parent()->clog_error() << get_info().pgid << " " << peer << " tried to pull " << soid << " but got " << cpp_strerror(-r); prep_push_op_blank(soid, reply); } else { ObjectRecoveryInfo &recovery_info = op.recovery_info; ObjectRecoveryProgress &progress = op.recovery_progress; if (progress.first && recovery_info.size == ((uint64_t)-1)) { // Adjust size and copy_subset recovery_info.size = st.st_size; recovery_info.copy_subset.clear(); if (st.st_size) recovery_info.copy_subset.insert(0, st.st_size); assert(recovery_info.clone_subset.empty()); } r = build_push_op(recovery_info, progress, 0, reply); if (r < 0) prep_push_op_blank(soid, reply); } } /** * trim received data to remove what we don't want * * @param copy_subset intervals we want * @param data_included intervals we got * @param data_recieved data we got * @param intervals_usable intervals we want to keep * @param data_usable matching data we want to keep */ void ReplicatedBackend::trim_pushed_data( const interval_set ©_subset, const interval_set &intervals_received, bufferlist data_received, interval_set *intervals_usable, bufferlist *data_usable) { if (intervals_received.subset_of(copy_subset)) { *intervals_usable = intervals_received; *data_usable = data_received; return; } intervals_usable->intersection_of(copy_subset, intervals_received); uint64_t off = 0; for (interval_set::const_iterator p = intervals_received.begin(); p != intervals_received.end(); ++p) { interval_set x; x.insert(p.get_start(), p.get_len()); x.intersection_of(copy_subset); for (interval_set::const_iterator q = x.begin(); q != x.end(); ++q) { bufferlist sub; uint64_t data_off = off + (q.get_start() - p.get_start()); sub.substr_of(data_received, data_off, q.get_len()); data_usable->claim_append(sub); } off += p.get_len(); } } void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid) { dout(20) << __func__ << ": " << soid << " from " << from << dendl; list fl = { from }; get_parent()->failed_push(fl, soid); clear_pull(pulling.find(soid)); } void ReplicatedBackend::clear_pull_from( map::iterator piter) { auto from = piter->second.from; pull_from_peer[from].erase(piter->second.soid); if (pull_from_peer[from].empty()) pull_from_peer.erase(from); } void ReplicatedBackend::clear_pull( map::iterator piter, bool clear_pull_from_peer) { if (clear_pull_from_peer) { clear_pull_from(piter); } get_parent()->release_locks(piter->second.lock_manager); pulling.erase(piter); } int ReplicatedBackend::start_pushes( const hobject_t &soid, ObjectContextRef obc, RPGHandle *h) { list< map::const_iterator > shards; dout(20) << __func__ << " soid " << soid << dendl; // who needs it? assert(get_parent()->get_actingbackfill_shards().size() > 0); for (set::iterator i = get_parent()->get_actingbackfill_shards().begin(); i != get_parent()->get_actingbackfill_shards().end(); ++i) { if (*i == get_parent()->whoami_shard()) continue; pg_shard_t peer = *i; map::const_iterator j = get_parent()->get_shard_missing().find(peer); assert(j != get_parent()->get_shard_missing().end()); if (j->second.is_missing(soid)) { shards.push_back(j); } } // If more than 1 read will occur ignore possible request to not cache bool cache = shards.size() == 1 ? h->cache_dont_need : false; for (auto j : shards) { pg_shard_t peer = j->first; h->pushes[peer].push_back(PushOp()); int r = prep_push_to_replica(obc, soid, peer, &(h->pushes[peer].back()), cache); if (r < 0) { // Back out all failed reads for (auto k : shards) { pg_shard_t p = k->first; dout(10) << __func__ << " clean up peer " << p << dendl; h->pushes[p].pop_back(); if (p == peer) break; } return r; } } return shards.size(); }