X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fosd%2FReplicatedBackend.cc;fp=src%2Fceph%2Fsrc%2Fosd%2FReplicatedBackend.cc;h=081204a033fd91f4885f18d57b5ead56129a7d20;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/osd/ReplicatedBackend.cc b/src/ceph/src/osd/ReplicatedBackend.cc new file mode 100644 index 0000000..081204a --- /dev/null +++ b/src/ceph/src/osd/ReplicatedBackend.cc @@ -0,0 +1,2309 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include "common/errno.h" +#include "ReplicatedBackend.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDSubOp.h" +#include "messages/MOSDRepOp.h" +#include "messages/MOSDSubOpReply.h" +#include "messages/MOSDRepOpReply.h" +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPull.h" +#include "messages/MOSDPGPushReply.h" +#include "common/EventTrace.h" + +#define dout_context cct +#define dout_subsys ceph_subsys_osd +#define DOUT_PREFIX_ARGS this +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) +static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) { + return *_dout << pgb->get_parent()->gen_dbg_prefix(); +} + +namespace { +class PG_SendMessageOnConn: public Context { + PGBackend::Listener *pg; + Message *reply; + ConnectionRef conn; + public: + PG_SendMessageOnConn( + PGBackend::Listener *pg, + Message *reply, + ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {} + void finish(int) override { + pg->send_message_osd_cluster(reply, conn.get()); + } +}; + +class PG_RecoveryQueueAsync : public Context { + PGBackend::Listener *pg; + unique_ptr> c; + public: + PG_RecoveryQueueAsync( + PGBackend::Listener *pg, + GenContext *c) : pg(pg), c(c) {} + void finish(int) override { + pg->schedule_recovery_work(c.release()); + } +}; +} + +struct ReplicatedBackend::C_OSD_RepModifyApply : public Context { + ReplicatedBackend *pg; + RepModifyRef rm; + C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r) + : pg(pg), rm(r) {} + void finish(int r) override { + pg->repop_applied(rm); + } +}; + +struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context { + ReplicatedBackend *pg; + RepModifyRef rm; + C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r) + : pg(pg), rm(r) {} + void finish(int r) override { + pg->repop_commit(rm); + } +}; + +static void log_subop_stats( + PerfCounters *logger, + OpRequestRef op, int subop) +{ + utime_t now = ceph_clock_now(); + utime_t latency = now; + latency -= op->get_req()->get_recv_stamp(); + + + logger->inc(l_osd_sop); + logger->tinc(l_osd_sop_lat, latency); + logger->inc(subop); + + if (subop != l_osd_sop_pull) { + uint64_t inb = op->get_req()->get_data().length(); + logger->inc(l_osd_sop_inb, inb); + if (subop == l_osd_sop_w) { + logger->inc(l_osd_sop_w_inb, inb); + logger->tinc(l_osd_sop_w_lat, latency); + } else if (subop == l_osd_sop_push) { + logger->inc(l_osd_sop_push_inb, inb); + logger->tinc(l_osd_sop_push_lat, latency); + } else + assert("no support subop" == 0); + } else { + logger->tinc(l_osd_sop_pull_lat, latency); + } +} + +ReplicatedBackend::ReplicatedBackend( + PGBackend::Listener *pg, + coll_t coll, + ObjectStore::CollectionHandle &c, + ObjectStore *store, + CephContext *cct) : + PGBackend(cct, pg, store, coll, c) {} + +void ReplicatedBackend::run_recovery_op( + PGBackend::RecoveryHandle *_h, + int priority) +{ + RPGHandle *h = static_cast(_h); + send_pushes(priority, h->pushes); + send_pulls(priority, h->pulls); + send_recovery_deletes(priority, h->deletes); + delete h; +} + +int ReplicatedBackend::recover_object( + const hobject_t &hoid, + eversion_t v, + ObjectContextRef head, + ObjectContextRef obc, + RecoveryHandle *_h + ) +{ + dout(10) << __func__ << ": " << hoid << dendl; + RPGHandle *h = static_cast(_h); + if (get_parent()->get_local_missing().is_missing(hoid)) { + assert(!obc); + // pull + prepare_pull( + v, + hoid, + head, + h); + } else { + assert(obc); + int started = start_pushes( + hoid, + obc, + h); + if (started < 0) { + pushing[hoid].clear(); + return started; + } + } + return 0; +} + +void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap) +{ + for(map >::iterator i = pull_from_peer.begin(); + i != pull_from_peer.end(); + ) { + if (osdmap->is_down(i->first.osd)) { + dout(10) << "check_recovery_sources resetting pulls from osd." << i->first + << ", osdmap has it marked down" << dendl; + for (set::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + get_parent()->cancel_pull(*j); + clear_pull(pulling.find(*j), false); + } + pull_from_peer.erase(i++); + } else { + ++i; + } + } +} + +bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op) +{ + dout(10) << __func__ << ": " << op << dendl; + switch (op->get_req()->get_type()) { + case MSG_OSD_PG_PULL: + return true; + default: + return false; + } +} + +bool ReplicatedBackend::_handle_message( + OpRequestRef op + ) +{ + dout(10) << __func__ << ": " << op << dendl; + switch (op->get_req()->get_type()) { + case MSG_OSD_PG_PUSH: + do_push(op); + return true; + + case MSG_OSD_PG_PULL: + do_pull(op); + return true; + + case MSG_OSD_PG_PUSH_REPLY: + do_push_reply(op); + return true; + + case MSG_OSD_SUBOP: { + const MOSDSubOp *m = static_cast(op->get_req()); + if (m->ops.size() == 0) { + assert(0); + } + break; + } + + case MSG_OSD_REPOP: { + do_repop(op); + return true; + } + + case MSG_OSD_REPOPREPLY: { + do_repop_reply(op); + return true; + } + + default: + break; + } + return false; +} + +void ReplicatedBackend::clear_recovery_state() +{ + // clear pushing/pulling maps + for (auto &&i: pushing) { + for (auto &&j: i.second) { + get_parent()->release_locks(j.second.lock_manager); + } + } + pushing.clear(); + + for (auto &&i: pulling) { + get_parent()->release_locks(i.second.lock_manager); + } + pulling.clear(); + pull_from_peer.clear(); +} + +void ReplicatedBackend::on_change() +{ + dout(10) << __func__ << dendl; + for (map::iterator i = in_progress_ops.begin(); + i != in_progress_ops.end(); + in_progress_ops.erase(i++)) { + if (i->second.on_commit) + delete i->second.on_commit; + if (i->second.on_applied) + delete i->second.on_applied; + } + clear_recovery_state(); +} + +void ReplicatedBackend::on_flushed() +{ +} + +int ReplicatedBackend::objects_read_sync( + const hobject_t &hoid, + uint64_t off, + uint64_t len, + uint32_t op_flags, + bufferlist *bl) +{ + return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags); +} + +struct AsyncReadCallback : public GenContext { + int r; + Context *c; + AsyncReadCallback(int r, Context *c) : r(r), c(c) {} + void finish(ThreadPool::TPHandle&) override { + c->complete(r); + c = NULL; + } + ~AsyncReadCallback() override { + delete c; + } +}; +void ReplicatedBackend::objects_read_async( + const hobject_t &hoid, + const list, + pair > > &to_read, + Context *on_complete, + bool fast_read) +{ + // There is no fast read implementation for replication backend yet + assert(!fast_read); + + int r = 0; + for (list, + pair > >::const_iterator i = + to_read.begin(); + i != to_read.end() && r >= 0; + ++i) { + int _r = store->read(ch, ghobject_t(hoid), i->first.get<0>(), + i->first.get<1>(), *(i->second.first), + i->first.get<2>()); + if (i->second.second) { + get_parent()->schedule_recovery_work( + get_parent()->bless_gencontext( + new AsyncReadCallback(_r, i->second.second))); + } + if (_r < 0) + r = _r; + } + get_parent()->schedule_recovery_work( + get_parent()->bless_gencontext( + new AsyncReadCallback(r, on_complete))); +} + +class C_OSD_OnOpCommit : public Context { + ReplicatedBackend *pg; + ReplicatedBackend::InProgressOp *op; +public: + C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) + : pg(pg), op(op) {} + void finish(int) override { + pg->op_commit(op); + } +}; + +class C_OSD_OnOpApplied : public Context { + ReplicatedBackend *pg; + ReplicatedBackend::InProgressOp *op; +public: + C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) + : pg(pg), op(op) {} + void finish(int) override { + pg->op_applied(op); + } +}; + +void generate_transaction( + PGTransactionUPtr &pgt, + const coll_t &coll, + bool legacy_log_entries, + vector &log_entries, + ObjectStore::Transaction *t, + set *added, + set *removed) +{ + assert(t); + assert(added); + assert(removed); + + for (auto &&le: log_entries) { + le.mark_unrollbackable(); + auto oiter = pgt->op_map.find(le.soid); + if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) { + bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8); + ::encode(oiter->second.updated_snaps->second, bl); + le.snaps.swap(bl); + le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog); + } + } + + pgt->safe_create_traverse( + [&](pair &obj_op) { + const hobject_t &oid = obj_op.first; + const ghobject_t goid = + ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD); + const PGTransaction::ObjectOperation &op = obj_op.second; + + if (oid.is_temp()) { + if (op.is_fresh_object()) { + added->insert(oid); + } else if (op.is_delete()) { + removed->insert(oid); + } + } + + if (op.delete_first) { + t->remove(coll, goid); + } + + match( + op.init_type, + [&](const PGTransaction::ObjectOperation::Init::None &) { + }, + [&](const PGTransaction::ObjectOperation::Init::Create &op) { + t->touch(coll, goid); + }, + [&](const PGTransaction::ObjectOperation::Init::Clone &op) { + t->clone( + coll, + ghobject_t( + op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD), + goid); + }, + [&](const PGTransaction::ObjectOperation::Init::Rename &op) { + assert(op.source.is_temp()); + t->collection_move_rename( + coll, + ghobject_t( + op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD), + coll, + goid); + }); + + if (op.truncate) { + t->truncate(coll, goid, op.truncate->first); + if (op.truncate->first != op.truncate->second) + t->truncate(coll, goid, op.truncate->second); + } + + if (!op.attr_updates.empty()) { + map attrs; + for (auto &&p: op.attr_updates) { + if (p.second) + attrs[p.first] = *(p.second); + else + t->rmattr(coll, goid, p.first); + } + t->setattrs(coll, goid, attrs); + } + + if (op.clear_omap) + t->omap_clear(coll, goid); + if (op.omap_header) + t->omap_setheader(coll, goid, *(op.omap_header)); + + for (auto &&up: op.omap_updates) { + using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType; + switch (up.first) { + case UpdateType::Remove: + t->omap_rmkeys(coll, goid, up.second); + break; + case UpdateType::Insert: + t->omap_setkeys(coll, goid, up.second); + break; + } + } + + // updated_snaps doesn't matter since we marked unrollbackable + + if (op.alloc_hint) { + auto &hint = *(op.alloc_hint); + t->set_alloc_hint( + coll, + goid, + hint.expected_object_size, + hint.expected_write_size, + hint.flags); + } + + for (auto &&extent: op.buffer_updates) { + using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate; + match( + extent.get_val(), + [&](const BufferUpdate::Write &op) { + t->write( + coll, + goid, + extent.get_off(), + extent.get_len(), + op.buffer); + }, + [&](const BufferUpdate::Zero &op) { + t->zero( + coll, + goid, + extent.get_off(), + extent.get_len()); + }, + [&](const BufferUpdate::CloneRange &op) { + assert(op.len == extent.get_len()); + t->clone_range( + coll, + ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD), + goid, + op.offset, + extent.get_len(), + extent.get_off()); + }); + } + }); +} + +void ReplicatedBackend::submit_transaction( + const hobject_t &soid, + const object_stat_sum_t &delta_stats, + const eversion_t &at_version, + PGTransactionUPtr &&_t, + const eversion_t &trim_to, + const eversion_t &roll_forward_to, + const vector &_log_entries, + boost::optional &hset_history, + Context *on_local_applied_sync, + Context *on_all_acked, + Context *on_all_commit, + ceph_tid_t tid, + osd_reqid_t reqid, + OpRequestRef orig_op) +{ + parent->apply_stats( + soid, + delta_stats); + + vector log_entries(_log_entries); + ObjectStore::Transaction op_t; + PGTransactionUPtr t(std::move(_t)); + set added, removed; + generate_transaction( + t, + coll, + (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN), + log_entries, + &op_t, + &added, + &removed); + assert(added.size() <= 1); + assert(removed.size() <= 1); + + assert(!in_progress_ops.count(tid)); + InProgressOp &op = in_progress_ops.insert( + make_pair( + tid, + InProgressOp( + tid, on_all_commit, on_all_acked, + orig_op, at_version) + ) + ).first->second; + + op.waiting_for_applied.insert( + parent->get_actingbackfill_shards().begin(), + parent->get_actingbackfill_shards().end()); + op.waiting_for_commit.insert( + parent->get_actingbackfill_shards().begin(), + parent->get_actingbackfill_shards().end()); + + issue_op( + soid, + at_version, + tid, + reqid, + trim_to, + at_version, + added.size() ? *(added.begin()) : hobject_t(), + removed.size() ? *(removed.begin()) : hobject_t(), + log_entries, + hset_history, + &op, + op_t); + + add_temp_objs(added); + clear_temp_objs(removed); + + parent->log_operation( + log_entries, + hset_history, + trim_to, + at_version, + true, + op_t); + + op_t.register_on_applied_sync(on_local_applied_sync); + op_t.register_on_applied( + parent->bless_context( + new C_OSD_OnOpApplied(this, &op))); + op_t.register_on_commit( + parent->bless_context( + new C_OSD_OnOpCommit(this, &op))); + + vector tls; + tls.push_back(std::move(op_t)); + + parent->queue_transactions(tls, op.op); +} + +void ReplicatedBackend::op_applied( + InProgressOp *op) +{ + FUNCTRACE(); + OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_APPLIED_BEGIN", true); + dout(10) << __func__ << ": " << op->tid << dendl; + if (op->op) { + op->op->mark_event("op_applied"); + op->op->pg_trace.event("op applied"); + } + + op->waiting_for_applied.erase(get_parent()->whoami_shard()); + parent->op_applied(op->v); + + if (op->waiting_for_applied.empty()) { + op->on_applied->complete(0); + op->on_applied = 0; + } + if (op->done()) { + assert(!op->on_commit && !op->on_applied); + in_progress_ops.erase(op->tid); + } +} + +void ReplicatedBackend::op_commit( + InProgressOp *op) +{ + FUNCTRACE(); + OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true); + dout(10) << __func__ << ": " << op->tid << dendl; + if (op->op) { + op->op->mark_event("op_commit"); + op->op->pg_trace.event("op commit"); + } + + op->waiting_for_commit.erase(get_parent()->whoami_shard()); + + if (op->waiting_for_commit.empty()) { + op->on_commit->complete(0); + op->on_commit = 0; + } + if (op->done()) { + assert(!op->on_commit && !op->on_applied); + in_progress_ops.erase(op->tid); + } +} + +void ReplicatedBackend::do_repop_reply(OpRequestRef op) +{ + static_cast(op->get_nonconst_req())->finish_decode(); + const MOSDRepOpReply *r = static_cast(op->get_req()); + assert(r->get_header().type == MSG_OSD_REPOPREPLY); + + op->mark_started(); + + // must be replication. + ceph_tid_t rep_tid = r->get_tid(); + pg_shard_t from = r->from; + + if (in_progress_ops.count(rep_tid)) { + map::iterator iter = + in_progress_ops.find(rep_tid); + InProgressOp &ip_op = iter->second; + const MOSDOp *m = NULL; + if (ip_op.op) + m = static_cast(ip_op.op->get_req()); + + if (m) + dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m + << " ack_type " << (int)r->ack_type + << " from " << from + << dendl; + else + dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) " + << " ack_type " << (int)r->ack_type + << " from " << from + << dendl; + + // oh, good. + + if (r->ack_type & CEPH_OSD_FLAG_ONDISK) { + assert(ip_op.waiting_for_commit.count(from)); + ip_op.waiting_for_commit.erase(from); + if (ip_op.op) { + ostringstream ss; + ss << "sub_op_commit_rec from " << from; + ip_op.op->mark_event_string(ss.str()); + ip_op.op->pg_trace.event("sub_op_commit_rec"); + } + } else { + assert(ip_op.waiting_for_applied.count(from)); + if (ip_op.op) { + ostringstream ss; + ss << "sub_op_applied_rec from " << from; + ip_op.op->mark_event_string(ss.str()); + ip_op.op->pg_trace.event("sub_op_applied_rec"); + } + } + ip_op.waiting_for_applied.erase(from); + + parent->update_peer_last_complete_ondisk( + from, + r->get_last_complete_ondisk()); + + if (ip_op.waiting_for_applied.empty() && + ip_op.on_applied) { + ip_op.on_applied->complete(0); + ip_op.on_applied = 0; + } + if (ip_op.waiting_for_commit.empty() && + ip_op.on_commit) { + ip_op.on_commit->complete(0); + ip_op.on_commit= 0; + } + if (ip_op.done()) { + assert(!ip_op.on_commit && !ip_op.on_applied); + in_progress_ops.erase(iter); + } + } +} + +void ReplicatedBackend::be_deep_scrub( + const hobject_t &poid, + uint32_t seed, + ScrubMap::object &o, + ThreadPool::TPHandle &handle) +{ + dout(10) << __func__ << " " << poid << " seed " + << std::hex << seed << std::dec << dendl; + bufferhash h(seed), oh(seed); + bufferlist bl, hdrbl; + int r; + __u64 pos = 0; + + uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED; + + while (true) { + handle.reset_tp_timeout(); + r = store->read( + ch, + ghobject_t( + poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + pos, + cct->_conf->osd_deep_scrub_stride, bl, + fadvise_flags); + if (r <= 0) + break; + + h << bl; + pos += bl.length(); + bl.clear(); + } + if (r == -EIO) { + dout(25) << __func__ << " " << poid << " got " + << r << " on read, read_error" << dendl; + o.read_error = true; + return; + } + o.digest = h.digest(); + o.digest_present = true; + + bl.clear(); + r = store->omap_get_header( + coll, + ghobject_t( + poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + &hdrbl, true); + // NOTE: bobtail to giant, we would crc the head as (len, head). + // that changes at the same time we start using a non-zero seed. + if (r == 0 && hdrbl.length()) { + dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length()) + << dendl; + if (seed == 0) { + // legacy + bufferlist bl; + ::encode(hdrbl, bl); + oh << bl; + } else { + oh << hdrbl; + } + } else if (r == -EIO) { + dout(25) << __func__ << " " << poid << " got " + << r << " on omap header read, read_error" << dendl; + o.read_error = true; + return; + } + + ObjectMap::ObjectMapIterator iter = store->get_omap_iterator( + coll, + ghobject_t( + poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); + assert(iter); + for (iter->seek_to_first(); iter->status() == 0 && iter->valid(); + iter->next(false)) { + handle.reset_tp_timeout(); + + dout(25) << "CRC key " << iter->key() << " value:\n"; + iter->value().hexdump(*_dout); + *_dout << dendl; + + ::encode(iter->key(), bl); + ::encode(iter->value(), bl); + oh << bl; + bl.clear(); + } + + if (iter->status() < 0) { + dout(25) << __func__ << " " << poid + << " on omap scan, db status error" << dendl; + o.read_error = true; + return; + } + + //Store final calculated CRC32 of omap header & key/values + o.omap_digest = oh.digest(); + o.omap_digest_present = true; + dout(20) << __func__ << " " << poid << " omap_digest " + << std::hex << o.omap_digest << std::dec << dendl; +} + +void ReplicatedBackend::_do_push(OpRequestRef op) +{ + const MOSDPGPush *m = static_cast(op->get_req()); + assert(m->get_type() == MSG_OSD_PG_PUSH); + pg_shard_t from = m->from; + + op->mark_started(); + + vector replies; + ObjectStore::Transaction t; + ostringstream ss; + if (get_parent()->check_failsafe_full(ss)) { + dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl; + ceph_abort(); + } + for (vector::const_iterator i = m->pushes.begin(); + i != m->pushes.end(); + ++i) { + replies.push_back(PushReplyOp()); + handle_push(from, *i, &(replies.back()), &t); + } + + MOSDPGPushReply *reply = new MOSDPGPushReply; + reply->from = get_parent()->whoami_shard(); + reply->set_priority(m->get_priority()); + reply->pgid = get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->min_epoch = m->min_epoch; + reply->replies.swap(replies); + reply->compute_cost(cct); + + t.register_on_complete( + new PG_SendMessageOnConn( + get_parent(), reply, m->get_connection())); + + get_parent()->queue_transaction(std::move(t)); +} + +struct C_ReplicatedBackend_OnPullComplete : GenContext { + ReplicatedBackend *bc; + list to_continue; + int priority; + C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority) + : bc(bc), priority(priority) {} + + void finish(ThreadPool::TPHandle &handle) override { + ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op(); + for (auto &&i: to_continue) { + auto j = bc->pulling.find(i.hoid); + assert(j != bc->pulling.end()); + ObjectContextRef obc = j->second.obc; + bc->clear_pull(j, false /* already did it */); + int started = bc->start_pushes(i.hoid, obc, h); + if (started < 0) { + bc->pushing[i.hoid].clear(); + bc->get_parent()->primary_failed(i.hoid); + bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version); + } else if (!started) { + bc->get_parent()->on_global_recover( + i.hoid, i.stat, false); + } + handle.reset_tp_timeout(); + } + bc->run_recovery_op(h, priority); + } +}; + +void ReplicatedBackend::_do_pull_response(OpRequestRef op) +{ + const MOSDPGPush *m = static_cast(op->get_req()); + assert(m->get_type() == MSG_OSD_PG_PUSH); + pg_shard_t from = m->from; + + op->mark_started(); + + vector replies(1); + + ostringstream ss; + if (get_parent()->check_failsafe_full(ss)) { + dout(10) << __func__ << " Out of space (failsafe) processing pull response (push): " << ss.str() << dendl; + ceph_abort(); + } + + ObjectStore::Transaction t; + list to_continue; + for (vector::const_iterator i = m->pushes.begin(); + i != m->pushes.end(); + ++i) { + bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t); + if (more) + replies.push_back(PullOp()); + } + if (!to_continue.empty()) { + C_ReplicatedBackend_OnPullComplete *c = + new C_ReplicatedBackend_OnPullComplete( + this, + m->get_priority()); + c->to_continue.swap(to_continue); + t.register_on_complete( + new PG_RecoveryQueueAsync( + get_parent(), + get_parent()->bless_gencontext(c))); + } + replies.erase(replies.end() - 1); + + if (replies.size()) { + MOSDPGPull *reply = new MOSDPGPull; + reply->from = parent->whoami_shard(); + reply->set_priority(m->get_priority()); + reply->pgid = get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->min_epoch = m->min_epoch; + reply->set_pulls(&replies); + reply->compute_cost(cct); + + t.register_on_complete( + new PG_SendMessageOnConn( + get_parent(), reply, m->get_connection())); + } + + get_parent()->queue_transaction(std::move(t)); +} + +void ReplicatedBackend::do_pull(OpRequestRef op) +{ + MOSDPGPull *m = static_cast(op->get_nonconst_req()); + assert(m->get_type() == MSG_OSD_PG_PULL); + pg_shard_t from = m->from; + + map > replies; + vector pulls; + m->take_pulls(&pulls); + for (auto& i : pulls) { + replies[from].push_back(PushOp()); + handle_pull(from, i, &(replies[from].back())); + } + send_pushes(m->get_priority(), replies); +} + +void ReplicatedBackend::do_push_reply(OpRequestRef op) +{ + const MOSDPGPushReply *m = static_cast(op->get_req()); + assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY); + pg_shard_t from = m->from; + + vector replies(1); + for (vector::const_iterator i = m->replies.begin(); + i != m->replies.end(); + ++i) { + bool more = handle_push_reply(from, *i, &(replies.back())); + if (more) + replies.push_back(PushOp()); + } + replies.erase(replies.end() - 1); + + map > _replies; + _replies[from].swap(replies); + send_pushes(m->get_priority(), _replies); +} + +Message * ReplicatedBackend::generate_subop( + const hobject_t &soid, + const eversion_t &at_version, + ceph_tid_t tid, + osd_reqid_t reqid, + eversion_t pg_trim_to, + eversion_t pg_roll_forward_to, + hobject_t new_temp_oid, + hobject_t discard_temp_oid, + const vector &log_entries, + boost::optional &hset_hist, + ObjectStore::Transaction &op_t, + pg_shard_t peer, + const pg_info_t &pinfo) +{ + int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; + // forward the write/update/whatever + MOSDRepOp *wr = new MOSDRepOp( + reqid, parent->whoami_shard(), + spg_t(get_info().pgid.pgid, peer.shard), + soid, acks_wanted, + get_osdmap()->get_epoch(), + parent->get_last_peering_reset_epoch(), + tid, at_version); + + // ship resulting transaction, log entries, and pg_stats + if (!parent->should_send_op(peer, soid)) { + dout(10) << "issue_repop shipping empty opt to osd." << peer + <<", object " << soid + << " beyond MAX(last_backfill_started " + << ", pinfo.last_backfill " + << pinfo.last_backfill << ")" << dendl; + ObjectStore::Transaction t; + ::encode(t, wr->get_data()); + } else { + ::encode(op_t, wr->get_data()); + wr->get_header().data_off = op_t.get_data_alignment(); + } + + ::encode(log_entries, wr->logbl); + + if (pinfo.is_incomplete()) + wr->pg_stats = pinfo.stats; // reflects backfill progress + else + wr->pg_stats = get_info().stats; + + wr->pg_trim_to = pg_trim_to; + wr->pg_roll_forward_to = pg_roll_forward_to; + + wr->new_temp_oid = new_temp_oid; + wr->discard_temp_oid = discard_temp_oid; + wr->updated_hit_set_history = hset_hist; + return wr; +} + +void ReplicatedBackend::issue_op( + const hobject_t &soid, + const eversion_t &at_version, + ceph_tid_t tid, + osd_reqid_t reqid, + eversion_t pg_trim_to, + eversion_t pg_roll_forward_to, + hobject_t new_temp_oid, + hobject_t discard_temp_oid, + const vector &log_entries, + boost::optional &hset_hist, + InProgressOp *op, + ObjectStore::Transaction &op_t) +{ + if (op->op) + op->op->pg_trace.event("issue replication ops"); + + if (parent->get_actingbackfill_shards().size() > 1) { + ostringstream ss; + set replicas = parent->get_actingbackfill_shards(); + replicas.erase(parent->whoami_shard()); + ss << "waiting for subops from " << replicas; + if (op->op) + op->op->mark_sub_op_sent(ss.str()); + } + for (set::const_iterator i = + parent->get_actingbackfill_shards().begin(); + i != parent->get_actingbackfill_shards().end(); + ++i) { + if (*i == parent->whoami_shard()) continue; + pg_shard_t peer = *i; + const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second; + + Message *wr; + wr = generate_subop( + soid, + at_version, + tid, + reqid, + pg_trim_to, + pg_roll_forward_to, + new_temp_oid, + discard_temp_oid, + log_entries, + hset_hist, + op_t, + peer, + pinfo); + if (op->op) + wr->trace.init("replicated op", nullptr, &op->op->pg_trace); + get_parent()->send_message_osd_cluster( + peer.osd, wr, get_osdmap()->get_epoch()); + } +} + +// sub op modify +void ReplicatedBackend::do_repop(OpRequestRef op) +{ + static_cast(op->get_nonconst_req())->finish_decode(); + const MOSDRepOp *m = static_cast(op->get_req()); + int msg_type = m->get_type(); + assert(MSG_OSD_REPOP == msg_type); + + const hobject_t& soid = m->poid; + + dout(10) << __func__ << " " << soid + << " v " << m->version + << (m->logbl.length() ? " (transaction)" : " (parallel exec") + << " " << m->logbl.length() + << dendl; + + // sanity checks + assert(m->map_epoch >= get_info().history.same_interval_since); + + // we better not be missing this. + assert(!parent->get_log().get_missing().is_missing(soid)); + + int ackerosd = m->get_source().num(); + + op->mark_started(); + + RepModifyRef rm(std::make_shared()); + rm->op = op; + rm->ackerosd = ackerosd; + rm->last_complete = get_info().last_complete; + rm->epoch_started = get_osdmap()->get_epoch(); + + assert(m->logbl.length()); + // shipped transaction and log entries + vector log; + + bufferlist::iterator p = const_cast(m->get_data()).begin(); + ::decode(rm->opt, p); + + if (m->new_temp_oid != hobject_t()) { + dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl; + add_temp_obj(m->new_temp_oid); + } + if (m->discard_temp_oid != hobject_t()) { + dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl; + if (rm->opt.empty()) { + dout(10) << __func__ << ": removing object " << m->discard_temp_oid + << " since we won't get the transaction" << dendl; + rm->localt.remove(coll, ghobject_t(m->discard_temp_oid)); + } + clear_temp_obj(m->discard_temp_oid); + } + + p = const_cast(m->logbl).begin(); + ::decode(log, p); + rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); + + bool update_snaps = false; + if (!rm->opt.empty()) { + // If the opt is non-empty, we infer we are before + // last_backfill (according to the primary, not our + // not-quite-accurate value), and should update the + // collections now. Otherwise, we do it later on push. + update_snaps = true; + } + parent->update_stats(m->pg_stats); + parent->log_operation( + log, + m->updated_hit_set_history, + m->pg_trim_to, + m->pg_roll_forward_to, + update_snaps, + rm->localt); + + rm->opt.register_on_commit( + parent->bless_context( + new C_OSD_RepModifyCommit(this, rm))); + rm->localt.register_on_applied( + parent->bless_context( + new C_OSD_RepModifyApply(this, rm))); + vector tls; + tls.reserve(2); + tls.push_back(std::move(rm->localt)); + tls.push_back(std::move(rm->opt)); + parent->queue_transactions(tls, op); + // op is cleaned up by oncommit/onapply when both are executed +} + +void ReplicatedBackend::repop_applied(RepModifyRef rm) +{ + rm->op->mark_event("sub_op_applied"); + rm->applied = true; + rm->op->pg_trace.event("sup_op_applied"); + + dout(10) << __func__ << " on " << rm << " op " + << *rm->op->get_req() << dendl; + const Message *m = rm->op->get_req(); + const MOSDRepOp *req = static_cast(m); + eversion_t version = req->version; + + // send ack to acker only if we haven't sent a commit already + if (!rm->committed) { + Message *ack = new MOSDRepOpReply( + req, parent->whoami_shard(), + 0, get_osdmap()->get_epoch(), req->min_epoch, CEPH_OSD_FLAG_ACK); + ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! + ack->trace = rm->op->pg_trace; + get_parent()->send_message_osd_cluster( + rm->ackerosd, ack, get_osdmap()->get_epoch()); + } + + parent->op_applied(version); +} + +void ReplicatedBackend::repop_commit(RepModifyRef rm) +{ + rm->op->mark_commit_sent(); + rm->op->pg_trace.event("sup_op_commit"); + rm->committed = true; + + // send commit. + const MOSDRepOp *m = static_cast(rm->op->get_req()); + assert(m->get_type() == MSG_OSD_REPOP); + dout(10) << __func__ << " on op " << *m + << ", sending commit to osd." << rm->ackerosd + << dendl; + assert(get_osdmap()->is_up(rm->ackerosd)); + + get_parent()->update_last_complete_ondisk(rm->last_complete); + + MOSDRepOpReply *reply = new MOSDRepOpReply( + m, + get_parent()->whoami_shard(), + 0, get_osdmap()->get_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK); + reply->set_last_complete_ondisk(rm->last_complete); + reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! + reply->trace = rm->op->pg_trace; + get_parent()->send_message_osd_cluster( + rm->ackerosd, reply, get_osdmap()->get_epoch()); + + log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w); +} + + +// =========================================================== + +void ReplicatedBackend::calc_head_subsets( + ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, + const pg_missing_t& missing, + const hobject_t &last_backfill, + interval_set& data_subset, + map>& clone_subsets, + ObcLockManager &manager) +{ + dout(10) << "calc_head_subsets " << head + << " clone_overlap " << snapset.clone_overlap << dendl; + + uint64_t size = obc->obs.oi.size; + if (size) + data_subset.insert(0, size); + + if (get_parent()->get_pool().allow_incomplete_clones()) { + dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; + return; + } + + if (!cct->_conf->osd_recover_clone_overlap) { + dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl; + return; + } + + + interval_set cloning; + interval_set prev; + if (size) + prev.insert(0, size); + + for (int j=snapset.clones.size()-1; j>=0; j--) { + hobject_t c = head; + c.snap = snapset.clones[j]; + prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); + if (!missing.is_missing(c) && + c < last_backfill && + get_parent()->try_lock_for_read(c, manager)) { + dout(10) << "calc_head_subsets " << head << " has prev " << c + << " overlap " << prev << dendl; + clone_subsets[c] = prev; + cloning.union_of(prev); + break; + } + dout(10) << "calc_head_subsets " << head << " does not have prev " << c + << " overlap " << prev << dendl; + } + + + if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { + dout(10) << "skipping clone, too many holes" << dendl; + get_parent()->release_locks(manager); + clone_subsets.clear(); + cloning.clear(); + } + + // what's left for us to push? + data_subset.subtract(cloning); + + dout(10) << "calc_head_subsets " << head + << " data_subset " << data_subset + << " clone_subsets " << clone_subsets << dendl; +} + +void ReplicatedBackend::calc_clone_subsets( + SnapSet& snapset, const hobject_t& soid, + const pg_missing_t& missing, + const hobject_t &last_backfill, + interval_set& data_subset, + map>& clone_subsets, + ObcLockManager &manager) +{ + dout(10) << "calc_clone_subsets " << soid + << " clone_overlap " << snapset.clone_overlap << dendl; + + uint64_t size = snapset.clone_size[soid.snap]; + if (size) + data_subset.insert(0, size); + + if (get_parent()->get_pool().allow_incomplete_clones()) { + dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; + return; + } + + if (!cct->_conf->osd_recover_clone_overlap) { + dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl; + return; + } + + unsigned i; + for (i=0; i < snapset.clones.size(); i++) + if (snapset.clones[i] == soid.snap) + break; + + // any overlap with next older clone? + interval_set cloning; + interval_set prev; + if (size) + prev.insert(0, size); + for (int j=i-1; j>=0; j--) { + hobject_t c = soid; + c.snap = snapset.clones[j]; + prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); + if (!missing.is_missing(c) && + c < last_backfill && + get_parent()->try_lock_for_read(c, manager)) { + dout(10) << "calc_clone_subsets " << soid << " has prev " << c + << " overlap " << prev << dendl; + clone_subsets[c] = prev; + cloning.union_of(prev); + break; + } + dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c + << " overlap " << prev << dendl; + } + + // overlap with next newest? + interval_set next; + if (size) + next.insert(0, size); + for (unsigned j=i+1; jtry_lock_for_read(c, manager)) { + dout(10) << "calc_clone_subsets " << soid << " has next " << c + << " overlap " << next << dendl; + clone_subsets[c] = next; + cloning.union_of(next); + break; + } + dout(10) << "calc_clone_subsets " << soid << " does not have next " << c + << " overlap " << next << dendl; + } + + if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { + dout(10) << "skipping clone, too many holes" << dendl; + get_parent()->release_locks(manager); + clone_subsets.clear(); + cloning.clear(); + } + + + // what's left for us to push? + data_subset.subtract(cloning); + + dout(10) << "calc_clone_subsets " << soid + << " data_subset " << data_subset + << " clone_subsets " << clone_subsets << dendl; +} + +void ReplicatedBackend::prepare_pull( + eversion_t v, + const hobject_t& soid, + ObjectContextRef headctx, + RPGHandle *h) +{ + assert(get_parent()->get_local_missing().get_items().count(soid)); + eversion_t _v = get_parent()->get_local_missing().get_items().find( + soid)->second.need; + assert(_v == v); + const map> &missing_loc( + get_parent()->get_missing_loc_shards()); + const map &peer_missing( + get_parent()->get_shard_missing()); + map>::const_iterator q = missing_loc.find(soid); + assert(q != missing_loc.end()); + assert(!q->second.empty()); + + // pick a pullee + vector shuffle(q->second.begin(), q->second.end()); + random_shuffle(shuffle.begin(), shuffle.end()); + vector::iterator p = shuffle.begin(); + assert(get_osdmap()->is_up(p->osd)); + pg_shard_t fromshard = *p; + + dout(7) << "pull " << soid + << " v " << v + << " on osds " << q->second + << " from osd." << fromshard + << dendl; + + assert(peer_missing.count(fromshard)); + const pg_missing_t &pmissing = peer_missing.find(fromshard)->second; + if (pmissing.is_missing(soid, v)) { + assert(pmissing.get_items().find(soid)->second.have != v); + dout(10) << "pulling soid " << soid << " from osd " << fromshard + << " at version " << pmissing.get_items().find(soid)->second.have + << " rather than at version " << v << dendl; + v = pmissing.get_items().find(soid)->second.have; + assert(get_parent()->get_log().get_log().objects.count(soid) && + (get_parent()->get_log().get_log().objects.find(soid)->second->op == + pg_log_entry_t::LOST_REVERT) && + (get_parent()->get_log().get_log().objects.find( + soid)->second->reverting_to == + v)); + } + + ObjectRecoveryInfo recovery_info; + ObcLockManager lock_manager; + + if (soid.is_snap()) { + assert(!get_parent()->get_local_missing().is_missing( + soid.get_head()) || + !get_parent()->get_local_missing().is_missing( + soid.get_snapdir())); + assert(headctx); + // check snapset + SnapSetContext *ssc = headctx->ssc; + assert(ssc); + dout(10) << " snapset " << ssc->snapset << dendl; + recovery_info.ss = ssc->snapset; + calc_clone_subsets( + ssc->snapset, soid, get_parent()->get_local_missing(), + get_info().last_backfill, + recovery_info.copy_subset, + recovery_info.clone_subset, + lock_manager); + // FIXME: this may overestimate if we are pulling multiple clones in parallel... + dout(10) << " pulling " << recovery_info << dendl; + + assert(ssc->snapset.clone_size.count(soid.snap)); + recovery_info.size = ssc->snapset.clone_size[soid.snap]; + } else { + // pulling head or unversioned object. + // always pull the whole thing. + recovery_info.copy_subset.insert(0, (uint64_t)-1); + recovery_info.size = ((uint64_t)-1); + } + + h->pulls[fromshard].push_back(PullOp()); + PullOp &op = h->pulls[fromshard].back(); + op.soid = soid; + + op.recovery_info = recovery_info; + op.recovery_info.soid = soid; + op.recovery_info.version = v; + op.recovery_progress.data_complete = false; + op.recovery_progress.omap_complete = false; + op.recovery_progress.data_recovered_to = 0; + op.recovery_progress.first = true; + + assert(!pulling.count(soid)); + pull_from_peer[fromshard].insert(soid); + PullInfo &pi = pulling[soid]; + pi.from = fromshard; + pi.soid = soid; + pi.head_ctx = headctx; + pi.recovery_info = op.recovery_info; + pi.recovery_progress = op.recovery_progress; + pi.cache_dont_need = h->cache_dont_need; + pi.lock_manager = std::move(lock_manager); +} + +/* + * intelligently push an object to a replica. make use of existing + * clones/heads and dup data ranges where possible. + */ +int ReplicatedBackend::prep_push_to_replica( + ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer, + PushOp *pop, bool cache_dont_need) +{ + const object_info_t& oi = obc->obs.oi; + uint64_t size = obc->obs.oi.size; + + dout(10) << __func__ << ": " << soid << " v" << oi.version + << " size " << size << " to osd." << peer << dendl; + + map> clone_subsets; + interval_set data_subset; + + ObcLockManager lock_manager; + // are we doing a clone on the replica? + if (soid.snap && soid.snap < CEPH_NOSNAP) { + hobject_t head = soid; + head.snap = CEPH_NOSNAP; + + // try to base push off of clones that succeed/preceed poid + // we need the head (and current SnapSet) locally to do that. + if (get_parent()->get_local_missing().is_missing(head)) { + dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl; + return prep_push(obc, soid, peer, pop, cache_dont_need); + } + hobject_t snapdir = head; + snapdir.snap = CEPH_SNAPDIR; + if (get_parent()->get_local_missing().is_missing(snapdir)) { + dout(15) << "push_to_replica missing snapdir " << snapdir + << ", pushing raw clone" << dendl; + return prep_push(obc, soid, peer, pop, cache_dont_need); + } + + SnapSetContext *ssc = obc->ssc; + assert(ssc); + dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; + pop->recovery_info.ss = ssc->snapset; + map::const_iterator pm = + get_parent()->get_shard_missing().find(peer); + assert(pm != get_parent()->get_shard_missing().end()); + map::const_iterator pi = + get_parent()->get_shard_info().find(peer); + assert(pi != get_parent()->get_shard_info().end()); + calc_clone_subsets( + ssc->snapset, soid, + pm->second, + pi->second.last_backfill, + data_subset, clone_subsets, + lock_manager); + } else if (soid.snap == CEPH_NOSNAP) { + // pushing head or unversioned object. + // base this on partially on replica's clones? + SnapSetContext *ssc = obc->ssc; + assert(ssc); + dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; + calc_head_subsets( + obc, + ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second, + get_parent()->get_shard_info().find(peer)->second.last_backfill, + data_subset, clone_subsets, + lock_manager); + } + + return prep_push( + obc, + soid, + peer, + oi.version, + data_subset, + clone_subsets, + pop, + cache_dont_need, + std::move(lock_manager)); +} + +int ReplicatedBackend::prep_push(ObjectContextRef obc, + const hobject_t& soid, pg_shard_t peer, + PushOp *pop, bool cache_dont_need) +{ + interval_set data_subset; + if (obc->obs.oi.size) + data_subset.insert(0, obc->obs.oi.size); + map> clone_subsets; + + return prep_push(obc, soid, peer, + obc->obs.oi.version, data_subset, clone_subsets, + pop, cache_dont_need, ObcLockManager()); +} + +int ReplicatedBackend::prep_push( + ObjectContextRef obc, + const hobject_t& soid, pg_shard_t peer, + eversion_t version, + interval_set &data_subset, + map>& clone_subsets, + PushOp *pop, + bool cache_dont_need, + ObcLockManager &&lock_manager) +{ + get_parent()->begin_peer_recover(peer, soid); + // take note. + PushInfo &pi = pushing[soid][peer]; + pi.obc = obc; + pi.recovery_info.size = obc->obs.oi.size; + pi.recovery_info.copy_subset = data_subset; + pi.recovery_info.clone_subset = clone_subsets; + pi.recovery_info.soid = soid; + pi.recovery_info.oi = obc->obs.oi; + pi.recovery_info.ss = pop->recovery_info.ss; + pi.recovery_info.version = version; + pi.lock_manager = std::move(lock_manager); + + ObjectRecoveryProgress new_progress; + int r = build_push_op(pi.recovery_info, + pi.recovery_progress, + &new_progress, + pop, + &(pi.stat), cache_dont_need); + if (r < 0) + return r; + pi.recovery_progress = new_progress; + return 0; +} + +void ReplicatedBackend::submit_push_data( + const ObjectRecoveryInfo &recovery_info, + bool first, + bool complete, + bool cache_dont_need, + const interval_set &intervals_included, + bufferlist data_included, + bufferlist omap_header, + const map &attrs, + const map &omap_entries, + ObjectStore::Transaction *t) +{ + hobject_t target_oid; + if (first && complete) { + target_oid = recovery_info.soid; + } else { + target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid, + recovery_info.version); + if (first) { + dout(10) << __func__ << ": Adding oid " + << target_oid << " in the temp collection" << dendl; + add_temp_obj(target_oid); + } + } + + if (first) { + t->remove(coll, ghobject_t(target_oid)); + t->touch(coll, ghobject_t(target_oid)); + t->truncate(coll, ghobject_t(target_oid), recovery_info.size); + if (omap_header.length()) + t->omap_setheader(coll, ghobject_t(target_oid), omap_header); + + bufferlist bv = attrs.at(OI_ATTR); + object_info_t oi(bv); + t->set_alloc_hint(coll, ghobject_t(target_oid), + oi.expected_object_size, + oi.expected_write_size, + oi.alloc_hint_flags); + } + uint64_t off = 0; + uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL; + if (cache_dont_need) + fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED; + for (interval_set::const_iterator p = intervals_included.begin(); + p != intervals_included.end(); + ++p) { + bufferlist bit; + bit.substr_of(data_included, off, p.get_len()); + t->write(coll, ghobject_t(target_oid), + p.get_start(), p.get_len(), bit, fadvise_flags); + off += p.get_len(); + } + + if (!omap_entries.empty()) + t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries); + if (!attrs.empty()) + t->setattrs(coll, ghobject_t(target_oid), attrs); + + if (complete) { + if (!first) { + dout(10) << __func__ << ": Removing oid " + << target_oid << " from the temp collection" << dendl; + clear_temp_obj(target_oid); + t->remove(coll, ghobject_t(recovery_info.soid)); + t->collection_move_rename(coll, ghobject_t(target_oid), + coll, ghobject_t(recovery_info.soid)); + } + + submit_push_complete(recovery_info, t); + } +} + +void ReplicatedBackend::submit_push_complete( + const ObjectRecoveryInfo &recovery_info, + ObjectStore::Transaction *t) +{ + for (map>::const_iterator p = + recovery_info.clone_subset.begin(); + p != recovery_info.clone_subset.end(); + ++p) { + for (interval_set::const_iterator q = p->second.begin(); + q != p->second.end(); + ++q) { + dout(15) << " clone_range " << p->first << " " + << q.get_start() << "~" << q.get_len() << dendl; + t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid), + q.get_start(), q.get_len(), q.get_start()); + } + } +} + +ObjectRecoveryInfo ReplicatedBackend::recalc_subsets( + const ObjectRecoveryInfo& recovery_info, + SnapSetContext *ssc, + ObcLockManager &manager) +{ + if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP) + return recovery_info; + ObjectRecoveryInfo new_info = recovery_info; + new_info.copy_subset.clear(); + new_info.clone_subset.clear(); + assert(ssc); + get_parent()->release_locks(manager); // might already have locks + calc_clone_subsets( + ssc->snapset, new_info.soid, get_parent()->get_local_missing(), + get_info().last_backfill, + new_info.copy_subset, new_info.clone_subset, + manager); + return new_info; +} + +bool ReplicatedBackend::handle_pull_response( + pg_shard_t from, const PushOp &pop, PullOp *response, + list *to_continue, + ObjectStore::Transaction *t) +{ + interval_set data_included = pop.data_included; + bufferlist data; + data = pop.data; + dout(10) << "handle_pull_response " + << pop.recovery_info + << pop.after_progress + << " data.size() is " << data.length() + << " data_included: " << data_included + << dendl; + if (pop.version == eversion_t()) { + // replica doesn't have it! + _failed_pull(from, pop.soid); + return false; + } + + const hobject_t &hoid = pop.soid; + assert((data_included.empty() && data.length() == 0) || + (!data_included.empty() && data.length() > 0)); + + auto piter = pulling.find(hoid); + if (piter == pulling.end()) { + return false; + } + + PullInfo &pi = piter->second; + if (pi.recovery_info.size == (uint64_t(-1))) { + pi.recovery_info.size = pop.recovery_info.size; + pi.recovery_info.copy_subset.intersection_of( + pop.recovery_info.copy_subset); + } + // If primary doesn't have object info and didn't know version + if (pi.recovery_info.version == eversion_t()) { + pi.recovery_info.version = pop.version; + } + + bool first = pi.recovery_progress.first; + if (first) { + // attrs only reference the origin bufferlist (decode from + // MOSDPGPush message) whose size is much greater than attrs in + // recovery. If obc cache it (get_obc maybe cache the attr), this + // causes the whole origin bufferlist would not be free until obc + // is evicted from obc cache. So rebuild the bufferlists before + // cache it. + auto attrset = pop.attrset; + for (auto& a : attrset) { + a.second.rebuild(); + } + pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset); + pi.recovery_info.oi = pi.obc->obs.oi; + pi.recovery_info = recalc_subsets( + pi.recovery_info, + pi.obc->ssc, + pi.lock_manager); + } + + + interval_set usable_intervals; + bufferlist usable_data; + trim_pushed_data(pi.recovery_info.copy_subset, + data_included, + data, + &usable_intervals, + &usable_data); + data_included = usable_intervals; + data.claim(usable_data); + + + pi.recovery_progress = pop.after_progress; + + dout(10) << "new recovery_info " << pi.recovery_info + << ", new progress " << pi.recovery_progress + << dendl; + + bool complete = pi.is_complete(); + + submit_push_data(pi.recovery_info, first, + complete, pi.cache_dont_need, + data_included, data, + pop.omap_header, + pop.attrset, + pop.omap_entries, + t); + + pi.stat.num_keys_recovered += pop.omap_entries.size(); + pi.stat.num_bytes_recovered += data.length(); + + if (complete) { + pi.stat.num_objects_recovered++; + clear_pull_from(piter); + to_continue->push_back({hoid, pi.stat}); + get_parent()->on_local_recover( + hoid, pi.recovery_info, pi.obc, false, t); + return false; + } else { + response->soid = pop.soid; + response->recovery_info = pi.recovery_info; + response->recovery_progress = pi.recovery_progress; + return true; + } +} + +void ReplicatedBackend::handle_push( + pg_shard_t from, const PushOp &pop, PushReplyOp *response, + ObjectStore::Transaction *t) +{ + dout(10) << "handle_push " + << pop.recovery_info + << pop.after_progress + << dendl; + bufferlist data; + data = pop.data; + bool first = pop.before_progress.first; + bool complete = pop.after_progress.data_complete && + pop.after_progress.omap_complete; + + response->soid = pop.recovery_info.soid; + submit_push_data(pop.recovery_info, + first, + complete, + true, // must be replicate + pop.data_included, + data, + pop.omap_header, + pop.attrset, + pop.omap_entries, + t); + + if (complete) + get_parent()->on_local_recover( + pop.recovery_info.soid, + pop.recovery_info, + ObjectContextRef(), // ok, is replica + false, + t); +} + +void ReplicatedBackend::send_pushes(int prio, map > &pushes) +{ + for (map >::iterator i = pushes.begin(); + i != pushes.end(); + ++i) { + ConnectionRef con = get_parent()->get_con_osd_cluster( + i->first.osd, + get_osdmap()->get_epoch()); + if (!con) + continue; + vector::iterator j = i->second.begin(); + while (j != i->second.end()) { + uint64_t cost = 0; + uint64_t pushes = 0; + MOSDPGPush *msg = new MOSDPGPush(); + msg->from = get_parent()->whoami_shard(); + msg->pgid = get_parent()->primary_spg_t(); + msg->map_epoch = get_osdmap()->get_epoch(); + msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); + msg->set_priority(prio); + for (; + (j != i->second.end() && + cost < cct->_conf->osd_max_push_cost && + pushes < cct->_conf->osd_max_push_objects) ; + ++j) { + dout(20) << __func__ << ": sending push " << *j + << " to osd." << i->first << dendl; + cost += j->cost(cct); + pushes += 1; + msg->pushes.push_back(*j); + } + msg->set_cost(cost); + get_parent()->send_message_osd_cluster(msg, con); + } + } +} + +void ReplicatedBackend::send_pulls(int prio, map > &pulls) +{ + for (map >::iterator i = pulls.begin(); + i != pulls.end(); + ++i) { + ConnectionRef con = get_parent()->get_con_osd_cluster( + i->first.osd, + get_osdmap()->get_epoch()); + if (!con) + continue; + dout(20) << __func__ << ": sending pulls " << i->second + << " to osd." << i->first << dendl; + MOSDPGPull *msg = new MOSDPGPull(); + msg->from = parent->whoami_shard(); + msg->set_priority(prio); + msg->pgid = get_parent()->primary_spg_t(); + msg->map_epoch = get_osdmap()->get_epoch(); + msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); + msg->set_pulls(&i->second); + msg->compute_cost(cct); + get_parent()->send_message_osd_cluster(msg, con); + } +} + +int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, + const ObjectRecoveryProgress &progress, + ObjectRecoveryProgress *out_progress, + PushOp *out_op, + object_stat_sum_t *stat, + bool cache_dont_need) +{ + ObjectRecoveryProgress _new_progress; + if (!out_progress) + out_progress = &_new_progress; + ObjectRecoveryProgress &new_progress = *out_progress; + new_progress = progress; + + dout(7) << __func__ << " " << recovery_info.soid + << " v " << recovery_info.version + << " size " << recovery_info.size + << " recovery_info: " << recovery_info + << dendl; + + eversion_t v = recovery_info.version; + if (progress.first) { + int r = store->omap_get_header(coll, ghobject_t(recovery_info.soid), &out_op->omap_header); + if(r < 0) { + dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl; + return r; + } + r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset); + if(r < 0) { + dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl; + return r; + } + + // Debug + bufferlist bv = out_op->attrset[OI_ATTR]; + object_info_t oi; + try { + bufferlist::iterator bliter = bv.begin(); + ::decode(oi, bliter); + } catch (...) { + dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl; + return -EINVAL; + } + + // If requestor didn't know the version, use ours + if (v == eversion_t()) { + v = oi.version; + } else if (oi.version != v) { + get_parent()->clog_error() << get_info().pgid << " push " + << recovery_info.soid << " v " + << recovery_info.version + << " failed because local copy is " + << oi.version; + return -EINVAL; + } + + new_progress.first = false; + } + // Once we provide the version subsequent requests will have it, so + // at this point it must be known. + assert(v != eversion_t()); + + uint64_t available = cct->_conf->osd_recovery_max_chunk; + if (!progress.omap_complete) { + ObjectMap::ObjectMapIterator iter = + store->get_omap_iterator(coll, + ghobject_t(recovery_info.soid)); + assert(iter); + for (iter->lower_bound(progress.omap_recovered_to); + iter->valid(); + iter->next(false)) { + if (!out_op->omap_entries.empty() && + ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 && + out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) || + available <= iter->key().size() + iter->value().length())) + break; + out_op->omap_entries.insert(make_pair(iter->key(), iter->value())); + + if ((iter->key().size() + iter->value().length()) <= available) + available -= (iter->key().size() + iter->value().length()); + else + available = 0; + } + if (!iter->valid()) + new_progress.omap_complete = true; + else + new_progress.omap_recovered_to = iter->key(); + } + + if (available > 0) { + if (!recovery_info.copy_subset.empty()) { + interval_set copy_subset = recovery_info.copy_subset; + map m; + int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0, + copy_subset.range_end(), m); + if (r >= 0) { + interval_set fiemap_included(m); + copy_subset.intersection_of(fiemap_included); + } else { + // intersection of copy_subset and empty interval_set would be empty anyway + copy_subset.clear(); + } + + out_op->data_included.span_of(copy_subset, progress.data_recovered_to, + available); + if (out_op->data_included.empty()) // zero filled section, skip to end! + new_progress.data_recovered_to = recovery_info.copy_subset.range_end(); + else + new_progress.data_recovered_to = out_op->data_included.range_end(); + } + } else { + out_op->data_included.clear(); + } + + for (interval_set::iterator p = out_op->data_included.begin(); + p != out_op->data_included.end(); + ++p) { + bufferlist bit; + int r = store->read(ch, ghobject_t(recovery_info.soid), + p.get_start(), p.get_len(), bit, + cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0); + if (cct->_conf->osd_debug_random_push_read_error && + (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) { + dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl; + r = -EIO; + } + if (r < 0) { + return r; + } + if (p.get_len() != bit.length()) { + dout(10) << " extent " << p.get_start() << "~" << p.get_len() + << " is actually " << p.get_start() << "~" << bit.length() + << dendl; + interval_set::iterator save = p++; + if (bit.length() == 0) + out_op->data_included.erase(save); //Remove this empty interval + else + save.set_len(bit.length()); + // Remove any other intervals present + while (p != out_op->data_included.end()) { + interval_set::iterator save = p++; + out_op->data_included.erase(save); + } + new_progress.data_complete = true; + out_op->data.claim_append(bit); + break; + } + out_op->data.claim_append(bit); + } + + if (new_progress.is_complete(recovery_info)) { + new_progress.data_complete = true; + if (stat) + stat->num_objects_recovered++; + } + + if (stat) { + stat->num_keys_recovered += out_op->omap_entries.size(); + stat->num_bytes_recovered += out_op->data.length(); + } + + get_parent()->get_logger()->inc(l_osd_push); + get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length()); + + // send + out_op->version = v; + out_op->soid = recovery_info.soid; + out_op->recovery_info = recovery_info; + out_op->after_progress = new_progress; + out_op->before_progress = progress; + return 0; +} + +void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op) +{ + op->recovery_info.version = eversion_t(); + op->version = eversion_t(); + op->soid = soid; +} + +bool ReplicatedBackend::handle_push_reply( + pg_shard_t peer, const PushReplyOp &op, PushOp *reply) +{ + const hobject_t &soid = op.soid; + if (pushing.count(soid) == 0) { + dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer + << ", or anybody else" + << dendl; + return false; + } else if (pushing[soid].count(peer) == 0) { + dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer + << dendl; + return false; + } else { + PushInfo *pi = &pushing[soid][peer]; + bool error = pushing[soid].begin()->second.recovery_progress.error; + + if (!pi->recovery_progress.data_complete && !error) { + dout(10) << " pushing more from, " + << pi->recovery_progress.data_recovered_to + << " of " << pi->recovery_info.copy_subset << dendl; + ObjectRecoveryProgress new_progress; + int r = build_push_op( + pi->recovery_info, + pi->recovery_progress, &new_progress, reply, + &(pi->stat)); + // Handle the case of a read error right after we wrote, which is + // hopefuilly extremely rare. + if (r < 0) { + dout(5) << __func__ << ": oid " << soid << " error " << r << dendl; + + error = true; + goto done; + } + pi->recovery_progress = new_progress; + return true; + } else { + // done! +done: + if (!error) + get_parent()->on_peer_recover( peer, soid, pi->recovery_info); + + get_parent()->release_locks(pi->lock_manager); + object_stat_sum_t stat = pi->stat; + eversion_t v = pi->recovery_info.version; + pushing[soid].erase(peer); + pi = NULL; + + if (pushing[soid].empty()) { + if (!error) + get_parent()->on_global_recover(soid, stat, false); + else + get_parent()->on_primary_error(soid, v); + pushing.erase(soid); + } else { + // This looks weird, but we erased the current peer and need to remember + // the error on any other one, while getting more acks. + if (error) + pushing[soid].begin()->second.recovery_progress.error = true; + dout(10) << "pushed " << soid << ", still waiting for push ack from " + << pushing[soid].size() << " others" << dendl; + } + return false; + } + } +} + +void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply) +{ + const hobject_t &soid = op.soid; + struct stat st; + int r = store->stat(ch, ghobject_t(soid), &st); + if (r != 0) { + get_parent()->clog_error() << get_info().pgid << " " + << peer << " tried to pull " << soid + << " but got " << cpp_strerror(-r); + prep_push_op_blank(soid, reply); + } else { + ObjectRecoveryInfo &recovery_info = op.recovery_info; + ObjectRecoveryProgress &progress = op.recovery_progress; + if (progress.first && recovery_info.size == ((uint64_t)-1)) { + // Adjust size and copy_subset + recovery_info.size = st.st_size; + recovery_info.copy_subset.clear(); + if (st.st_size) + recovery_info.copy_subset.insert(0, st.st_size); + assert(recovery_info.clone_subset.empty()); + } + + r = build_push_op(recovery_info, progress, 0, reply); + if (r < 0) + prep_push_op_blank(soid, reply); + } +} + +/** + * trim received data to remove what we don't want + * + * @param copy_subset intervals we want + * @param data_included intervals we got + * @param data_recieved data we got + * @param intervals_usable intervals we want to keep + * @param data_usable matching data we want to keep + */ +void ReplicatedBackend::trim_pushed_data( + const interval_set ©_subset, + const interval_set &intervals_received, + bufferlist data_received, + interval_set *intervals_usable, + bufferlist *data_usable) +{ + if (intervals_received.subset_of(copy_subset)) { + *intervals_usable = intervals_received; + *data_usable = data_received; + return; + } + + intervals_usable->intersection_of(copy_subset, + intervals_received); + + uint64_t off = 0; + for (interval_set::const_iterator p = intervals_received.begin(); + p != intervals_received.end(); + ++p) { + interval_set x; + x.insert(p.get_start(), p.get_len()); + x.intersection_of(copy_subset); + for (interval_set::const_iterator q = x.begin(); + q != x.end(); + ++q) { + bufferlist sub; + uint64_t data_off = off + (q.get_start() - p.get_start()); + sub.substr_of(data_received, data_off, q.get_len()); + data_usable->claim_append(sub); + } + off += p.get_len(); + } +} + +void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid) +{ + dout(20) << __func__ << ": " << soid << " from " << from << dendl; + list fl = { from }; + get_parent()->failed_push(fl, soid); + + clear_pull(pulling.find(soid)); +} + +void ReplicatedBackend::clear_pull_from( + map::iterator piter) +{ + auto from = piter->second.from; + pull_from_peer[from].erase(piter->second.soid); + if (pull_from_peer[from].empty()) + pull_from_peer.erase(from); +} + +void ReplicatedBackend::clear_pull( + map::iterator piter, + bool clear_pull_from_peer) +{ + if (clear_pull_from_peer) { + clear_pull_from(piter); + } + get_parent()->release_locks(piter->second.lock_manager); + pulling.erase(piter); +} + +int ReplicatedBackend::start_pushes( + const hobject_t &soid, + ObjectContextRef obc, + RPGHandle *h) +{ + list< map::const_iterator > shards; + + dout(20) << __func__ << " soid " << soid << dendl; + // who needs it? + assert(get_parent()->get_actingbackfill_shards().size() > 0); + for (set::iterator i = + get_parent()->get_actingbackfill_shards().begin(); + i != get_parent()->get_actingbackfill_shards().end(); + ++i) { + if (*i == get_parent()->whoami_shard()) continue; + pg_shard_t peer = *i; + map::const_iterator j = + get_parent()->get_shard_missing().find(peer); + assert(j != get_parent()->get_shard_missing().end()); + if (j->second.is_missing(soid)) { + shards.push_back(j); + } + } + + // If more than 1 read will occur ignore possible request to not cache + bool cache = shards.size() == 1 ? h->cache_dont_need : false; + + for (auto j : shards) { + pg_shard_t peer = j->first; + h->pushes[peer].push_back(PushOp()); + int r = prep_push_to_replica(obc, soid, peer, + &(h->pushes[peer].back()), cache); + if (r < 0) { + // Back out all failed reads + for (auto k : shards) { + pg_shard_t p = k->first; + dout(10) << __func__ << " clean up peer " << p << dendl; + h->pushes[p].pop_back(); + if (p == peer) break; + } + return r; + } + } + return shards.size(); +}