X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fosd%2FReplicatedBackend.cc;fp=src%2Fceph%2Fsrc%2Fosd%2FReplicatedBackend.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=081204a033fd91f4885f18d57b5ead56129a7d20;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/osd/ReplicatedBackend.cc b/src/ceph/src/osd/ReplicatedBackend.cc deleted file mode 100644 index 081204a..0000000 --- a/src/ceph/src/osd/ReplicatedBackend.cc +++ /dev/null @@ -1,2309 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2013 Inktank Storage, Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ -#include "common/errno.h" -#include "ReplicatedBackend.h" -#include "messages/MOSDOp.h" -#include "messages/MOSDSubOp.h" -#include "messages/MOSDRepOp.h" -#include "messages/MOSDSubOpReply.h" -#include "messages/MOSDRepOpReply.h" -#include "messages/MOSDPGPush.h" -#include "messages/MOSDPGPull.h" -#include "messages/MOSDPGPushReply.h" -#include "common/EventTrace.h" - -#define dout_context cct -#define dout_subsys ceph_subsys_osd -#define DOUT_PREFIX_ARGS this -#undef dout_prefix -#define dout_prefix _prefix(_dout, this) -static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) { - return *_dout << pgb->get_parent()->gen_dbg_prefix(); -} - -namespace { -class PG_SendMessageOnConn: public Context { - PGBackend::Listener *pg; - Message *reply; - ConnectionRef conn; - public: - PG_SendMessageOnConn( - PGBackend::Listener *pg, - Message *reply, - ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {} - void finish(int) override { - pg->send_message_osd_cluster(reply, conn.get()); - } -}; - -class PG_RecoveryQueueAsync : public Context { - PGBackend::Listener *pg; - unique_ptr> c; - public: - PG_RecoveryQueueAsync( - PGBackend::Listener *pg, - GenContext *c) : pg(pg), c(c) {} - void finish(int) override { - pg->schedule_recovery_work(c.release()); - } -}; -} - -struct ReplicatedBackend::C_OSD_RepModifyApply : public Context { - ReplicatedBackend *pg; - RepModifyRef rm; - C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r) - : pg(pg), rm(r) {} - void finish(int r) override { - pg->repop_applied(rm); - } -}; - -struct ReplicatedBackend::C_OSD_RepModifyCommit : public Context { - ReplicatedBackend *pg; - RepModifyRef rm; - C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r) - : pg(pg), rm(r) {} - void finish(int r) override { - pg->repop_commit(rm); - } -}; - -static void log_subop_stats( - PerfCounters *logger, - OpRequestRef op, int subop) -{ - utime_t now = ceph_clock_now(); - utime_t latency = now; - latency -= op->get_req()->get_recv_stamp(); - - - logger->inc(l_osd_sop); - logger->tinc(l_osd_sop_lat, latency); - logger->inc(subop); - - if (subop != l_osd_sop_pull) { - uint64_t inb = op->get_req()->get_data().length(); - logger->inc(l_osd_sop_inb, inb); - if (subop == l_osd_sop_w) { - logger->inc(l_osd_sop_w_inb, inb); - logger->tinc(l_osd_sop_w_lat, latency); - } else if (subop == l_osd_sop_push) { - logger->inc(l_osd_sop_push_inb, inb); - logger->tinc(l_osd_sop_push_lat, latency); - } else - assert("no support subop" == 0); - } else { - logger->tinc(l_osd_sop_pull_lat, latency); - } 
-} - -ReplicatedBackend::ReplicatedBackend( - PGBackend::Listener *pg, - coll_t coll, - ObjectStore::CollectionHandle &c, - ObjectStore *store, - CephContext *cct) : - PGBackend(cct, pg, store, coll, c) {} - -void ReplicatedBackend::run_recovery_op( - PGBackend::RecoveryHandle *_h, - int priority) -{ - RPGHandle *h = static_cast(_h); - send_pushes(priority, h->pushes); - send_pulls(priority, h->pulls); - send_recovery_deletes(priority, h->deletes); - delete h; -} - -int ReplicatedBackend::recover_object( - const hobject_t &hoid, - eversion_t v, - ObjectContextRef head, - ObjectContextRef obc, - RecoveryHandle *_h - ) -{ - dout(10) << __func__ << ": " << hoid << dendl; - RPGHandle *h = static_cast(_h); - if (get_parent()->get_local_missing().is_missing(hoid)) { - assert(!obc); - // pull - prepare_pull( - v, - hoid, - head, - h); - } else { - assert(obc); - int started = start_pushes( - hoid, - obc, - h); - if (started < 0) { - pushing[hoid].clear(); - return started; - } - } - return 0; -} - -void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap) -{ - for(map >::iterator i = pull_from_peer.begin(); - i != pull_from_peer.end(); - ) { - if (osdmap->is_down(i->first.osd)) { - dout(10) << "check_recovery_sources resetting pulls from osd." << i->first - << ", osdmap has it marked down" << dendl; - for (set::iterator j = i->second.begin(); - j != i->second.end(); - ++j) { - get_parent()->cancel_pull(*j); - clear_pull(pulling.find(*j), false); - } - pull_from_peer.erase(i++); - } else { - ++i; - } - } -} - -bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op) -{ - dout(10) << __func__ << ": " << op << dendl; - switch (op->get_req()->get_type()) { - case MSG_OSD_PG_PULL: - return true; - default: - return false; - } -} - -bool ReplicatedBackend::_handle_message( - OpRequestRef op - ) -{ - dout(10) << __func__ << ": " << op << dendl; - switch (op->get_req()->get_type()) { - case MSG_OSD_PG_PUSH: - do_push(op); - return true; - - case MSG_OSD_PG_PULL: - do_pull(op); - return true; - - case MSG_OSD_PG_PUSH_REPLY: - do_push_reply(op); - return true; - - case MSG_OSD_SUBOP: { - const MOSDSubOp *m = static_cast(op->get_req()); - if (m->ops.size() == 0) { - assert(0); - } - break; - } - - case MSG_OSD_REPOP: { - do_repop(op); - return true; - } - - case MSG_OSD_REPOPREPLY: { - do_repop_reply(op); - return true; - } - - default: - break; - } - return false; -} - -void ReplicatedBackend::clear_recovery_state() -{ - // clear pushing/pulling maps - for (auto &&i: pushing) { - for (auto &&j: i.second) { - get_parent()->release_locks(j.second.lock_manager); - } - } - pushing.clear(); - - for (auto &&i: pulling) { - get_parent()->release_locks(i.second.lock_manager); - } - pulling.clear(); - pull_from_peer.clear(); -} - -void ReplicatedBackend::on_change() -{ - dout(10) << __func__ << dendl; - for (map::iterator i = in_progress_ops.begin(); - i != in_progress_ops.end(); - in_progress_ops.erase(i++)) { - if (i->second.on_commit) - delete i->second.on_commit; - if (i->second.on_applied) - delete i->second.on_applied; - } - clear_recovery_state(); -} - -void ReplicatedBackend::on_flushed() -{ -} - -int ReplicatedBackend::objects_read_sync( - const hobject_t &hoid, - uint64_t off, - uint64_t len, - uint32_t op_flags, - bufferlist *bl) -{ - return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags); -} - -struct AsyncReadCallback : public GenContext { - int r; - Context *c; - AsyncReadCallback(int r, Context *c) : r(r), c(c) {} - void finish(ThreadPool::TPHandle&) override { - 
c->complete(r); - c = NULL; - } - ~AsyncReadCallback() override { - delete c; - } -}; -void ReplicatedBackend::objects_read_async( - const hobject_t &hoid, - const list, - pair > > &to_read, - Context *on_complete, - bool fast_read) -{ - // There is no fast read implementation for replication backend yet - assert(!fast_read); - - int r = 0; - for (list, - pair > >::const_iterator i = - to_read.begin(); - i != to_read.end() && r >= 0; - ++i) { - int _r = store->read(ch, ghobject_t(hoid), i->first.get<0>(), - i->first.get<1>(), *(i->second.first), - i->first.get<2>()); - if (i->second.second) { - get_parent()->schedule_recovery_work( - get_parent()->bless_gencontext( - new AsyncReadCallback(_r, i->second.second))); - } - if (_r < 0) - r = _r; - } - get_parent()->schedule_recovery_work( - get_parent()->bless_gencontext( - new AsyncReadCallback(r, on_complete))); -} - -class C_OSD_OnOpCommit : public Context { - ReplicatedBackend *pg; - ReplicatedBackend::InProgressOp *op; -public: - C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) - : pg(pg), op(op) {} - void finish(int) override { - pg->op_commit(op); - } -}; - -class C_OSD_OnOpApplied : public Context { - ReplicatedBackend *pg; - ReplicatedBackend::InProgressOp *op; -public: - C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) - : pg(pg), op(op) {} - void finish(int) override { - pg->op_applied(op); - } -}; - -void generate_transaction( - PGTransactionUPtr &pgt, - const coll_t &coll, - bool legacy_log_entries, - vector &log_entries, - ObjectStore::Transaction *t, - set *added, - set *removed) -{ - assert(t); - assert(added); - assert(removed); - - for (auto &&le: log_entries) { - le.mark_unrollbackable(); - auto oiter = pgt->op_map.find(le.soid); - if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) { - bufferlist bl(oiter->second.updated_snaps->second.size() * 8 + 8); - ::encode(oiter->second.updated_snaps->second, bl); - le.snaps.swap(bl); - le.snaps.reassign_to_mempool(mempool::mempool_osd_pglog); - } - } - - pgt->safe_create_traverse( - [&](pair &obj_op) { - const hobject_t &oid = obj_op.first; - const ghobject_t goid = - ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD); - const PGTransaction::ObjectOperation &op = obj_op.second; - - if (oid.is_temp()) { - if (op.is_fresh_object()) { - added->insert(oid); - } else if (op.is_delete()) { - removed->insert(oid); - } - } - - if (op.delete_first) { - t->remove(coll, goid); - } - - match( - op.init_type, - [&](const PGTransaction::ObjectOperation::Init::None &) { - }, - [&](const PGTransaction::ObjectOperation::Init::Create &op) { - t->touch(coll, goid); - }, - [&](const PGTransaction::ObjectOperation::Init::Clone &op) { - t->clone( - coll, - ghobject_t( - op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD), - goid); - }, - [&](const PGTransaction::ObjectOperation::Init::Rename &op) { - assert(op.source.is_temp()); - t->collection_move_rename( - coll, - ghobject_t( - op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD), - coll, - goid); - }); - - if (op.truncate) { - t->truncate(coll, goid, op.truncate->first); - if (op.truncate->first != op.truncate->second) - t->truncate(coll, goid, op.truncate->second); - } - - if (!op.attr_updates.empty()) { - map attrs; - for (auto &&p: op.attr_updates) { - if (p.second) - attrs[p.first] = *(p.second); - else - t->rmattr(coll, goid, p.first); - } - t->setattrs(coll, goid, attrs); - } - - if (op.clear_omap) - t->omap_clear(coll, goid); - if (op.omap_header) - 
t->omap_setheader(coll, goid, *(op.omap_header)); - - for (auto &&up: op.omap_updates) { - using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType; - switch (up.first) { - case UpdateType::Remove: - t->omap_rmkeys(coll, goid, up.second); - break; - case UpdateType::Insert: - t->omap_setkeys(coll, goid, up.second); - break; - } - } - - // updated_snaps doesn't matter since we marked unrollbackable - - if (op.alloc_hint) { - auto &hint = *(op.alloc_hint); - t->set_alloc_hint( - coll, - goid, - hint.expected_object_size, - hint.expected_write_size, - hint.flags); - } - - for (auto &&extent: op.buffer_updates) { - using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate; - match( - extent.get_val(), - [&](const BufferUpdate::Write &op) { - t->write( - coll, - goid, - extent.get_off(), - extent.get_len(), - op.buffer); - }, - [&](const BufferUpdate::Zero &op) { - t->zero( - coll, - goid, - extent.get_off(), - extent.get_len()); - }, - [&](const BufferUpdate::CloneRange &op) { - assert(op.len == extent.get_len()); - t->clone_range( - coll, - ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD), - goid, - op.offset, - extent.get_len(), - extent.get_off()); - }); - } - }); -} - -void ReplicatedBackend::submit_transaction( - const hobject_t &soid, - const object_stat_sum_t &delta_stats, - const eversion_t &at_version, - PGTransactionUPtr &&_t, - const eversion_t &trim_to, - const eversion_t &roll_forward_to, - const vector &_log_entries, - boost::optional &hset_history, - Context *on_local_applied_sync, - Context *on_all_acked, - Context *on_all_commit, - ceph_tid_t tid, - osd_reqid_t reqid, - OpRequestRef orig_op) -{ - parent->apply_stats( - soid, - delta_stats); - - vector log_entries(_log_entries); - ObjectStore::Transaction op_t; - PGTransactionUPtr t(std::move(_t)); - set added, removed; - generate_transaction( - t, - coll, - (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN), - log_entries, - &op_t, - &added, - &removed); - assert(added.size() <= 1); - assert(removed.size() <= 1); - - assert(!in_progress_ops.count(tid)); - InProgressOp &op = in_progress_ops.insert( - make_pair( - tid, - InProgressOp( - tid, on_all_commit, on_all_acked, - orig_op, at_version) - ) - ).first->second; - - op.waiting_for_applied.insert( - parent->get_actingbackfill_shards().begin(), - parent->get_actingbackfill_shards().end()); - op.waiting_for_commit.insert( - parent->get_actingbackfill_shards().begin(), - parent->get_actingbackfill_shards().end()); - - issue_op( - soid, - at_version, - tid, - reqid, - trim_to, - at_version, - added.size() ? *(added.begin()) : hobject_t(), - removed.size() ? *(removed.begin()) : hobject_t(), - log_entries, - hset_history, - &op, - op_t); - - add_temp_objs(added); - clear_temp_objs(removed); - - parent->log_operation( - log_entries, - hset_history, - trim_to, - at_version, - true, - op_t); - - op_t.register_on_applied_sync(on_local_applied_sync); - op_t.register_on_applied( - parent->bless_context( - new C_OSD_OnOpApplied(this, &op))); - op_t.register_on_commit( - parent->bless_context( - new C_OSD_OnOpCommit(this, &op))); - - vector tls; - tls.push_back(std::move(op_t)); - - parent->queue_transactions(tls, op.op); -} - -void ReplicatedBackend::op_applied( - InProgressOp *op) -{ - FUNCTRACE(); - OID_EVENT_TRACE_WITH_MSG((op && op->op) ? 
op->op->get_req() : NULL, "OP_APPLIED_BEGIN", true); - dout(10) << __func__ << ": " << op->tid << dendl; - if (op->op) { - op->op->mark_event("op_applied"); - op->op->pg_trace.event("op applied"); - } - - op->waiting_for_applied.erase(get_parent()->whoami_shard()); - parent->op_applied(op->v); - - if (op->waiting_for_applied.empty()) { - op->on_applied->complete(0); - op->on_applied = 0; - } - if (op->done()) { - assert(!op->on_commit && !op->on_applied); - in_progress_ops.erase(op->tid); - } -} - -void ReplicatedBackend::op_commit( - InProgressOp *op) -{ - FUNCTRACE(); - OID_EVENT_TRACE_WITH_MSG((op && op->op) ? op->op->get_req() : NULL, "OP_COMMIT_BEGIN", true); - dout(10) << __func__ << ": " << op->tid << dendl; - if (op->op) { - op->op->mark_event("op_commit"); - op->op->pg_trace.event("op commit"); - } - - op->waiting_for_commit.erase(get_parent()->whoami_shard()); - - if (op->waiting_for_commit.empty()) { - op->on_commit->complete(0); - op->on_commit = 0; - } - if (op->done()) { - assert(!op->on_commit && !op->on_applied); - in_progress_ops.erase(op->tid); - } -} - -void ReplicatedBackend::do_repop_reply(OpRequestRef op) -{ - static_cast(op->get_nonconst_req())->finish_decode(); - const MOSDRepOpReply *r = static_cast(op->get_req()); - assert(r->get_header().type == MSG_OSD_REPOPREPLY); - - op->mark_started(); - - // must be replication. - ceph_tid_t rep_tid = r->get_tid(); - pg_shard_t from = r->from; - - if (in_progress_ops.count(rep_tid)) { - map::iterator iter = - in_progress_ops.find(rep_tid); - InProgressOp &ip_op = iter->second; - const MOSDOp *m = NULL; - if (ip_op.op) - m = static_cast(ip_op.op->get_req()); - - if (m) - dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m - << " ack_type " << (int)r->ack_type - << " from " << from - << dendl; - else - dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) " - << " ack_type " << (int)r->ack_type - << " from " << from - << dendl; - - // oh, good. 
- - if (r->ack_type & CEPH_OSD_FLAG_ONDISK) { - assert(ip_op.waiting_for_commit.count(from)); - ip_op.waiting_for_commit.erase(from); - if (ip_op.op) { - ostringstream ss; - ss << "sub_op_commit_rec from " << from; - ip_op.op->mark_event_string(ss.str()); - ip_op.op->pg_trace.event("sub_op_commit_rec"); - } - } else { - assert(ip_op.waiting_for_applied.count(from)); - if (ip_op.op) { - ostringstream ss; - ss << "sub_op_applied_rec from " << from; - ip_op.op->mark_event_string(ss.str()); - ip_op.op->pg_trace.event("sub_op_applied_rec"); - } - } - ip_op.waiting_for_applied.erase(from); - - parent->update_peer_last_complete_ondisk( - from, - r->get_last_complete_ondisk()); - - if (ip_op.waiting_for_applied.empty() && - ip_op.on_applied) { - ip_op.on_applied->complete(0); - ip_op.on_applied = 0; - } - if (ip_op.waiting_for_commit.empty() && - ip_op.on_commit) { - ip_op.on_commit->complete(0); - ip_op.on_commit= 0; - } - if (ip_op.done()) { - assert(!ip_op.on_commit && !ip_op.on_applied); - in_progress_ops.erase(iter); - } - } -} - -void ReplicatedBackend::be_deep_scrub( - const hobject_t &poid, - uint32_t seed, - ScrubMap::object &o, - ThreadPool::TPHandle &handle) -{ - dout(10) << __func__ << " " << poid << " seed " - << std::hex << seed << std::dec << dendl; - bufferhash h(seed), oh(seed); - bufferlist bl, hdrbl; - int r; - __u64 pos = 0; - - uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED; - - while (true) { - handle.reset_tp_timeout(); - r = store->read( - ch, - ghobject_t( - poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), - pos, - cct->_conf->osd_deep_scrub_stride, bl, - fadvise_flags); - if (r <= 0) - break; - - h << bl; - pos += bl.length(); - bl.clear(); - } - if (r == -EIO) { - dout(25) << __func__ << " " << poid << " got " - << r << " on read, read_error" << dendl; - o.read_error = true; - return; - } - o.digest = h.digest(); - o.digest_present = true; - - bl.clear(); - r = store->omap_get_header( - coll, - ghobject_t( - poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), - &hdrbl, true); - // NOTE: bobtail to giant, we would crc the head as (len, head). - // that changes at the same time we start using a non-zero seed. 
- if (r == 0 && hdrbl.length()) { - dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length()) - << dendl; - if (seed == 0) { - // legacy - bufferlist bl; - ::encode(hdrbl, bl); - oh << bl; - } else { - oh << hdrbl; - } - } else if (r == -EIO) { - dout(25) << __func__ << " " << poid << " got " - << r << " on omap header read, read_error" << dendl; - o.read_error = true; - return; - } - - ObjectMap::ObjectMapIterator iter = store->get_omap_iterator( - coll, - ghobject_t( - poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); - assert(iter); - for (iter->seek_to_first(); iter->status() == 0 && iter->valid(); - iter->next(false)) { - handle.reset_tp_timeout(); - - dout(25) << "CRC key " << iter->key() << " value:\n"; - iter->value().hexdump(*_dout); - *_dout << dendl; - - ::encode(iter->key(), bl); - ::encode(iter->value(), bl); - oh << bl; - bl.clear(); - } - - if (iter->status() < 0) { - dout(25) << __func__ << " " << poid - << " on omap scan, db status error" << dendl; - o.read_error = true; - return; - } - - //Store final calculated CRC32 of omap header & key/values - o.omap_digest = oh.digest(); - o.omap_digest_present = true; - dout(20) << __func__ << " " << poid << " omap_digest " - << std::hex << o.omap_digest << std::dec << dendl; -} - -void ReplicatedBackend::_do_push(OpRequestRef op) -{ - const MOSDPGPush *m = static_cast(op->get_req()); - assert(m->get_type() == MSG_OSD_PG_PUSH); - pg_shard_t from = m->from; - - op->mark_started(); - - vector replies; - ObjectStore::Transaction t; - ostringstream ss; - if (get_parent()->check_failsafe_full(ss)) { - dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl; - ceph_abort(); - } - for (vector::const_iterator i = m->pushes.begin(); - i != m->pushes.end(); - ++i) { - replies.push_back(PushReplyOp()); - handle_push(from, *i, &(replies.back()), &t); - } - - MOSDPGPushReply *reply = new MOSDPGPushReply; - reply->from = get_parent()->whoami_shard(); - reply->set_priority(m->get_priority()); - reply->pgid = get_info().pgid; - reply->map_epoch = m->map_epoch; - reply->min_epoch = m->min_epoch; - reply->replies.swap(replies); - reply->compute_cost(cct); - - t.register_on_complete( - new PG_SendMessageOnConn( - get_parent(), reply, m->get_connection())); - - get_parent()->queue_transaction(std::move(t)); -} - -struct C_ReplicatedBackend_OnPullComplete : GenContext { - ReplicatedBackend *bc; - list to_continue; - int priority; - C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority) - : bc(bc), priority(priority) {} - - void finish(ThreadPool::TPHandle &handle) override { - ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op(); - for (auto &&i: to_continue) { - auto j = bc->pulling.find(i.hoid); - assert(j != bc->pulling.end()); - ObjectContextRef obc = j->second.obc; - bc->clear_pull(j, false /* already did it */); - int started = bc->start_pushes(i.hoid, obc, h); - if (started < 0) { - bc->pushing[i.hoid].clear(); - bc->get_parent()->primary_failed(i.hoid); - bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version); - } else if (!started) { - bc->get_parent()->on_global_recover( - i.hoid, i.stat, false); - } - handle.reset_tp_timeout(); - } - bc->run_recovery_op(h, priority); - } -}; - -void ReplicatedBackend::_do_pull_response(OpRequestRef op) -{ - const MOSDPGPush *m = static_cast(op->get_req()); - assert(m->get_type() == MSG_OSD_PG_PUSH); - pg_shard_t from = m->from; - - op->mark_started(); - - vector replies(1); - - ostringstream ss; - if 
(get_parent()->check_failsafe_full(ss)) { - dout(10) << __func__ << " Out of space (failsafe) processing pull response (push): " << ss.str() << dendl; - ceph_abort(); - } - - ObjectStore::Transaction t; - list to_continue; - for (vector::const_iterator i = m->pushes.begin(); - i != m->pushes.end(); - ++i) { - bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t); - if (more) - replies.push_back(PullOp()); - } - if (!to_continue.empty()) { - C_ReplicatedBackend_OnPullComplete *c = - new C_ReplicatedBackend_OnPullComplete( - this, - m->get_priority()); - c->to_continue.swap(to_continue); - t.register_on_complete( - new PG_RecoveryQueueAsync( - get_parent(), - get_parent()->bless_gencontext(c))); - } - replies.erase(replies.end() - 1); - - if (replies.size()) { - MOSDPGPull *reply = new MOSDPGPull; - reply->from = parent->whoami_shard(); - reply->set_priority(m->get_priority()); - reply->pgid = get_info().pgid; - reply->map_epoch = m->map_epoch; - reply->min_epoch = m->min_epoch; - reply->set_pulls(&replies); - reply->compute_cost(cct); - - t.register_on_complete( - new PG_SendMessageOnConn( - get_parent(), reply, m->get_connection())); - } - - get_parent()->queue_transaction(std::move(t)); -} - -void ReplicatedBackend::do_pull(OpRequestRef op) -{ - MOSDPGPull *m = static_cast(op->get_nonconst_req()); - assert(m->get_type() == MSG_OSD_PG_PULL); - pg_shard_t from = m->from; - - map > replies; - vector pulls; - m->take_pulls(&pulls); - for (auto& i : pulls) { - replies[from].push_back(PushOp()); - handle_pull(from, i, &(replies[from].back())); - } - send_pushes(m->get_priority(), replies); -} - -void ReplicatedBackend::do_push_reply(OpRequestRef op) -{ - const MOSDPGPushReply *m = static_cast(op->get_req()); - assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY); - pg_shard_t from = m->from; - - vector replies(1); - for (vector::const_iterator i = m->replies.begin(); - i != m->replies.end(); - ++i) { - bool more = handle_push_reply(from, *i, &(replies.back())); - if (more) - replies.push_back(PushOp()); - } - replies.erase(replies.end() - 1); - - map > _replies; - _replies[from].swap(replies); - send_pushes(m->get_priority(), _replies); -} - -Message * ReplicatedBackend::generate_subop( - const hobject_t &soid, - const eversion_t &at_version, - ceph_tid_t tid, - osd_reqid_t reqid, - eversion_t pg_trim_to, - eversion_t pg_roll_forward_to, - hobject_t new_temp_oid, - hobject_t discard_temp_oid, - const vector &log_entries, - boost::optional &hset_hist, - ObjectStore::Transaction &op_t, - pg_shard_t peer, - const pg_info_t &pinfo) -{ - int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; - // forward the write/update/whatever - MOSDRepOp *wr = new MOSDRepOp( - reqid, parent->whoami_shard(), - spg_t(get_info().pgid.pgid, peer.shard), - soid, acks_wanted, - get_osdmap()->get_epoch(), - parent->get_last_peering_reset_epoch(), - tid, at_version); - - // ship resulting transaction, log entries, and pg_stats - if (!parent->should_send_op(peer, soid)) { - dout(10) << "issue_repop shipping empty opt to osd." 
<< peer - <<", object " << soid - << " beyond MAX(last_backfill_started " - << ", pinfo.last_backfill " - << pinfo.last_backfill << ")" << dendl; - ObjectStore::Transaction t; - ::encode(t, wr->get_data()); - } else { - ::encode(op_t, wr->get_data()); - wr->get_header().data_off = op_t.get_data_alignment(); - } - - ::encode(log_entries, wr->logbl); - - if (pinfo.is_incomplete()) - wr->pg_stats = pinfo.stats; // reflects backfill progress - else - wr->pg_stats = get_info().stats; - - wr->pg_trim_to = pg_trim_to; - wr->pg_roll_forward_to = pg_roll_forward_to; - - wr->new_temp_oid = new_temp_oid; - wr->discard_temp_oid = discard_temp_oid; - wr->updated_hit_set_history = hset_hist; - return wr; -} - -void ReplicatedBackend::issue_op( - const hobject_t &soid, - const eversion_t &at_version, - ceph_tid_t tid, - osd_reqid_t reqid, - eversion_t pg_trim_to, - eversion_t pg_roll_forward_to, - hobject_t new_temp_oid, - hobject_t discard_temp_oid, - const vector &log_entries, - boost::optional &hset_hist, - InProgressOp *op, - ObjectStore::Transaction &op_t) -{ - if (op->op) - op->op->pg_trace.event("issue replication ops"); - - if (parent->get_actingbackfill_shards().size() > 1) { - ostringstream ss; - set replicas = parent->get_actingbackfill_shards(); - replicas.erase(parent->whoami_shard()); - ss << "waiting for subops from " << replicas; - if (op->op) - op->op->mark_sub_op_sent(ss.str()); - } - for (set::const_iterator i = - parent->get_actingbackfill_shards().begin(); - i != parent->get_actingbackfill_shards().end(); - ++i) { - if (*i == parent->whoami_shard()) continue; - pg_shard_t peer = *i; - const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second; - - Message *wr; - wr = generate_subop( - soid, - at_version, - tid, - reqid, - pg_trim_to, - pg_roll_forward_to, - new_temp_oid, - discard_temp_oid, - log_entries, - hset_hist, - op_t, - peer, - pinfo); - if (op->op) - wr->trace.init("replicated op", nullptr, &op->op->pg_trace); - get_parent()->send_message_osd_cluster( - peer.osd, wr, get_osdmap()->get_epoch()); - } -} - -// sub op modify -void ReplicatedBackend::do_repop(OpRequestRef op) -{ - static_cast(op->get_nonconst_req())->finish_decode(); - const MOSDRepOp *m = static_cast(op->get_req()); - int msg_type = m->get_type(); - assert(MSG_OSD_REPOP == msg_type); - - const hobject_t& soid = m->poid; - - dout(10) << __func__ << " " << soid - << " v " << m->version - << (m->logbl.length() ? " (transaction)" : " (parallel exec") - << " " << m->logbl.length() - << dendl; - - // sanity checks - assert(m->map_epoch >= get_info().history.same_interval_since); - - // we better not be missing this. 
- assert(!parent->get_log().get_missing().is_missing(soid)); - - int ackerosd = m->get_source().num(); - - op->mark_started(); - - RepModifyRef rm(std::make_shared()); - rm->op = op; - rm->ackerosd = ackerosd; - rm->last_complete = get_info().last_complete; - rm->epoch_started = get_osdmap()->get_epoch(); - - assert(m->logbl.length()); - // shipped transaction and log entries - vector log; - - bufferlist::iterator p = const_cast(m->get_data()).begin(); - ::decode(rm->opt, p); - - if (m->new_temp_oid != hobject_t()) { - dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl; - add_temp_obj(m->new_temp_oid); - } - if (m->discard_temp_oid != hobject_t()) { - dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl; - if (rm->opt.empty()) { - dout(10) << __func__ << ": removing object " << m->discard_temp_oid - << " since we won't get the transaction" << dendl; - rm->localt.remove(coll, ghobject_t(m->discard_temp_oid)); - } - clear_temp_obj(m->discard_temp_oid); - } - - p = const_cast(m->logbl).begin(); - ::decode(log, p); - rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); - - bool update_snaps = false; - if (!rm->opt.empty()) { - // If the opt is non-empty, we infer we are before - // last_backfill (according to the primary, not our - // not-quite-accurate value), and should update the - // collections now. Otherwise, we do it later on push. - update_snaps = true; - } - parent->update_stats(m->pg_stats); - parent->log_operation( - log, - m->updated_hit_set_history, - m->pg_trim_to, - m->pg_roll_forward_to, - update_snaps, - rm->localt); - - rm->opt.register_on_commit( - parent->bless_context( - new C_OSD_RepModifyCommit(this, rm))); - rm->localt.register_on_applied( - parent->bless_context( - new C_OSD_RepModifyApply(this, rm))); - vector tls; - tls.reserve(2); - tls.push_back(std::move(rm->localt)); - tls.push_back(std::move(rm->opt)); - parent->queue_transactions(tls, op); - // op is cleaned up by oncommit/onapply when both are executed -} - -void ReplicatedBackend::repop_applied(RepModifyRef rm) -{ - rm->op->mark_event("sub_op_applied"); - rm->applied = true; - rm->op->pg_trace.event("sup_op_applied"); - - dout(10) << __func__ << " on " << rm << " op " - << *rm->op->get_req() << dendl; - const Message *m = rm->op->get_req(); - const MOSDRepOp *req = static_cast(m); - eversion_t version = req->version; - - // send ack to acker only if we haven't sent a commit already - if (!rm->committed) { - Message *ack = new MOSDRepOpReply( - req, parent->whoami_shard(), - 0, get_osdmap()->get_epoch(), req->min_epoch, CEPH_OSD_FLAG_ACK); - ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! - ack->trace = rm->op->pg_trace; - get_parent()->send_message_osd_cluster( - rm->ackerosd, ack, get_osdmap()->get_epoch()); - } - - parent->op_applied(version); -} - -void ReplicatedBackend::repop_commit(RepModifyRef rm) -{ - rm->op->mark_commit_sent(); - rm->op->pg_trace.event("sup_op_commit"); - rm->committed = true; - - // send commit. - const MOSDRepOp *m = static_cast(rm->op->get_req()); - assert(m->get_type() == MSG_OSD_REPOP); - dout(10) << __func__ << " on op " << *m - << ", sending commit to osd." 
<< rm->ackerosd - << dendl; - assert(get_osdmap()->is_up(rm->ackerosd)); - - get_parent()->update_last_complete_ondisk(rm->last_complete); - - MOSDRepOpReply *reply = new MOSDRepOpReply( - m, - get_parent()->whoami_shard(), - 0, get_osdmap()->get_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK); - reply->set_last_complete_ondisk(rm->last_complete); - reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! - reply->trace = rm->op->pg_trace; - get_parent()->send_message_osd_cluster( - rm->ackerosd, reply, get_osdmap()->get_epoch()); - - log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w); -} - - -// =========================================================== - -void ReplicatedBackend::calc_head_subsets( - ObjectContextRef obc, SnapSet& snapset, const hobject_t& head, - const pg_missing_t& missing, - const hobject_t &last_backfill, - interval_set& data_subset, - map>& clone_subsets, - ObcLockManager &manager) -{ - dout(10) << "calc_head_subsets " << head - << " clone_overlap " << snapset.clone_overlap << dendl; - - uint64_t size = obc->obs.oi.size; - if (size) - data_subset.insert(0, size); - - if (get_parent()->get_pool().allow_incomplete_clones()) { - dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; - return; - } - - if (!cct->_conf->osd_recover_clone_overlap) { - dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl; - return; - } - - - interval_set cloning; - interval_set prev; - if (size) - prev.insert(0, size); - - for (int j=snapset.clones.size()-1; j>=0; j--) { - hobject_t c = head; - c.snap = snapset.clones[j]; - prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); - if (!missing.is_missing(c) && - c < last_backfill && - get_parent()->try_lock_for_read(c, manager)) { - dout(10) << "calc_head_subsets " << head << " has prev " << c - << " overlap " << prev << dendl; - clone_subsets[c] = prev; - cloning.union_of(prev); - break; - } - dout(10) << "calc_head_subsets " << head << " does not have prev " << c - << " overlap " << prev << dendl; - } - - - if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { - dout(10) << "skipping clone, too many holes" << dendl; - get_parent()->release_locks(manager); - clone_subsets.clear(); - cloning.clear(); - } - - // what's left for us to push? - data_subset.subtract(cloning); - - dout(10) << "calc_head_subsets " << head - << " data_subset " << data_subset - << " clone_subsets " << clone_subsets << dendl; -} - -void ReplicatedBackend::calc_clone_subsets( - SnapSet& snapset, const hobject_t& soid, - const pg_missing_t& missing, - const hobject_t &last_backfill, - interval_set& data_subset, - map>& clone_subsets, - ObcLockManager &manager) -{ - dout(10) << "calc_clone_subsets " << soid - << " clone_overlap " << snapset.clone_overlap << dendl; - - uint64_t size = snapset.clone_size[soid.snap]; - if (size) - data_subset.insert(0, size); - - if (get_parent()->get_pool().allow_incomplete_clones()) { - dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl; - return; - } - - if (!cct->_conf->osd_recover_clone_overlap) { - dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl; - return; - } - - unsigned i; - for (i=0; i < snapset.clones.size(); i++) - if (snapset.clones[i] == soid.snap) - break; - - // any overlap with next older clone? 
- interval_set cloning; - interval_set prev; - if (size) - prev.insert(0, size); - for (int j=i-1; j>=0; j--) { - hobject_t c = soid; - c.snap = snapset.clones[j]; - prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]); - if (!missing.is_missing(c) && - c < last_backfill && - get_parent()->try_lock_for_read(c, manager)) { - dout(10) << "calc_clone_subsets " << soid << " has prev " << c - << " overlap " << prev << dendl; - clone_subsets[c] = prev; - cloning.union_of(prev); - break; - } - dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c - << " overlap " << prev << dendl; - } - - // overlap with next newest? - interval_set next; - if (size) - next.insert(0, size); - for (unsigned j=i+1; jtry_lock_for_read(c, manager)) { - dout(10) << "calc_clone_subsets " << soid << " has next " << c - << " overlap " << next << dendl; - clone_subsets[c] = next; - cloning.union_of(next); - break; - } - dout(10) << "calc_clone_subsets " << soid << " does not have next " << c - << " overlap " << next << dendl; - } - - if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) { - dout(10) << "skipping clone, too many holes" << dendl; - get_parent()->release_locks(manager); - clone_subsets.clear(); - cloning.clear(); - } - - - // what's left for us to push? - data_subset.subtract(cloning); - - dout(10) << "calc_clone_subsets " << soid - << " data_subset " << data_subset - << " clone_subsets " << clone_subsets << dendl; -} - -void ReplicatedBackend::prepare_pull( - eversion_t v, - const hobject_t& soid, - ObjectContextRef headctx, - RPGHandle *h) -{ - assert(get_parent()->get_local_missing().get_items().count(soid)); - eversion_t _v = get_parent()->get_local_missing().get_items().find( - soid)->second.need; - assert(_v == v); - const map> &missing_loc( - get_parent()->get_missing_loc_shards()); - const map &peer_missing( - get_parent()->get_shard_missing()); - map>::const_iterator q = missing_loc.find(soid); - assert(q != missing_loc.end()); - assert(!q->second.empty()); - - // pick a pullee - vector shuffle(q->second.begin(), q->second.end()); - random_shuffle(shuffle.begin(), shuffle.end()); - vector::iterator p = shuffle.begin(); - assert(get_osdmap()->is_up(p->osd)); - pg_shard_t fromshard = *p; - - dout(7) << "pull " << soid - << " v " << v - << " on osds " << q->second - << " from osd." 
<< fromshard - << dendl; - - assert(peer_missing.count(fromshard)); - const pg_missing_t &pmissing = peer_missing.find(fromshard)->second; - if (pmissing.is_missing(soid, v)) { - assert(pmissing.get_items().find(soid)->second.have != v); - dout(10) << "pulling soid " << soid << " from osd " << fromshard - << " at version " << pmissing.get_items().find(soid)->second.have - << " rather than at version " << v << dendl; - v = pmissing.get_items().find(soid)->second.have; - assert(get_parent()->get_log().get_log().objects.count(soid) && - (get_parent()->get_log().get_log().objects.find(soid)->second->op == - pg_log_entry_t::LOST_REVERT) && - (get_parent()->get_log().get_log().objects.find( - soid)->second->reverting_to == - v)); - } - - ObjectRecoveryInfo recovery_info; - ObcLockManager lock_manager; - - if (soid.is_snap()) { - assert(!get_parent()->get_local_missing().is_missing( - soid.get_head()) || - !get_parent()->get_local_missing().is_missing( - soid.get_snapdir())); - assert(headctx); - // check snapset - SnapSetContext *ssc = headctx->ssc; - assert(ssc); - dout(10) << " snapset " << ssc->snapset << dendl; - recovery_info.ss = ssc->snapset; - calc_clone_subsets( - ssc->snapset, soid, get_parent()->get_local_missing(), - get_info().last_backfill, - recovery_info.copy_subset, - recovery_info.clone_subset, - lock_manager); - // FIXME: this may overestimate if we are pulling multiple clones in parallel... - dout(10) << " pulling " << recovery_info << dendl; - - assert(ssc->snapset.clone_size.count(soid.snap)); - recovery_info.size = ssc->snapset.clone_size[soid.snap]; - } else { - // pulling head or unversioned object. - // always pull the whole thing. - recovery_info.copy_subset.insert(0, (uint64_t)-1); - recovery_info.size = ((uint64_t)-1); - } - - h->pulls[fromshard].push_back(PullOp()); - PullOp &op = h->pulls[fromshard].back(); - op.soid = soid; - - op.recovery_info = recovery_info; - op.recovery_info.soid = soid; - op.recovery_info.version = v; - op.recovery_progress.data_complete = false; - op.recovery_progress.omap_complete = false; - op.recovery_progress.data_recovered_to = 0; - op.recovery_progress.first = true; - - assert(!pulling.count(soid)); - pull_from_peer[fromshard].insert(soid); - PullInfo &pi = pulling[soid]; - pi.from = fromshard; - pi.soid = soid; - pi.head_ctx = headctx; - pi.recovery_info = op.recovery_info; - pi.recovery_progress = op.recovery_progress; - pi.cache_dont_need = h->cache_dont_need; - pi.lock_manager = std::move(lock_manager); -} - -/* - * intelligently push an object to a replica. make use of existing - * clones/heads and dup data ranges where possible. - */ -int ReplicatedBackend::prep_push_to_replica( - ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer, - PushOp *pop, bool cache_dont_need) -{ - const object_info_t& oi = obc->obs.oi; - uint64_t size = obc->obs.oi.size; - - dout(10) << __func__ << ": " << soid << " v" << oi.version - << " size " << size << " to osd." << peer << dendl; - - map> clone_subsets; - interval_set data_subset; - - ObcLockManager lock_manager; - // are we doing a clone on the replica? - if (soid.snap && soid.snap < CEPH_NOSNAP) { - hobject_t head = soid; - head.snap = CEPH_NOSNAP; - - // try to base push off of clones that succeed/preceed poid - // we need the head (and current SnapSet) locally to do that. 
- if (get_parent()->get_local_missing().is_missing(head)) { - dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl; - return prep_push(obc, soid, peer, pop, cache_dont_need); - } - hobject_t snapdir = head; - snapdir.snap = CEPH_SNAPDIR; - if (get_parent()->get_local_missing().is_missing(snapdir)) { - dout(15) << "push_to_replica missing snapdir " << snapdir - << ", pushing raw clone" << dendl; - return prep_push(obc, soid, peer, pop, cache_dont_need); - } - - SnapSetContext *ssc = obc->ssc; - assert(ssc); - dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; - pop->recovery_info.ss = ssc->snapset; - map::const_iterator pm = - get_parent()->get_shard_missing().find(peer); - assert(pm != get_parent()->get_shard_missing().end()); - map::const_iterator pi = - get_parent()->get_shard_info().find(peer); - assert(pi != get_parent()->get_shard_info().end()); - calc_clone_subsets( - ssc->snapset, soid, - pm->second, - pi->second.last_backfill, - data_subset, clone_subsets, - lock_manager); - } else if (soid.snap == CEPH_NOSNAP) { - // pushing head or unversioned object. - // base this on partially on replica's clones? - SnapSetContext *ssc = obc->ssc; - assert(ssc); - dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; - calc_head_subsets( - obc, - ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second, - get_parent()->get_shard_info().find(peer)->second.last_backfill, - data_subset, clone_subsets, - lock_manager); - } - - return prep_push( - obc, - soid, - peer, - oi.version, - data_subset, - clone_subsets, - pop, - cache_dont_need, - std::move(lock_manager)); -} - -int ReplicatedBackend::prep_push(ObjectContextRef obc, - const hobject_t& soid, pg_shard_t peer, - PushOp *pop, bool cache_dont_need) -{ - interval_set data_subset; - if (obc->obs.oi.size) - data_subset.insert(0, obc->obs.oi.size); - map> clone_subsets; - - return prep_push(obc, soid, peer, - obc->obs.oi.version, data_subset, clone_subsets, - pop, cache_dont_need, ObcLockManager()); -} - -int ReplicatedBackend::prep_push( - ObjectContextRef obc, - const hobject_t& soid, pg_shard_t peer, - eversion_t version, - interval_set &data_subset, - map>& clone_subsets, - PushOp *pop, - bool cache_dont_need, - ObcLockManager &&lock_manager) -{ - get_parent()->begin_peer_recover(peer, soid); - // take note. 
- PushInfo &pi = pushing[soid][peer]; - pi.obc = obc; - pi.recovery_info.size = obc->obs.oi.size; - pi.recovery_info.copy_subset = data_subset; - pi.recovery_info.clone_subset = clone_subsets; - pi.recovery_info.soid = soid; - pi.recovery_info.oi = obc->obs.oi; - pi.recovery_info.ss = pop->recovery_info.ss; - pi.recovery_info.version = version; - pi.lock_manager = std::move(lock_manager); - - ObjectRecoveryProgress new_progress; - int r = build_push_op(pi.recovery_info, - pi.recovery_progress, - &new_progress, - pop, - &(pi.stat), cache_dont_need); - if (r < 0) - return r; - pi.recovery_progress = new_progress; - return 0; -} - -void ReplicatedBackend::submit_push_data( - const ObjectRecoveryInfo &recovery_info, - bool first, - bool complete, - bool cache_dont_need, - const interval_set &intervals_included, - bufferlist data_included, - bufferlist omap_header, - const map &attrs, - const map &omap_entries, - ObjectStore::Transaction *t) -{ - hobject_t target_oid; - if (first && complete) { - target_oid = recovery_info.soid; - } else { - target_oid = get_parent()->get_temp_recovery_object(recovery_info.soid, - recovery_info.version); - if (first) { - dout(10) << __func__ << ": Adding oid " - << target_oid << " in the temp collection" << dendl; - add_temp_obj(target_oid); - } - } - - if (first) { - t->remove(coll, ghobject_t(target_oid)); - t->touch(coll, ghobject_t(target_oid)); - t->truncate(coll, ghobject_t(target_oid), recovery_info.size); - if (omap_header.length()) - t->omap_setheader(coll, ghobject_t(target_oid), omap_header); - - bufferlist bv = attrs.at(OI_ATTR); - object_info_t oi(bv); - t->set_alloc_hint(coll, ghobject_t(target_oid), - oi.expected_object_size, - oi.expected_write_size, - oi.alloc_hint_flags); - } - uint64_t off = 0; - uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL; - if (cache_dont_need) - fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED; - for (interval_set::const_iterator p = intervals_included.begin(); - p != intervals_included.end(); - ++p) { - bufferlist bit; - bit.substr_of(data_included, off, p.get_len()); - t->write(coll, ghobject_t(target_oid), - p.get_start(), p.get_len(), bit, fadvise_flags); - off += p.get_len(); - } - - if (!omap_entries.empty()) - t->omap_setkeys(coll, ghobject_t(target_oid), omap_entries); - if (!attrs.empty()) - t->setattrs(coll, ghobject_t(target_oid), attrs); - - if (complete) { - if (!first) { - dout(10) << __func__ << ": Removing oid " - << target_oid << " from the temp collection" << dendl; - clear_temp_obj(target_oid); - t->remove(coll, ghobject_t(recovery_info.soid)); - t->collection_move_rename(coll, ghobject_t(target_oid), - coll, ghobject_t(recovery_info.soid)); - } - - submit_push_complete(recovery_info, t); - } -} - -void ReplicatedBackend::submit_push_complete( - const ObjectRecoveryInfo &recovery_info, - ObjectStore::Transaction *t) -{ - for (map>::const_iterator p = - recovery_info.clone_subset.begin(); - p != recovery_info.clone_subset.end(); - ++p) { - for (interval_set::const_iterator q = p->second.begin(); - q != p->second.end(); - ++q) { - dout(15) << " clone_range " << p->first << " " - << q.get_start() << "~" << q.get_len() << dendl; - t->clone_range(coll, ghobject_t(p->first), ghobject_t(recovery_info.soid), - q.get_start(), q.get_len(), q.get_start()); - } - } -} - -ObjectRecoveryInfo ReplicatedBackend::recalc_subsets( - const ObjectRecoveryInfo& recovery_info, - SnapSetContext *ssc, - ObcLockManager &manager) -{ - if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP) - 
return recovery_info; - ObjectRecoveryInfo new_info = recovery_info; - new_info.copy_subset.clear(); - new_info.clone_subset.clear(); - assert(ssc); - get_parent()->release_locks(manager); // might already have locks - calc_clone_subsets( - ssc->snapset, new_info.soid, get_parent()->get_local_missing(), - get_info().last_backfill, - new_info.copy_subset, new_info.clone_subset, - manager); - return new_info; -} - -bool ReplicatedBackend::handle_pull_response( - pg_shard_t from, const PushOp &pop, PullOp *response, - list *to_continue, - ObjectStore::Transaction *t) -{ - interval_set data_included = pop.data_included; - bufferlist data; - data = pop.data; - dout(10) << "handle_pull_response " - << pop.recovery_info - << pop.after_progress - << " data.size() is " << data.length() - << " data_included: " << data_included - << dendl; - if (pop.version == eversion_t()) { - // replica doesn't have it! - _failed_pull(from, pop.soid); - return false; - } - - const hobject_t &hoid = pop.soid; - assert((data_included.empty() && data.length() == 0) || - (!data_included.empty() && data.length() > 0)); - - auto piter = pulling.find(hoid); - if (piter == pulling.end()) { - return false; - } - - PullInfo &pi = piter->second; - if (pi.recovery_info.size == (uint64_t(-1))) { - pi.recovery_info.size = pop.recovery_info.size; - pi.recovery_info.copy_subset.intersection_of( - pop.recovery_info.copy_subset); - } - // If primary doesn't have object info and didn't know version - if (pi.recovery_info.version == eversion_t()) { - pi.recovery_info.version = pop.version; - } - - bool first = pi.recovery_progress.first; - if (first) { - // attrs only reference the origin bufferlist (decode from - // MOSDPGPush message) whose size is much greater than attrs in - // recovery. If obc cache it (get_obc maybe cache the attr), this - // causes the whole origin bufferlist would not be free until obc - // is evicted from obc cache. So rebuild the bufferlists before - // cache it. 
- auto attrset = pop.attrset; - for (auto& a : attrset) { - a.second.rebuild(); - } - pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset); - pi.recovery_info.oi = pi.obc->obs.oi; - pi.recovery_info = recalc_subsets( - pi.recovery_info, - pi.obc->ssc, - pi.lock_manager); - } - - - interval_set usable_intervals; - bufferlist usable_data; - trim_pushed_data(pi.recovery_info.copy_subset, - data_included, - data, - &usable_intervals, - &usable_data); - data_included = usable_intervals; - data.claim(usable_data); - - - pi.recovery_progress = pop.after_progress; - - dout(10) << "new recovery_info " << pi.recovery_info - << ", new progress " << pi.recovery_progress - << dendl; - - bool complete = pi.is_complete(); - - submit_push_data(pi.recovery_info, first, - complete, pi.cache_dont_need, - data_included, data, - pop.omap_header, - pop.attrset, - pop.omap_entries, - t); - - pi.stat.num_keys_recovered += pop.omap_entries.size(); - pi.stat.num_bytes_recovered += data.length(); - - if (complete) { - pi.stat.num_objects_recovered++; - clear_pull_from(piter); - to_continue->push_back({hoid, pi.stat}); - get_parent()->on_local_recover( - hoid, pi.recovery_info, pi.obc, false, t); - return false; - } else { - response->soid = pop.soid; - response->recovery_info = pi.recovery_info; - response->recovery_progress = pi.recovery_progress; - return true; - } -} - -void ReplicatedBackend::handle_push( - pg_shard_t from, const PushOp &pop, PushReplyOp *response, - ObjectStore::Transaction *t) -{ - dout(10) << "handle_push " - << pop.recovery_info - << pop.after_progress - << dendl; - bufferlist data; - data = pop.data; - bool first = pop.before_progress.first; - bool complete = pop.after_progress.data_complete && - pop.after_progress.omap_complete; - - response->soid = pop.recovery_info.soid; - submit_push_data(pop.recovery_info, - first, - complete, - true, // must be replicate - pop.data_included, - data, - pop.omap_header, - pop.attrset, - pop.omap_entries, - t); - - if (complete) - get_parent()->on_local_recover( - pop.recovery_info.soid, - pop.recovery_info, - ObjectContextRef(), // ok, is replica - false, - t); -} - -void ReplicatedBackend::send_pushes(int prio, map > &pushes) -{ - for (map >::iterator i = pushes.begin(); - i != pushes.end(); - ++i) { - ConnectionRef con = get_parent()->get_con_osd_cluster( - i->first.osd, - get_osdmap()->get_epoch()); - if (!con) - continue; - vector::iterator j = i->second.begin(); - while (j != i->second.end()) { - uint64_t cost = 0; - uint64_t pushes = 0; - MOSDPGPush *msg = new MOSDPGPush(); - msg->from = get_parent()->whoami_shard(); - msg->pgid = get_parent()->primary_spg_t(); - msg->map_epoch = get_osdmap()->get_epoch(); - msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); - msg->set_priority(prio); - for (; - (j != i->second.end() && - cost < cct->_conf->osd_max_push_cost && - pushes < cct->_conf->osd_max_push_objects) ; - ++j) { - dout(20) << __func__ << ": sending push " << *j - << " to osd." << i->first << dendl; - cost += j->cost(cct); - pushes += 1; - msg->pushes.push_back(*j); - } - msg->set_cost(cost); - get_parent()->send_message_osd_cluster(msg, con); - } - } -} - -void ReplicatedBackend::send_pulls(int prio, map > &pulls) -{ - for (map >::iterator i = pulls.begin(); - i != pulls.end(); - ++i) { - ConnectionRef con = get_parent()->get_con_osd_cluster( - i->first.osd, - get_osdmap()->get_epoch()); - if (!con) - continue; - dout(20) << __func__ << ": sending pulls " << i->second - << " to osd." 
<< i->first << dendl; - MOSDPGPull *msg = new MOSDPGPull(); - msg->from = parent->whoami_shard(); - msg->set_priority(prio); - msg->pgid = get_parent()->primary_spg_t(); - msg->map_epoch = get_osdmap()->get_epoch(); - msg->min_epoch = get_parent()->get_last_peering_reset_epoch(); - msg->set_pulls(&i->second); - msg->compute_cost(cct); - get_parent()->send_message_osd_cluster(msg, con); - } -} - -int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, - const ObjectRecoveryProgress &progress, - ObjectRecoveryProgress *out_progress, - PushOp *out_op, - object_stat_sum_t *stat, - bool cache_dont_need) -{ - ObjectRecoveryProgress _new_progress; - if (!out_progress) - out_progress = &_new_progress; - ObjectRecoveryProgress &new_progress = *out_progress; - new_progress = progress; - - dout(7) << __func__ << " " << recovery_info.soid - << " v " << recovery_info.version - << " size " << recovery_info.size - << " recovery_info: " << recovery_info - << dendl; - - eversion_t v = recovery_info.version; - if (progress.first) { - int r = store->omap_get_header(coll, ghobject_t(recovery_info.soid), &out_op->omap_header); - if(r < 0) { - dout(1) << __func__ << " get omap header failed: " << cpp_strerror(-r) << dendl; - return r; - } - r = store->getattrs(ch, ghobject_t(recovery_info.soid), out_op->attrset); - if(r < 0) { - dout(1) << __func__ << " getattrs failed: " << cpp_strerror(-r) << dendl; - return r; - } - - // Debug - bufferlist bv = out_op->attrset[OI_ATTR]; - object_info_t oi; - try { - bufferlist::iterator bliter = bv.begin(); - ::decode(oi, bliter); - } catch (...) { - dout(0) << __func__ << ": bad object_info_t: " << recovery_info.soid << dendl; - return -EINVAL; - } - - // If requestor didn't know the version, use ours - if (v == eversion_t()) { - v = oi.version; - } else if (oi.version != v) { - get_parent()->clog_error() << get_info().pgid << " push " - << recovery_info.soid << " v " - << recovery_info.version - << " failed because local copy is " - << oi.version; - return -EINVAL; - } - - new_progress.first = false; - } - // Once we provide the version subsequent requests will have it, so - // at this point it must be known. 
- assert(v != eversion_t()); - - uint64_t available = cct->_conf->osd_recovery_max_chunk; - if (!progress.omap_complete) { - ObjectMap::ObjectMapIterator iter = - store->get_omap_iterator(coll, - ghobject_t(recovery_info.soid)); - assert(iter); - for (iter->lower_bound(progress.omap_recovered_to); - iter->valid(); - iter->next(false)) { - if (!out_op->omap_entries.empty() && - ((cct->_conf->osd_recovery_max_omap_entries_per_chunk > 0 && - out_op->omap_entries.size() >= cct->_conf->osd_recovery_max_omap_entries_per_chunk) || - available <= iter->key().size() + iter->value().length())) - break; - out_op->omap_entries.insert(make_pair(iter->key(), iter->value())); - - if ((iter->key().size() + iter->value().length()) <= available) - available -= (iter->key().size() + iter->value().length()); - else - available = 0; - } - if (!iter->valid()) - new_progress.omap_complete = true; - else - new_progress.omap_recovered_to = iter->key(); - } - - if (available > 0) { - if (!recovery_info.copy_subset.empty()) { - interval_set copy_subset = recovery_info.copy_subset; - map m; - int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0, - copy_subset.range_end(), m); - if (r >= 0) { - interval_set fiemap_included(m); - copy_subset.intersection_of(fiemap_included); - } else { - // intersection of copy_subset and empty interval_set would be empty anyway - copy_subset.clear(); - } - - out_op->data_included.span_of(copy_subset, progress.data_recovered_to, - available); - if (out_op->data_included.empty()) // zero filled section, skip to end! - new_progress.data_recovered_to = recovery_info.copy_subset.range_end(); - else - new_progress.data_recovered_to = out_op->data_included.range_end(); - } - } else { - out_op->data_included.clear(); - } - - for (interval_set::iterator p = out_op->data_included.begin(); - p != out_op->data_included.end(); - ++p) { - bufferlist bit; - int r = store->read(ch, ghobject_t(recovery_info.soid), - p.get_start(), p.get_len(), bit, - cache_dont_need ? 
CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0); - if (cct->_conf->osd_debug_random_push_read_error && - (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) { - dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl; - r = -EIO; - } - if (r < 0) { - return r; - } - if (p.get_len() != bit.length()) { - dout(10) << " extent " << p.get_start() << "~" << p.get_len() - << " is actually " << p.get_start() << "~" << bit.length() - << dendl; - interval_set::iterator save = p++; - if (bit.length() == 0) - out_op->data_included.erase(save); //Remove this empty interval - else - save.set_len(bit.length()); - // Remove any other intervals present - while (p != out_op->data_included.end()) { - interval_set::iterator save = p++; - out_op->data_included.erase(save); - } - new_progress.data_complete = true; - out_op->data.claim_append(bit); - break; - } - out_op->data.claim_append(bit); - } - - if (new_progress.is_complete(recovery_info)) { - new_progress.data_complete = true; - if (stat) - stat->num_objects_recovered++; - } - - if (stat) { - stat->num_keys_recovered += out_op->omap_entries.size(); - stat->num_bytes_recovered += out_op->data.length(); - } - - get_parent()->get_logger()->inc(l_osd_push); - get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length()); - - // send - out_op->version = v; - out_op->soid = recovery_info.soid; - out_op->recovery_info = recovery_info; - out_op->after_progress = new_progress; - out_op->before_progress = progress; - return 0; -} - -void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op) -{ - op->recovery_info.version = eversion_t(); - op->version = eversion_t(); - op->soid = soid; -} - -bool ReplicatedBackend::handle_push_reply( - pg_shard_t peer, const PushReplyOp &op, PushOp *reply) -{ - const hobject_t &soid = op.soid; - if (pushing.count(soid) == 0) { - dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer - << ", or anybody else" - << dendl; - return false; - } else if (pushing[soid].count(peer) == 0) { - dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer - << dendl; - return false; - } else { - PushInfo *pi = &pushing[soid][peer]; - bool error = pushing[soid].begin()->second.recovery_progress.error; - - if (!pi->recovery_progress.data_complete && !error) { - dout(10) << " pushing more from, " - << pi->recovery_progress.data_recovered_to - << " of " << pi->recovery_info.copy_subset << dendl; - ObjectRecoveryProgress new_progress; - int r = build_push_op( - pi->recovery_info, - pi->recovery_progress, &new_progress, reply, - &(pi->stat)); - // Handle the case of a read error right after we wrote, which is - // hopefuilly extremely rare. - if (r < 0) { - dout(5) << __func__ << ": oid " << soid << " error " << r << dendl; - - error = true; - goto done; - } - pi->recovery_progress = new_progress; - return true; - } else { - // done! -done: - if (!error) - get_parent()->on_peer_recover( peer, soid, pi->recovery_info); - - get_parent()->release_locks(pi->lock_manager); - object_stat_sum_t stat = pi->stat; - eversion_t v = pi->recovery_info.version; - pushing[soid].erase(peer); - pi = NULL; - - if (pushing[soid].empty()) { - if (!error) - get_parent()->on_global_recover(soid, stat, false); - else - get_parent()->on_primary_error(soid, v); - pushing.erase(soid); - } else { - // This looks weird, but we erased the current peer and need to remember - // the error on any other one, while getting more acks. 
- if (error) - pushing[soid].begin()->second.recovery_progress.error = true; - dout(10) << "pushed " << soid << ", still waiting for push ack from " - << pushing[soid].size() << " others" << dendl; - } - return false; - } - } -} - -void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply) -{ - const hobject_t &soid = op.soid; - struct stat st; - int r = store->stat(ch, ghobject_t(soid), &st); - if (r != 0) { - get_parent()->clog_error() << get_info().pgid << " " - << peer << " tried to pull " << soid - << " but got " << cpp_strerror(-r); - prep_push_op_blank(soid, reply); - } else { - ObjectRecoveryInfo &recovery_info = op.recovery_info; - ObjectRecoveryProgress &progress = op.recovery_progress; - if (progress.first && recovery_info.size == ((uint64_t)-1)) { - // Adjust size and copy_subset - recovery_info.size = st.st_size; - recovery_info.copy_subset.clear(); - if (st.st_size) - recovery_info.copy_subset.insert(0, st.st_size); - assert(recovery_info.clone_subset.empty()); - } - - r = build_push_op(recovery_info, progress, 0, reply); - if (r < 0) - prep_push_op_blank(soid, reply); - } -} - -/** - * trim received data to remove what we don't want - * - * @param copy_subset intervals we want - * @param data_included intervals we got - * @param data_recieved data we got - * @param intervals_usable intervals we want to keep - * @param data_usable matching data we want to keep - */ -void ReplicatedBackend::trim_pushed_data( - const interval_set ©_subset, - const interval_set &intervals_received, - bufferlist data_received, - interval_set *intervals_usable, - bufferlist *data_usable) -{ - if (intervals_received.subset_of(copy_subset)) { - *intervals_usable = intervals_received; - *data_usable = data_received; - return; - } - - intervals_usable->intersection_of(copy_subset, - intervals_received); - - uint64_t off = 0; - for (interval_set::const_iterator p = intervals_received.begin(); - p != intervals_received.end(); - ++p) { - interval_set x; - x.insert(p.get_start(), p.get_len()); - x.intersection_of(copy_subset); - for (interval_set::const_iterator q = x.begin(); - q != x.end(); - ++q) { - bufferlist sub; - uint64_t data_off = off + (q.get_start() - p.get_start()); - sub.substr_of(data_received, data_off, q.get_len()); - data_usable->claim_append(sub); - } - off += p.get_len(); - } -} - -void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid) -{ - dout(20) << __func__ << ": " << soid << " from " << from << dendl; - list fl = { from }; - get_parent()->failed_push(fl, soid); - - clear_pull(pulling.find(soid)); -} - -void ReplicatedBackend::clear_pull_from( - map::iterator piter) -{ - auto from = piter->second.from; - pull_from_peer[from].erase(piter->second.soid); - if (pull_from_peer[from].empty()) - pull_from_peer.erase(from); -} - -void ReplicatedBackend::clear_pull( - map::iterator piter, - bool clear_pull_from_peer) -{ - if (clear_pull_from_peer) { - clear_pull_from(piter); - } - get_parent()->release_locks(piter->second.lock_manager); - pulling.erase(piter); -} - -int ReplicatedBackend::start_pushes( - const hobject_t &soid, - ObjectContextRef obc, - RPGHandle *h) -{ - list< map::const_iterator > shards; - - dout(20) << __func__ << " soid " << soid << dendl; - // who needs it? 
-  assert(get_parent()->get_actingbackfill_shards().size() > 0);
-  for (set<pg_shard_t>::iterator i =
-	 get_parent()->get_actingbackfill_shards().begin();
-       i != get_parent()->get_actingbackfill_shards().end();
-       ++i) {
-    if (*i == get_parent()->whoami_shard()) continue;
-    pg_shard_t peer = *i;
-    map<pg_shard_t, pg_missing_t>::const_iterator j =
-      get_parent()->get_shard_missing().find(peer);
-    assert(j != get_parent()->get_shard_missing().end());
-    if (j->second.is_missing(soid)) {
-      shards.push_back(j);
-    }
-  }
-
-  // If more than 1 read will occur ignore possible request to not cache
-  bool cache = shards.size() == 1 ? h->cache_dont_need : false;
-
-  for (auto j : shards) {
-    pg_shard_t peer = j->first;
-    h->pushes[peer].push_back(PushOp());
-    int r = prep_push_to_replica(obc, soid, peer,
-				 &(h->pushes[peer].back()), cache);
-    if (r < 0) {
-      // Back out all failed reads
-      for (auto k : shards) {
-	pg_shard_t p = k->first;
-	dout(10) << __func__ << " clean up peer " << p << dendl;
-	h->pushes[p].pop_back();
-	if (p == peer) break;
-      }
-      return r;
-    }
-  }
-  return shards.size();
-}