1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2013 Inktank Storage, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include "ECBackend.h"
19 #include "messages/MOSDPGPush.h"
20 #include "messages/MOSDPGPushReply.h"
21 #include "messages/MOSDECSubOpWrite.h"
22 #include "messages/MOSDECSubOpWriteReply.h"
23 #include "messages/MOSDECSubOpRead.h"
24 #include "messages/MOSDECSubOpReadReply.h"
25 #include "ECMsgTypes.h"
27 #include "PrimaryLogPG.h"
// Debug-output plumbing: route dout() through the OSD subsystem, using
// the per-backend prefix produced by _prefix() below.
29 #define dout_context cct
30 #define dout_subsys ceph_subsys_osd
31 #define DOUT_PREFIX_ARGS this
33 #define dout_prefix _prefix(_dout, this)
// Build the dout log prefix by delegating to the parent listener's
// debug prefix (typically "osd.N pg[...]").
34 static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
35 return *_dout << pgb->get_parent()->gen_dbg_prefix();
// Recovery handle for EC pools: accumulates RecoveryOps queued by
// recover_object() until run_recovery_op() starts them as a batch.
38 struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
39 list<ECBackend::RecoveryOp> ops;
// Human-readable name for the write-pipeline cache state; any value
// outside the two known states is a programming error (assert).
42 ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
43 switch (rhs.pipeline_state) {
44 case ECBackend::pipeline_state_t::CACHE_VALID:
45 return lhs << "CACHE_VALID";
46 case ECBackend::pipeline_state_t::CACHE_INVALID:
47 return lhs << "CACHE_INVALID";
49 assert(0 == "invalid pipeline state");
51 return lhs; // unreachable
// Print a shard->buffer map as (shard, length) pairs; buffer contents
// are elided to keep log lines compact.
54 static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
57 for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
62 lhs << make_pair(i->first, i->second.length());
// Same as above but keyed by raw shard index: prints (index, length)
// pairs, eliding buffer contents.
67 static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
70 for (map<int, bufferlist>::const_iterator i = rhs.begin();
75 lhs << make_pair(i->first, i->second.length());
// Print an (offset, length, shard->data) read-result tuple.
80 static ostream &operator<<(
82 const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
84 return lhs << "(" << rhs.get<0>() << ", "
85 << rhs.get<1>() << ", " << rhs.get<2>() << ")";
// Debug printer for a pending read request (extents, shards needed,
// whether attrs were requested).
88 ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
90 return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
91 << ", need=" << rhs.need
92 << ", want_attrs=" << rhs.want_attrs
// Debug printer for a read result: return code, per-shard errors,
// optional attrs pointer, and the returned extents.
96 ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
98 lhs << "read_result_t(r=" << rhs.r
99 << ", errors=" << rhs.errors;
101 lhs << ", attrs=" << rhs.attrs.get();
105 return lhs << ", returned=" << rhs.returned << ")";
// Debug printer for an in-flight ReadOp; includes the originating
// client request when one is attached.
108 ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
110 lhs << "ReadOp(tid=" << rhs.tid;
111 if (rhs.op && rhs.op->get_req()) {
113 rhs.op->get_req()->print(lhs);
115 return lhs << ", to_read=" << rhs.to_read
116 << ", complete=" << rhs.complete
117 << ", priority=" << rhs.priority
118 << ", obj_to_source=" << rhs.obj_to_source
119 << ", source_to_obj=" << rhs.source_to_obj
120 << ", in_progress=" << rhs.in_progress << ")";
// Structured (Formatter) dump of a ReadOp; mirrors the fields printed
// by operator<< above for admin-socket/debug output.
123 void ECBackend::ReadOp::dump(Formatter *f) const
125 f->dump_unsigned("tid", tid);
126 if (op && op->get_req()) {
127 f->dump_stream("op") << *(op->get_req());
129 f->dump_stream("to_read") << to_read;
130 f->dump_stream("complete") << complete;
131 f->dump_int("priority", priority);
132 f->dump_stream("obj_to_source") << obj_to_source;
133 f->dump_stream("source_to_obj") << source_to_obj;
134 f->dump_stream("in_progress") << in_progress;
// Debug printer for a write Op: version/tid/reqid, the client request
// when present, and the pending read/apply/commit bookkeeping.
137 ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
139 lhs << "Op(" << rhs.hoid
140 << " v=" << rhs.version
141 << " tt=" << rhs.trim_to
142 << " tid=" << rhs.tid
143 << " reqid=" << rhs.reqid;
144 if (rhs.client_op && rhs.client_op->get_req()) {
145 lhs << " client_op=";
146 rhs.client_op->get_req()->print(lhs);
148 lhs << " roll_forward_to=" << rhs.roll_forward_to
149 << " temp_added=" << rhs.temp_added
150 << " temp_cleared=" << rhs.temp_cleared
151 << " pending_read=" << rhs.pending_read
152 << " remote_read=" << rhs.remote_read
153 << " remote_read_result=" << rhs.remote_read_result
154 << " pending_apply=" << rhs.pending_apply
155 << " pending_commit=" << rhs.pending_commit
156 << " plan.to_read=" << rhs.plan.to_read
157 << " plan.will_write=" << rhs.plan.will_write
// Debug printer for a RecoveryOp: target object, missing shards,
// progress, state-machine state and outstanding pushes.
162 ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
164 return lhs << "RecoveryOp("
165 << "hoid=" << rhs.hoid
167 << " missing_on=" << rhs.missing_on
168 << " missing_on_shards=" << rhs.missing_on_shards
169 << " recovery_info=" << rhs.recovery_info
170 << " recovery_progress=" << rhs.recovery_progress
171 << " obc refcount=" << rhs.obc.use_count()
172 << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
173 << " waiting_on_pushes=" << rhs.waiting_on_pushes
174 << " extent_requested=" << rhs.extent_requested
// Structured (Formatter) dump of a RecoveryOp; mirrors operator<<
// above for admin-socket/debug output.
178 void ECBackend::RecoveryOp::dump(Formatter *f) const
180 f->dump_stream("hoid") << hoid;
181 f->dump_stream("v") << v;
182 f->dump_stream("missing_on") << missing_on;
183 f->dump_stream("missing_on_shards") << missing_on_shards;
184 f->dump_stream("recovery_info") << recovery_info;
185 f->dump_stream("recovery_progress") << recovery_progress;
186 f->dump_stream("state") << tostr(state);
187 f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
188 f->dump_stream("extent_requested") << extent_requested;
// Construct the EC backend. The stripe width must equal the data chunk
// count times the per-chunk size (asserted), so stripes divide evenly
// across shards. NOTE(review): some parameter lines are missing from
// this excerpt (cct/store/coll are referenced in the init list).
191 ECBackend::ECBackend(
192 PGBackend::Listener *pg,
194 ObjectStore::CollectionHandle &ch,
197 ErasureCodeInterfaceRef ec_impl,
198 uint64_t stripe_width)
199 : PGBackend(cct, pg, store, coll, ch),
201 sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
202 assert((ec_impl->get_data_chunk_count() *
203 ec_impl->get_chunk_size(stripe_width)) == stripe_width);
// Allocate a fresh recovery handle; caller owns the returned pointer
// and later passes it to run_recovery_op().
206 PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
208 return new ECRecoveryHandle;
// Abort recovery of 'hoid' after a failed read: drop the RecoveryOp and
// report the shards that returned errors to the parent so they can be
// excluded as recovery sources.
211 void ECBackend::_failed_push(const hobject_t &hoid,
212 pair<RecoveryMessages *, ECBackend::read_result_t &> &in)
214 ECBackend::read_result_t &res = in.second;
215 dout(10) << __func__ << ": Read error " << hoid << " r="
216 << res.r << " errors=" << res.errors << dendl;
217 dout(10) << __func__ << ": canceling recovery op for obj " << hoid
219 assert(recovery_ops.count(hoid));
220 recovery_ops.erase(hoid);
// Collect the shards that errored; they are handed to failed_push().
223 for (auto&& i : res.errors) {
224 fl.push_back(i.first);
226 get_parent()->failed_push(fl, hoid);
// Completion callback for a recovery read: on any error route to
// _failed_push(); on success forward the single returned extent to
// handle_recovery_read_complete().
229 struct OnRecoveryReadComplete :
230 public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
234 OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
235 : pg(pg), hoid(hoid) {}
236 void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
237 ECBackend::read_result_t &res = in.second;
238 if (!(res.r == 0 && res.errors.empty())) {
239 pg->_failed_push(hoid, in);
// Recovery reads request exactly one extent, so exactly one result.
242 assert(res.returned.size() == 1);
243 pg->handle_recovery_read_complete(
// Accumulator for all messages/transactions generated while processing
// one batch of recovery work: reads to issue, pushes and push-replies
// to send, and a local transaction to queue.
251 struct RecoveryMessages {
253 ECBackend::read_request_t> reads;
256 const hobject_t &hoid, uint64_t off, uint64_t len,
257 const set<pg_shard_t> &need,
259 list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
260 to_read.push_back(boost::make_tuple(off, len, 0));
// At most one outstanding recovery read per object.
261 assert(!reads.count(hoid));
265 ECBackend::read_request_t(
269 new OnRecoveryReadComplete(
274 map<pg_shard_t, vector<PushOp> > pushes;
275 map<pg_shard_t, vector<PushReplyOp> > push_replies;
276 ObjectStore::Transaction t;
277 RecoveryMessages() {}
278 ~RecoveryMessages(){}
// Apply one incoming PushOp to the local store: stage data in a temp
// object (unless the push completes in one shot), write the included
// extent, and on completion move the object into place and notify the
// parent. Non-primaries also queue a PushReplyOp back to the primary.
281 void ECBackend::handle_recovery_push(
// Refuse pushes when the OSD is at the failsafe-full threshold.
286 if (get_parent()->check_failsafe_full(ss)) {
287 dout(10) << __func__ << " Out of space (failsafe) processing push request: " << ss.str() << dendl;
// A "oneshot" push starts and finishes the object in one message, so
// it can be written directly to the final object, skipping the temp.
291 bool oneshot = op.before_progress.first && op.after_progress.data_complete;
294 tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
295 get_parent()->whoami_shard().shard);
297 tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
300 get_parent()->whoami_shard().shard);
301 if (op.before_progress.first) {
302 dout(10) << __func__ << ": Adding oid "
303 << tobj.hobj << " in the temp collection" << dendl;
304 add_temp_obj(tobj.hobj);
// First push for this object: start from a clean target object.
308 if (op.before_progress.first) {
309 m->t.remove(coll, tobj);
310 m->t.touch(coll, tobj);
313 if (!op.data_included.empty()) {
314 uint64_t start = op.data_included.range_start();
315 uint64_t end = op.data_included.range_end();
316 assert(op.data.length() == (end - start));
325 assert(op.data.length() == 0);
// The first push must carry the object-info ("_") attribute.
328 if (op.before_progress.first) {
329 assert(op.attrset.count(string("_")));
// Data complete (and not oneshot): promote the temp object to its
// final name via collection_move_rename.
336 if (op.after_progress.data_complete && !oneshot) {
337 dout(10) << __func__ << ": Removing oid "
338 << tobj.hobj << " from the temp collection" << dendl;
339 clear_temp_obj(tobj.hobj);
340 m->t.remove(coll, ghobject_t(
341 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
342 m->t.collection_move_rename(
345 op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
347 if (op.after_progress.data_complete) {
348 if ((get_parent()->pgb_is_primary())) {
349 assert(recovery_ops.count(op.soid));
350 assert(recovery_ops[op.soid].obc);
351 get_parent()->on_local_recover(
354 recovery_ops[op.soid].obc,
358 get_parent()->on_local_recover(
// Replicas acknowledge every push back to the primary.
366 m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
367 m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
// A replica acknowledged a push: clear it from the op's wait set and
// advance the recovery state machine. Replies for ops that no longer
// exist (e.g. canceled) are silently dropped.
370 void ECBackend::handle_recovery_push_reply(
371 const PushReplyOp &op,
375 if (!recovery_ops.count(op.soid))
377 RecoveryOp &rop = recovery_ops[op.soid];
378 assert(rop.waiting_on_pushes.count(from));
379 rop.waiting_on_pushes.erase(from);
380 continue_recovery_op(rop, m);
// A recovery read returned: decode the surviving shards' chunks into
// the buffers for each missing shard, (on the first read) rebuild the
// attrs / object context / hash info, then advance the state machine.
383 void ECBackend::handle_recovery_read_complete(
384 const hobject_t &hoid,
385 boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
386 boost::optional<map<string, bufferlist> > attrs,
389 dout(10) << __func__ << ": returned " << hoid << " "
390 << "(" << to_read.get<0>()
391 << ", " << to_read.get<1>()
392 << ", " << to_read.get<2>()
395 assert(recovery_ops.count(hoid));
396 RecoveryOp &op = recovery_ops[hoid];
397 assert(op.returned_data.empty());
// Decode targets: one output buffer per missing shard.
398 map<int, bufferlist*> target;
399 for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
400 i != op.missing_on_shards.end();
402 target[*i] = &(op.returned_data[*i]);
// Re-key the read data by shard index for the EC decode call.
404 map<int, bufferlist> from;
405 for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
406 i != to_read.get<2>().end();
408 from[i->first.shard].claim(i->second);
410 dout(10) << __func__ << ": " << from << dendl;
411 int r = ECUtil::decode(sinfo, ec_impl, from, target);
414 op.xattrs.swap(*attrs);
417 // attrs only reference the origin bufferlist (decode from
418 // ECSubReadReply message) whose size is much greater than attrs
419 // in recovery. If obc cache it (get_obc maybe cache the attr),
420 // this causes the whole origin bufferlist would not be free
421 // until obc is evicted from obc cache. So rebuild the
422 // bufferlist before cache it.
423 for (map<string, bufferlist>::iterator it = op.xattrs.begin();
424 it != op.xattrs.end();
426 it->second.rebuild();
428 // Need to remove ECUtil::get_hinfo_key() since it should not leak out
429 // of the backend (see bug #12983)
430 map<string, bufferlist> sanitized_attrs(op.xattrs);
431 sanitized_attrs.erase(ECUtil::get_hinfo_key());
432 op.obc = get_parent()->get_obc(hoid, sanitized_attrs);
434 op.recovery_info.size = op.obc->obs.oi.size;
435 op.recovery_info.oi = op.obc->obs.oi;
// Non-empty objects must carry hash info; decode and register it.
438 ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
439 if (op.obc->obs.oi.size > 0) {
440 assert(op.xattrs.count(ECUtil::get_hinfo_key()));
441 bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin();
444 op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
446 assert(op.xattrs.size());
448 continue_recovery_op(op, m);
// Transaction-completion context that sends the accumulated
// MOSDPGPushReply messages to their peer OSDs; the destructor frees
// any replies that were never sent.
451 struct SendPushReplies : public Context {
452 PGBackend::Listener *l;
454 map<int, MOSDPGPushReply*> replies;
456 PGBackend::Listener *l,
458 map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
461 void finish(int) override {
462 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
465 l->send_message_osd_cluster(i->first, i->second, epoch);
469 ~SendPushReplies() override {
470 for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
// Flush a RecoveryMessages batch: send MOSDPGPush messages immediately,
// defer MOSDPGPushReply messages until the batched local transaction
// commits (via SendPushReplies), then queue the transaction.
479 void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
481 for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
483 m.pushes.erase(i++)) {
484 MOSDPGPush *msg = new MOSDPGPush();
485 msg->set_priority(priority);
486 msg->map_epoch = get_parent()->get_epoch();
487 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
488 msg->from = get_parent()->whoami_shard();
489 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
490 msg->pushes.swap(i->second);
491 msg->compute_cost(cct);
492 get_parent()->send_message(
// Push replies are collected per target OSD and sent only after the
// local transaction completes.
496 map<int, MOSDPGPushReply*> replies;
497 for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
498 m.push_replies.begin();
499 i != m.push_replies.end();
500 m.push_replies.erase(i++)) {
501 MOSDPGPushReply *msg = new MOSDPGPushReply();
502 msg->set_priority(priority);
503 msg->map_epoch = get_parent()->get_epoch();
504 msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
505 msg->from = get_parent()->whoami_shard();
506 msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
507 msg->replies.swap(i->second);
508 msg->compute_cost(cct);
509 replies.insert(make_pair(i->first.osd, msg));
512 if (!replies.empty()) {
513 (m.t).register_on_complete(
514 get_parent()->bless_context(
517 get_parent()->get_epoch(),
519 get_parent()->queue_transaction(std::move(m.t));
// Drive the per-object recovery state machine:
//   IDLE    -> issue a read of the next extent from surviving shards
//   READING -> decode done; build PushOps for each missing shard
//   WRITING -> all pushes acked; either finish or loop back to IDLE
// COMPLETE must never re-enter this function.
531 void ECBackend::continue_recovery_op(
535 dout(10) << __func__ << ": continuing " << op << dendl;
538 case RecoveryOp::IDLE: {
540 op.state = RecoveryOp::READING;
541 assert(!op.recovery_progress.data_complete);
542 set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
543 uint64_t from = op.recovery_progress.data_recovered_to;
544 uint64_t amount = get_recovery_chunk_size();
546 if (op.recovery_progress.first && op.obc) {
547 /* We've got the attrs and the hinfo, might as well use them */
548 op.hinfo = get_hash_info(op.hoid);
550 op.xattrs = op.obc->attr_cache;
551 ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
554 set<pg_shard_t> to_read;
555 int r = get_min_avail_to_read_shards(
556 op.hoid, want, true, false, &to_read);
558 // we must have lost a recovery source
559 assert(!op.recovery_progress.first);
560 dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
562 get_parent()->cancel_pull(op.hoid);
563 recovery_ops.erase(op.hoid);
569 op.recovery_progress.data_recovered_to,
572 op.recovery_progress.first && !op.obc);
573 op.extent_requested = make_pair(
576 dout(10) << __func__ << ": IDLE return " << op << dendl;
579 case RecoveryOp::READING: {
580 // read completed, start write
581 assert(op.xattrs.size());
582 assert(op.returned_data.size());
583 op.state = RecoveryOp::WRITING;
584 ObjectRecoveryProgress after_progress = op.recovery_progress;
585 after_progress.data_recovered_to += op.extent_requested.second;
586 after_progress.first = false;
// Clamp the recovered-to offset at the stripe-aligned object end and
// mark the data phase complete once we've covered the whole object.
587 if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
588 after_progress.data_recovered_to =
589 sinfo.logical_to_next_stripe_offset(
590 op.obc->obs.oi.size);
591 after_progress.data_complete = true;
// One PushOp per shard that is missing this object.
593 for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
594 mi != op.missing_on.end();
596 assert(op.returned_data.count(mi->shard));
597 m->pushes[*mi].push_back(PushOp());
598 PushOp &pop = m->pushes[*mi].back();
601 pop.data = op.returned_data[mi->shard];
602 dout(10) << __func__ << ": before_progress=" << op.recovery_progress
603 << ", after_progress=" << after_progress
604 << ", pop.data.length()=" << pop.data.length()
605 << ", size=" << op.obc->obs.oi.size << dendl;
608 sinfo.aligned_logical_offset_to_chunk_offset(
609 after_progress.data_recovered_to -
610 op.recovery_progress.data_recovered_to)
612 if (pop.data.length())
613 pop.data_included.insert(
614 sinfo.aligned_logical_offset_to_chunk_offset(
615 op.recovery_progress.data_recovered_to),
// The first push carries the full attr set.
618 if (op.recovery_progress.first) {
619 pop.attrset = op.xattrs;
621 pop.recovery_info = op.recovery_info;
622 pop.before_progress = op.recovery_progress;
623 pop.after_progress = after_progress;
624 if (*mi != get_parent()->primary_shard())
625 get_parent()->begin_peer_recover(
629 op.returned_data.clear();
630 op.waiting_on_pushes = op.missing_on;
631 op.recovery_progress = after_progress;
632 dout(10) << __func__ << ": READING return " << op << dendl;
635 case RecoveryOp::WRITING: {
636 if (op.waiting_on_pushes.empty()) {
637 if (op.recovery_progress.data_complete) {
638 op.state = RecoveryOp::COMPLETE;
639 for (set<pg_shard_t>::iterator i = op.missing_on.begin();
640 i != op.missing_on.end();
642 if (*i != get_parent()->primary_shard()) {
643 dout(10) << __func__ << ": on_peer_recover on " << *i
644 << ", obj " << op.hoid << dendl;
645 get_parent()->on_peer_recover(
651 object_stat_sum_t stat;
652 stat.num_bytes_recovered = op.recovery_info.size;
653 stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
654 stat.num_objects_recovered = 1;
655 get_parent()->on_global_recover(op.hoid, stat, false);
656 dout(10) << __func__ << ": WRITING return " << op << dendl;
657 recovery_ops.erase(op.hoid);
// More extents remain: loop back to IDLE for the next chunk.
660 op.state = RecoveryOp::IDLE;
661 dout(10) << __func__ << ": WRITING continue " << op << dendl;
667 // should never be called once complete
668 case RecoveryOp::COMPLETE:
// Start all RecoveryOps accumulated on the handle: register each in
// recovery_ops, kick its state machine, then dispatch the resulting
// messages and any queued deletes. Takes ownership of the handle.
676 void ECBackend::run_recovery_op(
680 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
682 for (list<RecoveryOp>::iterator i = h->ops.begin();
685 dout(10) << __func__ << ": starting " << *i << dendl;
686 assert(!recovery_ops.count(i->hoid));
687 RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
688 continue_recovery_op(op, &m);
691 dispatch_recovery_messages(m, priority);
692 send_recovery_deletes(priority, h->deletes);
// Queue recovery of one object on the handle: record its recovery_info
// (size/oi from the obc when available, snapset from obc or head for
// snap objects) and compute which acting/backfill shards are missing
// it. The op is not started until run_recovery_op().
696 int ECBackend::recover_object(
697 const hobject_t &hoid,
699 ObjectContextRef head,
700 ObjectContextRef obc,
703 ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
704 h->ops.push_back(RecoveryOp());
706 h->ops.back().hoid = hoid;
707 h->ops.back().obc = obc;
708 h->ops.back().recovery_info.soid = hoid;
709 h->ops.back().recovery_info.version = v;
711 h->ops.back().recovery_info.size = obc->obs.oi.size;
712 h->ops.back().recovery_info.oi = obc->obs.oi;
// Snap objects need a snapset: take it from the clone's obc if we
// have one, otherwise from the head.
714 if (hoid.is_snap()) {
717 h->ops.back().recovery_info.ss = obc->ssc->snapset;
720 h->ops.back().recovery_info.ss = head->ssc->snapset;
722 assert(0 == "neither obc nor head set for a snap object");
725 h->ops.back().recovery_progress.omap_complete = true;
726 for (set<pg_shard_t>::const_iterator i =
727 get_parent()->get_actingbackfill_shards().begin();
728 i != get_parent()->get_actingbackfill_shards().end();
730 dout(10) << "checking " << *i << dendl;
731 if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
732 h->ops.back().missing_on.insert(*i);
733 h->ops.back().missing_on_shards.insert(i->shard);
736 dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
// Whether a message may be processed before the PG is active.
// NOTE(review): body not visible in this excerpt — confirm behavior
// against the full source.
740 bool ECBackend::can_handle_while_inactive(
// Dispatch an incoming OSD message to the matching EC sub-op handler:
// writes/write-replies, reads/read-replies, and recovery push /
// push-reply traffic.
746 bool ECBackend::_handle_message(
749 dout(10) << __func__ << ": " << *_op->get_req() << dendl;
750 int priority = _op->get_req()->get_priority();
751 switch (_op->get_req()->get_type()) {
752 case MSG_OSD_EC_WRITE: {
753 // NOTE: this is non-const because handle_sub_write modifies the embedded
754 // ObjectStore::Transaction in place (and then std::move's it). It does
755 // not conflict with ECSubWrite's operator<<.
756 MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
757 _op->get_nonconst_req());
758 handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
761 case MSG_OSD_EC_WRITE_REPLY: {
762 const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
764 handle_sub_write_reply(op->op.from, op->op, _op->pg_trace);
767 case MSG_OSD_EC_READ: {
768 const MOSDECSubOpRead *op = static_cast<const MOSDECSubOpRead*>(_op->get_req());
// Service the read locally and answer the requesting shard directly.
769 MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
770 reply->pgid = get_parent()->primary_spg_t();
771 reply->map_epoch = get_parent()->get_epoch();
772 reply->min_epoch = get_parent()->get_interval_start_epoch();
773 handle_sub_read(op->op.from, op->op, &(reply->op), _op->pg_trace);
774 reply->trace = _op->pg_trace;
775 get_parent()->send_message_osd_cluster(
776 op->op.from.osd, reply, get_parent()->get_epoch());
779 case MSG_OSD_EC_READ_REPLY: {
780 // NOTE: this is non-const because handle_sub_read_reply steals resulting
781 // buffers. It does not conflict with ECSubReadReply operator<<.
782 MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
783 _op->get_nonconst_req());
785 handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
786 dispatch_recovery_messages(rm, priority);
789 case MSG_OSD_PG_PUSH: {
790 const MOSDPGPush *op = static_cast<const MOSDPGPush *>(_op->get_req());
792 for (vector<PushOp>::const_iterator i = op->pushes.begin();
793 i != op->pushes.end();
795 handle_recovery_push(*i, &rm);
797 dispatch_recovery_messages(rm, priority);
800 case MSG_OSD_PG_PUSH_REPLY: {
801 const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
804 for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
805 i != op->replies.end();
807 handle_recovery_push_reply(*i, op->from, &rm);
809 dispatch_recovery_messages(rm, priority);
// Transaction on-commit context for a sub-write: marks the tracked
// message and forwards to ECBackend::sub_write_committed().
818 struct SubWriteCommitted : public Context {
823 eversion_t last_complete;
824 const ZTracer::Trace trace;
830 eversion_t last_complete,
831 const ZTracer::Trace &trace)
832 : pg(pg), msg(msg), tid(tid),
833 version(version), last_complete(last_complete), trace(trace) {}
834 void finish(int) override {
836 msg->mark_event("sub_op_committed");
837 pg->sub_write_committed(tid, version, last_complete, trace);
// A sub-write has committed locally. On the primary, feed the reply
// straight into handle_sub_write_reply(); on a replica, update
// last_complete_ondisk and send an MOSDECSubOpWriteReply to the
// primary at high priority.
840 void ECBackend::sub_write_committed(
841 ceph_tid_t tid, eversion_t version, eversion_t last_complete,
842 const ZTracer::Trace &trace) {
843 if (get_parent()->pgb_is_primary()) {
844 ECSubWriteReply reply;
846 reply.last_complete = last_complete;
847 reply.committed = true;
848 reply.from = get_parent()->whoami_shard();
849 handle_sub_write_reply(
850 get_parent()->whoami_shard(),
853 get_parent()->update_last_complete_ondisk(last_complete);
854 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
855 r->pgid = get_parent()->primary_spg_t();
856 r->map_epoch = get_parent()->get_epoch();
857 r->min_epoch = get_parent()->get_interval_start_epoch();
859 r->op.last_complete = last_complete;
860 r->op.committed = true;
861 r->op.from = get_parent()->whoami_shard();
862 r->set_priority(CEPH_MSG_PRIO_HIGH);
864 r->trace.event("sending sub op commit");
865 get_parent()->send_message_osd_cluster(
866 get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
// Transaction on-applied context for a sub-write: marks the tracked
// message and forwards to ECBackend::sub_write_applied().
870 struct SubWriteApplied : public Context {
875 const ZTracer::Trace trace;
881 const ZTracer::Trace &trace)
882 : pg(pg), msg(msg), tid(tid), version(version), trace(trace) {}
883 void finish(int) override {
885 msg->mark_event("sub_op_applied");
886 pg->sub_write_applied(tid, version, trace);
// A sub-write has been applied (readable) locally. Mirrors
// sub_write_committed(): primary short-circuits into
// handle_sub_write_reply(); replica sends an applied reply to the
// primary at high priority.
889 void ECBackend::sub_write_applied(
890 ceph_tid_t tid, eversion_t version,
891 const ZTracer::Trace &trace) {
892 parent->op_applied(version);
893 if (get_parent()->pgb_is_primary()) {
894 ECSubWriteReply reply;
895 reply.from = get_parent()->whoami_shard();
897 reply.applied = true;
898 handle_sub_write_reply(
899 get_parent()->whoami_shard(),
902 MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
903 r->pgid = get_parent()->primary_spg_t();
904 r->map_epoch = get_parent()->get_epoch();
905 r->min_epoch = get_parent()->get_interval_start_epoch();
906 r->op.from = get_parent()->whoami_shard();
908 r->op.applied = true;
909 r->set_priority(CEPH_MSG_PRIO_HIGH);
911 r->trace.event("sending sub op apply");
912 get_parent()->send_message_osd_cluster(
913 get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
// Apply an incoming sub-write: track temp objects, log the op, attach
// commit/applied completion contexts, and queue the op's transaction
// together with the local bookkeeping transaction.
917 void ECBackend::handle_sub_write(
921 const ZTracer::Trace &trace,
922 Context *on_local_applied_sync)
926 trace.event("handle_sub_write");
927 assert(!get_parent()->get_log().get_missing().is_missing(op.soid));
928 if (!get_parent()->pgb_is_primary())
929 get_parent()->update_stats(op.stats);
930 ObjectStore::Transaction localt;
931 if (!op.temp_added.empty()) {
932 add_temp_objs(op.temp_added);
// Objects listed in temp_removed won't arrive via the transaction;
// remove them explicitly.
935 for (set<hobject_t>::iterator i = op.temp_removed.begin();
936 i != op.temp_removed.end();
938 dout(10) << __func__ << ": removing object " << *i
939 << " since we won't get the transaction" << dendl;
945 get_parent()->whoami_shard().shard));
948 clear_temp_objs(op.temp_removed);
949 get_parent()->log_operation(
951 op.updated_hit_set_history,
// On coding shards of a non-undersized PG, hint the kernel that the
// written data won't be read back soon.
957 PrimaryLogPG *_rPG = dynamic_cast<PrimaryLogPG *>(get_parent());
958 if (_rPG && !_rPG->is_undersized() &&
959 (unsigned)get_parent()->whoami_shard().shard >= ec_impl->get_data_chunk_count())
960 op.t.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
962 if (on_local_applied_sync) {
963 dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl;
964 localt.register_on_applied_sync(on_local_applied_sync);
966 localt.register_on_commit(
967 get_parent()->bless_context(
968 new SubWriteCommitted(
971 get_parent()->get_info().last_complete, trace)));
972 localt.register_on_applied(
973 get_parent()->bless_context(
974 new SubWriteApplied(this, msg, op.tid, op.at_version, trace)));
975 vector<ObjectStore::Transaction> tls;
977 tls.push_back(std::move(op.t));
978 tls.push_back(std::move(localt));
979 get_parent()->queue_transactions(tls, msg);
// Service a sub-read on this shard: read each requested extent from
// the local store, verify the chunk hash on full-object reads (pools
// without EC overwrites only), fetch requested attrs, and fill the
// reply. Per-object failures are recorded in reply->errors rather
// than aborting the whole request.
982 void ECBackend::handle_sub_read(
985 ECSubReadReply *reply,
986 const ZTracer::Trace &trace)
988 trace.event("handle sub read");
989 shard_id_t shard = get_parent()->whoami_shard().shard;
990 for(auto i = op.to_read.begin();
991 i != op.to_read.end();
994 ECUtil::HashInfoRef hinfo;
995 if (!get_parent()->get_pool().allows_ecoverwrites()) {
996 hinfo = get_hash_info(i->first);
// Missing hash info means the object is corrupt on this shard.
999 get_parent()->clog_error() << "Corruption detected: object " << i->first
1000 << " is missing hash_info";
1001 dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
1005 for (auto j = i->second.begin(); j != i->second.end(); ++j) {
1009 ghobject_t(i->first, ghobject_t::NO_GEN, shard),
1014 get_parent()->clog_error() << "Error " << r
1015 << " reading object "
1017 dout(5) << __func__ << ": Error " << r
1018 << " reading " << i->first << dendl;
1021 dout(20) << __func__ << " read request=" << j->get<1>() << " r=" << r << " len=" << bl.length() << dendl;
1022 reply->buffers_read[i->first].push_back(
1029 if (!get_parent()->get_pool().allows_ecoverwrites()) {
1030 // This shows that we still need deep scrub because large enough files
1031 // are read in sections, so the digest check here won't be done here.
1032 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1033 // the state of our chunk in case other chunks could substitute.
1034 assert(hinfo->has_chunk_hash());
1035 if ((bl.length() == hinfo->get_total_chunk_size()) &&
1036 (j->get<0>() == 0)) {
1037 dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
1040 if (h.digest() != hinfo->get_chunk_hash(shard)) {
1041 get_parent()->clog_error() << "Bad hash for " << i->first << " digest 0x"
1042 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec;
1043 dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
1044 << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
1053 // Do NOT check osd_read_eio_on_bad_digest here. We need to report
1054 // the state of our chunk in case other chunks could substitute.
1055 reply->buffers_read.erase(i->first);
1056 reply->errors[i->first] = r;
// Attr requests: skipped for objects that already errored above.
1058 for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
1059 i != op.attrs_to_read.end();
1061 dout(10) << __func__ << ": fulfilling attr request on "
1063 if (reply->errors.count(*i))
1065 int r = store->getattrs(
1068 *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1069 reply->attrs_read[*i]);
1071 reply->buffers_read.erase(*i);
1072 reply->errors[*i] = r;
1075 reply->from = get_parent()->whoami_shard();
1076 reply->tid = op.tid;
// Record a shard's commit/applied acknowledgement for an in-flight
// write; when all shards have acknowledged, fire the corresponding
// on_all_applied / on_all_commit callback exactly once.
1079 void ECBackend::handle_sub_write_reply(
1081 const ECSubWriteReply &op,
1082 const ZTracer::Trace &trace)
1084 map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
1085 assert(i != tid_to_op_map.end());
1087 trace.event("sub write committed");
1088 assert(i->second.pending_commit.count(from));
1089 i->second.pending_commit.erase(from);
1090 if (from != get_parent()->whoami_shard()) {
1091 get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
1095 trace.event("sub write applied");
1096 assert(i->second.pending_apply.count(from));
1097 i->second.pending_apply.erase(from);
// Callbacks are nulled after completion so they fire only once.
1100 if (i->second.pending_apply.empty() && i->second.on_all_applied) {
1101 dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
1102 i->second.on_all_applied->complete(0);
1103 i->second.on_all_applied = 0;
1104 i->second.trace.event("ec write all applied");
1106 if (i->second.pending_commit.empty() && i->second.on_all_commit) {
1107 dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
1108 i->second.on_all_commit->complete(0);
1109 i->second.on_all_commit = 0;
1110 i->second.trace.event("ec write all committed");
1115 void ECBackend::handle_sub_read_reply(
1118 RecoveryMessages *m,
1119 const ZTracer::Trace &trace)
1121 trace.event("ec sub read reply");
1122 dout(10) << __func__ << ": reply " << op << dendl;
1123 map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
1124 if (iter == tid_to_read_map.end()) {
1126 dout(20) << __func__ << ": dropped " << op << dendl;
1129 ReadOp &rop = iter->second;
1130 for (auto i = op.buffers_read.begin();
1131 i != op.buffers_read.end();
1133 assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
1134 if (!rop.to_read.count(i->first)) {
1135 // We canceled this read! @see filter_read_op
1136 dout(20) << __func__ << " to_read skipping" << dendl;
1139 list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator req_iter =
1140 rop.to_read.find(i->first)->second.to_read.begin();
1143 uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
1144 rop.complete[i->first].returned.begin();
1145 for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
1146 j != i->second.end();
1147 ++j, ++req_iter, ++riter) {
1148 assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
1149 assert(riter != rop.complete[i->first].returned.end());
1150 pair<uint64_t, uint64_t> adjusted =
1151 sinfo.aligned_offset_len_to_chunk(
1152 make_pair(req_iter->get<0>(), req_iter->get<1>()));
1153 assert(adjusted.first == j->first);
1154 riter->get<2>()[from].claim(j->second);
1157 for (auto i = op.attrs_read.begin();
1158 i != op.attrs_read.end();
1160 assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
1161 if (!rop.to_read.count(i->first)) {
1162 // We canceled this read! @see filter_read_op
1163 dout(20) << __func__ << " to_read skipping" << dendl;
1166 rop.complete[i->first].attrs = map<string, bufferlist>();
1167 (*(rop.complete[i->first].attrs)).swap(i->second);
1169 for (auto i = op.errors.begin();
1170 i != op.errors.end();
1172 rop.complete[i->first].errors.insert(
1176 dout(20) << __func__ << " shard=" << from << " error=" << i->second << dendl;
1179 map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
1180 shard_to_read_map.find(from);
1181 assert(siter != shard_to_read_map.end());
1182 assert(siter->second.count(op.tid));
1183 siter->second.erase(op.tid);
1185 assert(rop.in_progress.count(from));
1186 rop.in_progress.erase(from);
1187 unsigned is_complete = 0;
1188 // For redundant reads check for completion as each shard comes in,
1189 // or in a non-recovery read check for completion once all the shards read.
1190 // TODO: It would be nice if recovery could send more reads too
1191 if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) {
1192 for (map<hobject_t, read_result_t>::const_iterator iter =
1193 rop.complete.begin();
1194 iter != rop.complete.end();
1197 for (map<pg_shard_t, bufferlist>::const_iterator j =
1198 iter->second.returned.front().get<2>().begin();
1199 j != iter->second.returned.front().get<2>().end();
1201 have.insert(j->first.shard);
1202 dout(20) << __func__ << " have shard=" << j->first.shard << dendl;
1204 set<int> want_to_read, dummy_minimum;
1205 get_want_to_read_shards(&want_to_read);
1207 if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
1208 dout(20) << __func__ << " minimum_to_decode failed" << dendl;
1209 if (rop.in_progress.empty()) {
1210 // If we don't have enough copies and we haven't sent reads for all shards
1211 // we can send the rest of the reads, if any.
1212 if (!rop.do_redundant_reads) {
1213 int r = send_all_remaining_reads(iter->first, rop);
1215 // We added to in_progress and not incrementing is_complete
1218 // Couldn't read any additional shards so handle as completed with errors
1220 // We don't want to confuse clients / RBD with objectstore error
1221 // values in particular ENOENT. We may have different error returns
1222 // from different shards, so we'll return minimum_to_decode() error
1223 // (usually EIO) to reader. It is likely an error here is due to a
1225 rop.complete[iter->first].r = err;
1229 assert(rop.complete[iter->first].r == 0);
1230 if (!rop.complete[iter->first].errors.empty()) {
1231 if (cct->_conf->osd_read_ec_check_for_errors) {
1232 dout(10) << __func__ << ": Not ignoring errors, use one shard err=" << err << dendl;
1233 err = rop.complete[iter->first].errors.begin()->second;
1234 rop.complete[iter->first].r = err;
1236 get_parent()->clog_warn() << "Error(s) ignored for "
1237 << iter->first << " enough copies available";
1238 dout(10) << __func__ << " Error(s) ignored for " << iter->first
1239 << " enough copies available" << dendl;
1240 rop.complete[iter->first].errors.clear();
1247 if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
1248 dout(20) << __func__ << " Complete: " << rop << dendl;
1249 rop.trace.event("ec read complete");
1250 complete_read_op(rop, m);
1252 dout(10) << __func__ << " readop not complete: " << rop << dendl;
// Deliver the results of a finished ReadOp: walk the request map (to_read)
// and the result map (complete) in lockstep — the assert below checks they
// have equal size — firing each request's callback exactly once, then drop
// the op from tid_to_read_map.
// NOTE(review): embedded line-number gaps (1257, 1269-1270) indicate
// brace/blank lines were elided from this extraction.
1256 void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
1258   map<hobject_t, read_request_t>::iterator reqiter =
1259     rop.to_read.begin();
1260   map<hobject_t, read_result_t>::iterator resiter =
1261     rop.complete.begin();
1262   assert(rop.to_read.size() == rop.complete.size());
1263   for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
1264     if (reqiter->second.cb) {
// Callback receives the recovery-message batch plus a reference to this
// object's read result.
1265       pair<RecoveryMessages *, read_result_t &> arg(
1266 	m, resiter->second);
1267       reqiter->second.cb->complete(arg);
// Null the callback so it cannot be invoked (or deleted) twice.
1268       reqiter->second.cb = NULL;
1271   tid_to_read_map.erase(rop.tid);
// Deferred completion context, run on a thread pool: looks up a still-pending
// ReadOp by tid, completes it, and dispatches whatever recovery messages that
// produced. Used by filter_read_op when all remaining sources of a read have
// gone away.
// NOTE(review): the member declarations (presumably ECBackend *ec and
// ceph_tid_t tid, per the ctor init list) were elided from this extraction.
1274 struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
1277   FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
1278   void finish(ThreadPool::TPHandle &handle) override {
// The op must still be registered; filter_read_op schedules this context
// before the op could be erased.
1279     auto ropiter = ec->tid_to_read_map.find(tid);
1280     assert(ropiter != ec->tid_to_read_map.end());
// Capture priority before complete_read_op erases the op from the map.
1281     int priority = ropiter->second.priority;
1282     RecoveryMessages rm;
1283     ec->complete_read_op(ropiter->second, &rm);
1284     ec->dispatch_recovery_messages(rm, priority);
// Prune an in-flight ReadOp after an OSDMap change: any object whose read was
// sourced from a now-down OSD is cancelled (its pull is cancelled and it is
// removed from to_read/complete/recovery_ops). If nothing remains in flight,
// schedule a FinishReadOp to complete the op asynchronously.
1288 void ECBackend::filter_read_op(
1289   const OSDMapRef& osdmap,
// Pass 1: collect objects sourced from down OSDs and drop those shards from
// in_progress.
1292   set<hobject_t> to_cancel;
1293   for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1294        i != op.source_to_obj.end();
1296     if (osdmap->is_down(i->first.osd)) {
1297       to_cancel.insert(i->second.begin(), i->second.end());
1298       op.in_progress.erase(i->first);
1303   if (to_cancel.empty())
// Pass 2: remove cancelled objects from every remaining source's set, using
// the classic erase(it++) idiom so iterators stay valid; empty sources are
// dropped entirely (and must not be on down OSDs, per the assert).
1306   for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
1307        i != op.source_to_obj.end();
1309     for (set<hobject_t>::iterator j = i->second.begin();
1310 	 j != i->second.end();
1312       if (to_cancel.count(*j))
1313 	i->second.erase(j++);
1317     if (i->second.empty()) {
1318       op.source_to_obj.erase(i++);
1320       assert(!osdmap->is_down(i->first.osd));
// Pass 3: cancel each affected object's pull and forget its request, result
// and recovery state.
1325   for (set<hobject_t>::iterator i = to_cancel.begin();
1326        i != to_cancel.end();
1328     get_parent()->cancel_pull(*i);
1330     assert(op.to_read.count(*i));
1331     read_request_t &req = op.to_read.find(*i)->second;
1332     dout(10) << __func__ << ": canceling " << req
1333 	     << " for obj " << *i << dendl;
1338     op.to_read.erase(*i);
1339     op.complete.erase(*i);
1340     recovery_ops.erase(*i);
// Nothing left in flight: complete the op from a work-queue context rather
// than inline.
1343   if (op.in_progress.empty()) {
1344     get_parent()->schedule_recovery_work(
1345       get_parent()->bless_gencontext(
1346 	new FinishReadOp(this, op.tid)));
// React to a new OSDMap: find every read whose source shard sits on a
// now-down OSD, drop those shard entries, then filter each affected ReadOp.
// Filtering is done in a second loop because filter_read_op mutates state
// keyed by the same maps being scanned here.
1350 void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
1352   set<ceph_tid_t> tids_to_filter;
1353   for (map<pg_shard_t, set<ceph_tid_t> >::iterator
1354 	 i = shard_to_read_map.begin();
1355        i != shard_to_read_map.end();
1357     if (osdmap->is_down(i->first.osd)) {
1358       tids_to_filter.insert(i->second.begin(), i->second.end());
// erase(it++) keeps the iterator valid while removing the current entry.
1359       shard_to_read_map.erase(i++);
1364   for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
1365        i != tids_to_filter.end();
1367     map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
1368     assert(j != tid_to_read_map.end());
1369     filter_read_op(osdmap, j->second);
// Interval change: reset the whole write pipeline (versions, pipeline state,
// queued ops) and abandon all in-flight reads. Read callbacks are deleted
// WITHOUT being invoked — callers are expected to retry in the new interval.
1373 void ECBackend::on_change()
1375   dout(10) << __func__ << dendl;
1377   completed_to = eversion_t();
1378   committed_to = eversion_t();
1379   pipeline_state.clear();
1380   waiting_reads.clear();
1381   waiting_state.clear();
1382   waiting_commit.clear();
// Release cache pins held by queued write ops before dropping them.
1383   for (auto &&op: tid_to_op_map) {
1384     cache.release_write_pin(op.second.pin);
1386   tid_to_op_map.clear();
1388   for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
1389        i != tid_to_read_map.end();
1391     dout(10) << __func__ << ": cancelling " << i->second << dendl;
1392     for (map<hobject_t, read_request_t>::iterator j =
1393 	   i->second.to_read.begin();
1394 	 j != i->second.to_read.end();
// Never completed — just freed; the new interval will reissue reads.
1396       delete j->second.cb;
1400   tid_to_read_map.clear();
1401   in_progress_client_reads.clear();
1402   shard_to_read_map.clear();
1403   clear_recovery_state();
// Drop all per-object recovery bookkeeping; in-flight reads are handled
// separately (see on_change / check_recovery_sources).
1406 void ECBackend::clear_recovery_state()
1408   recovery_ops.clear();
// Flush notification hook — body not visible in this extraction; presumably
// a no-op for the EC backend (TODO confirm against the full file).
1411 void ECBackend::on_flushed()
// Emit current recovery_ops and tid_to_read_map contents to a Formatter for
// admin-socket style introspection. The per-entry dump statements were elided
// from this extraction (gaps at 1420-1425, 1429-1436).
1415 void ECBackend::dump_recovery_info(Formatter *f) const
1417   f->open_array_section("recovery_ops");
1418   for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
1419        i != recovery_ops.end();
1421     f->open_object_section("op");
1426   f->open_array_section("read_ops");
1427   for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
1428        i != tid_to_read_map.end();
1430     f->open_object_section("read_op");
// Entry point for a client write: materialize an Op in tid_to_op_map from the
// PG transaction plus completion callbacks, then feed it into the RMW
// pipeline via start_rmw.
1437 void ECBackend::submit_transaction(
1438   const hobject_t &hoid,
1439   const object_stat_sum_t &delta_stats,
1440   const eversion_t &at_version,
1441   PGTransactionUPtr &&t,
1442   const eversion_t &trim_to,
1443   const eversion_t &roll_forward_to,
1444   const vector<pg_log_entry_t> &log_entries,
1445   boost::optional<pg_hit_set_history_t> &hset_history,
1446   Context *on_local_applied_sync,
1447   Context *on_all_applied,
1448   Context *on_all_commit,
1451   OpRequestRef client_op
// Each tid is used exactly once.
1454   assert(!tid_to_op_map.count(tid));
1455   Op *op = &(tid_to_op_map[tid]);
1457   op->delta_stats = delta_stats;
1458   op->version = at_version;
1459   op->trim_to = trim_to;
// Never roll forward to a version older than what is already committed.
1460   op->roll_forward_to = MAX(roll_forward_to, committed_to);
1461   op->log_entries = log_entries;
// swap avoids copying the (optional) hit-set history; caller's copy is left
// in a moved-from state by design.
1462   std::swap(op->updated_hit_set_history, hset_history);
1463   op->on_local_applied_sync = on_local_applied_sync;
1464   op->on_all_applied = on_all_applied;
1465   op->on_all_commit = on_all_commit;
1468   op->client_op = client_op;
1470     op->trace = client_op->pg_trace;
1472   dout(10) << __func__ << ": op " << *op << " starting" << dendl;
1473   start_rmw(op, std::move(t));
1474   dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
// Run cb after all writes currently in the pipeline: attach it to the last
// op still gathering state, else to the last op waiting on reads; the final
// else branch (pipeline empty → call cb immediately) was elided from this
// extraction (gap after 1483).
1477 void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
1478   if (!waiting_state.empty()) {
1479     waiting_state.back().on_write.emplace_back(std::move(cb));
1480   } else if (!waiting_reads.empty()) {
1481     waiting_reads.back().on_write.emplace_back(std::move(cb));
1483     // Nothing earlier in the pipeline, just call it
// Compute the minimal set of shards (as pg_shard_t) to read in order to
// reconstruct `want` for hoid. Candidate shards come from three sources in
// order: acting shards not missing the object, backfill shards past the
// object's last_backfill, and missing_loc entries. The EC plugin's
// minimum_to_decode picks the minimal subset; `do_redundant_reads`
// presumably expands that to all available shards (that branch's body is
// elided here — gap after 1557).
1488 int ECBackend::get_min_avail_to_read_shards(
1489   const hobject_t &hoid,
1490   const set<int> &want,
1492   bool do_redundant_reads,
1493   set<pg_shard_t> *to_read)
1495   // Make sure we don't do redundant reads for recovery
1496   assert(!for_recovery || !do_redundant_reads);
1499   map<shard_id_t, pg_shard_t> shards;
1501   for (set<pg_shard_t>::const_iterator i =
1502 	 get_parent()->get_acting_shards().begin();
1503        i != get_parent()->get_acting_shards().end();
1505     dout(10) << __func__ << ": checking acting " << *i << dendl;
1506     const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1507     if (!missing.is_missing(hoid)) {
// Each shard id may be offered by at most one OSD in this pass.
1508       assert(!have.count(i->shard));
1509       have.insert(i->shard);
1510       assert(!shards.count(i->shard));
1511       shards.insert(make_pair(i->shard, *i));
1516   for (set<pg_shard_t>::const_iterator i =
1517 	 get_parent()->get_backfill_shards().begin();
1518        i != get_parent()->get_backfill_shards().end();
// Skip shard ids already satisfied by an acting shard.
1520     if (have.count(i->shard)) {
1521       assert(shards.count(i->shard));
1524     dout(10) << __func__ << ": checking backfill " << *i << dendl;
1525     assert(!shards.count(i->shard));
1526     const pg_info_t &info = get_parent()->get_shard_info(*i);
1527     const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
// A backfill target only has the object if backfill has progressed past it.
1528     if (hoid < info.last_backfill &&
1529 	!missing.is_missing(hoid)) {
1530       have.insert(i->shard);
1531       shards.insert(make_pair(i->shard, *i));
1535   map<hobject_t, set<pg_shard_t>>::const_iterator miter =
1536     get_parent()->get_missing_loc_shards().find(hoid);
1537   if (miter != get_parent()->get_missing_loc_shards().end()) {
1538     for (set<pg_shard_t>::iterator i = miter->second.begin();
1539 	 i != miter->second.end();
1541       dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
1542       auto m = get_parent()->maybe_get_shard_missing(*i);
1544 	assert(!(*m).is_missing(hoid));
1546       have.insert(i->shard);
1547       shards.insert(make_pair(i->shard, *i));
// Let the EC plugin choose the cheapest decodable subset of what we have.
1553   int r = ec_impl->minimum_to_decode(want, have, &need);
1557   if (do_redundant_reads) {
// Map chosen shard ids back to concrete pg_shard_t targets.
1564   for (set<int>::iterator i = need.begin();
1567     assert(shards.count(shard_id_t(*i)));
1568     to_read->insert(shards[shard_id_t(*i)]);
// For redundant-read fallback: collect every acting shard that has hoid and
// is NOT in `avail` (the shards already read or errored) into *to_read.
// Unlike get_min_avail_to_read_shards this only consults the acting set.
1573 int ECBackend::get_remaining_shards(
1574   const hobject_t &hoid,
1575   const set<int> &avail,
1576   set<pg_shard_t> *to_read)
1579   map<shard_id_t, pg_shard_t> shards;
1581   for (set<pg_shard_t>::const_iterator i =
1582 	 get_parent()->get_acting_shards().begin();
1583        i != get_parent()->get_acting_shards().end();
1585     dout(10) << __func__ << ": checking acting " << *i << dendl;
1586     const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
1587     if (!missing.is_missing(hoid)) {
1588       assert(!need.count(i->shard));
1589       need.insert(i->shard);
1590       assert(!shards.count(i->shard));
1591       shards.insert(make_pair(i->shard, *i));
1598   for (set<int>::iterator i = need.begin();
1601     assert(shards.count(shard_id_t(*i)));
// Only request shards we have not already consumed.
1602     if (avail.find(*i) == avail.end())
1603       to_read->insert(shards[shard_id_t(*i)]);
// Register a new ReadOp under a fresh tid (emplace constructs it in place in
// tid_to_read_map) and kick it off; the ReadOp constructor arguments and the
// do_read_op call were elided from this extraction (gaps at 1618-1624,
// 1630-1633).
1608 void ECBackend::start_read_op(
1610   map<hobject_t, read_request_t> &to_read,
1612   bool do_redundant_reads,
1615   ceph_tid_t tid = get_parent()->get_tid();
1616   assert(!tid_to_read_map.count(tid));
1617   auto &op = tid_to_read_map.emplace(
1625       std::move(to_read))).first->second;
1626   dout(10) << __func__ << ": starting " << op << dendl;
// Inherit the client op's trace context when present.
1628     op.trace = _op->pg_trace;
1629     op.trace.event("start ec read");
// Translate a ReadOp into one ECSubRead message per target shard and send
// them. For each object: record obj<->source bookkeeping, convert logical
// offsets to per-chunk offsets via the stripe info, and append the chunk
// extent to every needed shard's message.
1634 void ECBackend::do_read_op(ReadOp &op)
1636   int priority = op.priority;
1637   ceph_tid_t tid = op.tid;
1639   dout(10) << __func__ << ": starting read " << op << dendl;
1641   map<pg_shard_t, ECSubRead> messages;
1642   for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
1643        i != op.to_read.end();
1645     bool need_attrs = i->second.want_attrs;
1646     for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
1647 	 j != i->second.need.end();
// Attrs are requested from (apparently) one shard; the need_attrs reset that
// presumably follows was elided (gaps at 1648-1649, 1651-1652).
1650 	messages[*j].attrs_to_read.insert(i->first);
1653       op.obj_to_source[i->first].insert(*j);
1654       op.source_to_obj[*j].insert(i->first);
1656     for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
1657 	   i->second.to_read.begin();
1658 	 j != i->second.to_read.end();
// Logical (stripe-aligned) offset/len -> per-shard chunk offset/len.
1660       pair<uint64_t, uint64_t> chunk_off_len =
1661 	sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
1662       for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
1663 	   k != i->second.need.end();
1665 	messages[*k].to_read[i->first].push_back(
1667 	    chunk_off_len.first,
1668 	    chunk_off_len.second,
// By now some shard must have been tasked with reading the attrs.
1671     assert(!need_attrs);
1675   for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
1676        i != messages.end();
1678     op.in_progress.insert(i->first);
1679     shard_to_read_map[i->first].insert(op.tid);
1680     i->second.tid = tid;
1681     MOSDECSubOpRead *msg = new MOSDECSubOpRead;
1682     msg->set_priority(priority);
1684       get_parent()->whoami_spg_t().pgid,
1686     msg->map_epoch = get_parent()->get_epoch();
1687     msg->min_epoch = get_parent()->get_interval_start_epoch();
1688     msg->op = i->second;
1689     msg->op.from = get_parent()->whoami_shard();
1692       // initialize a child span for this shard
1693       msg->trace.init("ec sub read", nullptr, &op.trace);
1694       msg->trace.keyval("shard", i->first.shard.id);
1696     get_parent()->send_message_osd_cluster(
1699       get_parent()->get_epoch());
1701   dout(10) << __func__ << ": started " << op << dendl;
// Return (and cache) the hash info for hoid. On cache miss: stat the on-disk
// object, fetch the hinfo attr either from the supplied attrs map or via
// getattr, decode it, and sanity-check total_chunk_size against st_size when
// `checks` is set. Returns a null ref on validation failure.
1704 ECUtil::HashInfoRef ECBackend::get_hash_info(
1705   const hobject_t &hoid, bool checks, const map<string,bufferptr> *attrs)
1707   dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
1708   ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
1710     dout(10) << __func__ << ": not in cache " << hoid << dendl;
1712     int r = store->stat(
1714       ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1716     ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
1717     // XXX: What does it mean if there is no object on disk?
1719       dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
// Prefer the caller-provided attr map (e.g. from a scrub) over a getattr.
1722 	map<string, bufferptr>::const_iterator k = attrs->find(ECUtil::get_hinfo_key());
1723 	if (k == attrs->end()) {
1724 	  dout(5) << __func__ << " " << hoid << " missing hinfo attr" << dendl;
1726 	  bl.push_back(k->second);
1731 	  ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
1732 	  ECUtil::get_hinfo_key(),
1735 	  dout(5) << __func__ << ": getattr failed: " << cpp_strerror(r) << dendl;
1736 	  bl.clear(); // just in case
1739       if (bl.length() > 0) {
1740 	bufferlist::iterator bp = bl.begin();
1741 	::decode(hinfo, bp);
// Stored chunk-size total must match the physical object size.
1742 	if (checks && hinfo.get_total_chunk_size() != (uint64_t)st.st_size) {
1743 	  dout(0) << __func__ << ": Mismatch of total_chunk_size "
1744 		  << hinfo.get_total_chunk_size() << dendl;
1745 	  return ECUtil::HashInfoRef();
// Non-empty object with no hinfo attr is an error; empty objects fall
// through and get a fresh hinfo created below.
1747       } else if (st.st_size > 0) { // If empty object and no hinfo, create it
1748 	return ECUtil::HashInfoRef();
1751     ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
// Plan the read-modify-write for op: get_write_plan computes which extents
// must be read and written; the lambda supplies hash info per object and
// treats a null result as fatal (no recovery path from here). The op then
// enters the pipeline via waiting_state.
1756 void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
1760   op->plan = ECTransaction::get_write_plan(
1763     [&](const hobject_t &i) {
// checks=false: planning must not reject an object on a size mismatch here.
1764       ECUtil::HashInfoRef ref = get_hash_info(i, false);
1766 	derr << __func__ << ": get_hash_info(" << i << ")"
1767 	     << " returned a null pointer and there is no "
1768 	     << " way to recover from such an error in this "
1769 	     << " context" << dendl;
1774     get_parent()->get_dpp());
1776   dout(10) << __func__ << ": " << *op << dendl;
1778   waiting_state.push_back(*op);
// Pipeline stage 1: move the front op from waiting_state to waiting_reads.
// Blocks an RMW op while the cache is invalid; decides cache usage; splits
// each object's read plan into extents served from the cache (remote_read)
// vs. extents that must be read from shards (pending_read). Returns
// true/false to drive the check_ops loop (return statements elided in this
// extraction).
1782 bool ECBackend::try_state_to_reads()
1784   if (waiting_state.empty())
1787   Op *op = &(waiting_state.front());
// An op needing read-modify-write cannot proceed on an invalid cache; only
// ec-overwrite pools can reach this state.
1788   if (op->requires_rmw() && pipeline_state.cache_invalid()) {
1789     assert(get_parent()->get_pool().allows_ecoverwrites());
1790     dout(20) << __func__ << ": blocking " << *op
1791 	     << " because it requires an rmw and the cache is invalid "
1797   if (op->invalidates_cache()) {
1798     dout(20) << __func__ << ": invalidating cache after this op"
1800     pipeline_state.invalidate();
1801     op->using_cache = false;
1803     op->using_cache = pipeline_state.caching_enabled();
1806   waiting_state.pop_front();
1807   waiting_reads.push_back(*op);
1809   if (op->using_cache) {
1810     cache.open_write_pin(op->pin);
1813     for (auto &&hpair: op->plan.will_write) {
1814       auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
// No read plan for this object means an empty extent set.
1815       const extent_set &to_read_plan =
1816 	to_read_plan_iter == op->plan.to_read.end() ?
1818 	to_read_plan_iter->second;
// Extents the cache reserves for us come back as remote_read...
1820       extent_set remote_read = cache.reserve_extents_for_rmw(
// ...and whatever the cache cannot supply stays pending (read from shards).
1826       extent_set pending_read = to_read_plan;
1827       pending_read.subtract(remote_read);
1829       if (!remote_read.empty()) {
1830 	op->remote_read[hpair.first] = std::move(remote_read);
1832       if (!pending_read.empty()) {
1833 	op->pending_read[hpair.first] = std::move(pending_read);
// Not using the cache: everything in the plan is read remotely.
1837     op->remote_read = op->plan.to_read;
1840   dout(10) << __func__ << ": " << *op << dendl;
1842   if (!op->remote_read.empty()) {
1843     assert(get_parent()->get_pool().allows_ecoverwrites());
1844     objects_read_async_no_cache(
1846       [this, op](map<hobject_t,pair<int, extent_map> > &&results) {
1847 	for (auto &&i: results) {
1848 	  op->remote_read_result.emplace(i.first, i.second.second);
// Pipeline stage 2: once the front op's reads are done, generate per-shard
// transactions and send ECSubWrite to every acting/backfill shard (applying
// the local one directly). Ops that should_send_op() == false get an empty
// transaction but still participate in the commit protocol.
1857 bool ECBackend::try_reads_to_commit()
1859   if (waiting_reads.empty())
1861   Op *op = &(waiting_reads.front());
1862   if (op->read_in_progress())
1864   waiting_reads.pop_front();
1865   waiting_commit.push_back(*op);
1867   dout(10) << __func__ << ": starting commit on " << *op << dendl;
1868   dout(20) << __func__ << ": " << cache << dendl;
1870   get_parent()->apply_stats(
// Fold extents that stayed in the cache into the read results before
// generating transactions.
1874   if (op->using_cache) {
1875     for (auto &&hpair: op->pending_read) {
1876       op->remote_read_result[hpair.first].insert(
1877 	cache.get_remaining_extents_for_rmw(
1882     op->pending_read.clear();
1884     assert(op->pending_read.empty());
1887   map<shard_id_t, ObjectStore::Transaction> trans;
1888   for (set<pg_shard_t>::const_iterator i =
1889 	 get_parent()->get_actingbackfill_shards().begin();
1890        i != get_parent()->get_actingbackfill_shards().end();
1895   op->trace.event("start ec write");
1897   map<hobject_t,extent_map> written;
1899   ECTransaction::generate_transactions(
1902     get_parent()->get_info().pgid.pgid,
// Pre-kraken OSDs require legacy transaction encoding.
1903     (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
1905     op->remote_read_result,
1910     &(op->temp_cleared),
1911     get_parent()->get_dpp());
1914   dout(20) << __func__ << ": " << cache << dendl;
1915   dout(20) << __func__ << ": written: " << written << dendl;
1916   dout(20) << __func__ << ": op: " << *op << dendl;
// Without ec-overwrites, no log entry may require kraken features.
1918   if (!get_parent()->get_pool().allows_ecoverwrites()) {
1919     for (auto &&i: op->log_entries) {
1920       if (i.requires_kraken()) {
1921 	derr << __func__ << ": log entry " << i << " requires kraken"
1922 	     << " but overwrites are not enabled!" << dendl;
// Sanity: what was actually written must match the plan exactly.
1928     map<hobject_t,extent_set> written_set;
1929     for (auto &&i: written) {
1930       written_set[i.first] = i.second.get_interval_set();
1932     dout(20) << __func__ << ": written_set: " << written_set << dendl;
1933     assert(written_set == op->plan.will_write);
1935   if (op->using_cache) {
1936     for (auto &&hpair: written) {
1937       dout(20) << __func__ << ": " << hpair << dendl;
1938       cache.present_rmw_update(hpair.first, op->pin, hpair.second);
1941   op->remote_read.clear();
1942   op->remote_read_result.clear();
1944   dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
1945   ObjectStore::Transaction empty;
1946   bool should_write_local = false;
1947   ECSubWrite local_write_op;
1948   for (set<pg_shard_t>::const_iterator i =
1949 	 get_parent()->get_actingbackfill_shards().begin();
1950        i != get_parent()->get_actingbackfill_shards().end();
// Every shard — even ones we skip sending real data to — must ack.
1952     op->pending_apply.insert(*i);
1953     op->pending_commit.insert(*i);
1954     map<shard_id_t, ObjectStore::Transaction>::iterator iter =
1955       trans.find(i->shard);
1956     assert(iter != trans.end());
1957     bool should_send = get_parent()->should_send_op(*i, op->hoid);
1958     const pg_stat_t &stats =
1961        parent->get_shard_info().find(*i)->second.stats;
1964       get_parent()->whoami_shard(),
// Shards not eligible for this op receive an empty transaction.
1969       should_send ? iter->second : empty,
1972       op->roll_forward_to,
1974       op->updated_hit_set_history,
1979     ZTracer::Trace trace;
1981       // initialize a child span for this shard
1982       trace.init("ec sub write", nullptr, &op->trace);
1983       trace.keyval("shard", i->shard.id);
// The local shard's write is applied directly instead of via messaging.
1986     if (*i == get_parent()->whoami_shard()) {
1987       should_write_local = true;
1988       local_write_op.claim(sop);
1990       MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
1991       r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
1992       r->map_epoch = get_parent()->get_epoch();
1993       r->min_epoch = get_parent()->get_interval_start_epoch();
1995       get_parent()->send_message_osd_cluster(
1996 	i->osd, r, get_parent()->get_epoch());
1999   if (should_write_local) {
2001       get_parent()->whoami_shard(),
2005       op->on_local_applied_sync);
// Ownership of the sync callback has been handed off.
2006     op->on_local_applied_sync = 0;
// Drain write-ordered callbacks registered via call_write_ordered.
2009   for (auto i = op->on_write.begin();
2010        i != op->on_write.end();
2011        op->on_write.erase(i++)) {
// Pipeline stage 3: retire the front op from waiting_commit once its writes
// are done. Advances completed_to/committed_to, and — post-kraken, when the
// pipeline would otherwise go idle before the log can roll forward — queues a
// dummy no-data op to push roll_forward_to up to this op's version.
2018 bool ECBackend::try_finish_rmw()
2020   if (waiting_commit.empty())
2022   Op *op = &(waiting_commit.front());
2023   if (op->write_in_progress())
2025   waiting_commit.pop_front();
2027   dout(10) << __func__ << ": " << *op << dendl;
2028   dout(20) << __func__ << ": " << cache << dendl;
2030   if (op->roll_forward_to > completed_to)
2031     completed_to = op->roll_forward_to;
2032   if (op->version > committed_to)
2033     committed_to = op->version;
2035   if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
2036     if (op->version > get_parent()->get_log().get_can_rollback_to() &&
2037 	waiting_reads.empty() &&
2038 	waiting_commit.empty()) {
2039       // submit a dummy transaction to kick the rollforward
2040       auto tid = get_parent()->get_tid();
2041       Op *nop = &(tid_to_op_map[tid]);
2042       nop->hoid = op->hoid;
2043       nop->trim_to = op->trim_to;
2044       nop->roll_forward_to = op->version;
2046       nop->reqid = op->reqid;
2047       waiting_reads.push_back(*nop);
2051   if (op->using_cache) {
2052     cache.release_write_pin(op->pin);
2054   tid_to_op_map.erase(op->tid);
// Pipeline fully drained: reset cache validity state.
2056   if (waiting_reads.empty() &&
2057       waiting_commit.empty()) {
2058     pipeline_state.clear();
2059     dout(20) << __func__ << ": clearing pipeline_state "
// Pump the write pipeline until no stage makes progress; short-circuit ||
// means later stages only run when earlier ones stall. (The third stage,
// presumably try_finish_rmw(), was elided from this extraction.)
2066 void ECBackend::check_ops()
2068   while (try_state_to_reads() ||
2069 	 try_reads_to_commit() ||
// Synchronous read entry point — remaining parameters and body (lines
// 2075-2081) were elided from this extraction; presumably returns an error
// since EC pools cannot service synchronous reads (TODO confirm).
2073 int ECBackend::objects_read_sync(
2074   const hobject_t &hoid,
// Async read entry point: round each requested extent up to stripe bounds,
// merge overlaps via extent sets, then hand off to
// objects_read_and_reconstruct with a completion functor (the local `cb`
// struct) that slices each caller's sub-range back out of the reconstructed
// extent map and fires the per-extent Contexts.
2083 void ECBackend::objects_read_async(
2084   const hobject_t &hoid,
2085   const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2086 		  pair<bufferlist*, Context*> > > &to_read,
2087   Context *on_complete,
2090   map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
2095   for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2096 		 pair<bufferlist*, Context*> > >::const_iterator i =
// Expand each (off, len) to full stripe boundaries; flags are OR-merged.
2100     pair<uint64_t, uint64_t> tmp =
2101       sinfo.offset_len_to_stripe_bounds(
2102 	make_pair(i->first.get<0>(), i->first.get<1>()));
2105     esnew.insert(tmp.first, tmp.second);
2107     flags |= i->first.get<2>();
2111   auto &offsets = reads[hoid];
2112   for (auto j = es.begin();
// Completion functor: owns the caller's extent list and on_complete Context;
// move-only so the pending Contexts are deleted exactly once (see dtor loop).
2126     list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2127 	      pair<bufferlist*, Context*> > > to_read;
2128     unique_ptr<Context> on_complete;
2129     cb(const cb&) = delete;
2130     cb(cb &&) = default;
2132        const hobject_t &hoid,
2133        const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
2134 		  pair<bufferlist*, Context*> > > &to_read,
2135        Context *on_complete)
2139 	on_complete(on_complete) {}
2140     void operator()(map<hobject_t,pair<int, extent_map> > &&results) {
2141       auto dpp = ec->get_parent()->get_dpp();
2142       ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
2144       ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
2147       auto &got = results[hoid];
2150       for (auto &&read: to_read) {
// Error result: propagate the error code to each per-extent Context.
2151 	if (got.first < 0) {
2152 	  if (read.second.second) {
2153 	    read.second.second->complete(got.first);
2158 	  assert(read.second.first);
2159 	  uint64_t offset = read.first.get<0>();
2160 	  uint64_t length = read.first.get<1>();
// The reconstructed extent map must fully contain the requested sub-range.
2161 	  auto range = got.second.get_containing_range(offset, length);
2162 	  assert(range.first != range.second);
2163 	  assert(range.first.get_off() <= offset);
2165 	    (offset + length) <=
2166 	    (range.first.get_off() + range.first.get_len()));
2167 	  read.second.first->substr_of(
2168 	    range.first.get_val(),
2169 	    offset - range.first.get_off(),
2171 	  if (read.second.second) {
2172 	    read.second.second->complete(length);
// Null so the dtor's delete loop skips completed Contexts.
2173 	    read.second.second = nullptr;
2179 	on_complete.release()->complete(r);
// Dtor: delete any per-extent Contexts that never completed.
2183       for (auto &&i: to_read) {
2184 	delete i.second.second;
2189   objects_read_and_reconstruct(
2192     make_gen_lambda_context<
2193       map<hobject_t,pair<int, extent_map> > &&, cb>(
// Per-object completion for client reads: decodes each returned stripe-wide
// chunk set back into the logical byte range the client asked for, trims it
// to the original (unaligned) offset/len, and reports the assembled extent
// map to the shared ClientAsyncReadStatus.
2200 struct CallClientContexts :
2201   public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
2204   ECBackend::ClientAsyncReadStatus *status;
2205   list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
2209     ECBackend::ClientAsyncReadStatus *status,
2210     const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
2211     : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
2212   void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
2213     ECBackend::read_result_t &res = in.second;
// One returned tuple per requested extent, in order; errors preclude success.
2217     assert(res.returned.size() == to_read.size());
2219     assert(res.errors.empty());
2220     for (auto &&read: to_read) {
// Requested range was stripe-aligned on send; recompute to verify and to
// know how much to trim off the decoded data.
2221       pair<uint64_t, uint64_t> adjusted =
2222 	ec->sinfo.offset_len_to_stripe_bounds(
2223 	  make_pair(read.get<0>(), read.get<1>()));
2224       assert(res.returned.front().get<0>() == adjusted.first &&
2225 	     res.returned.front().get<1>() == adjusted.second);
2226       map<int, bufferlist> to_decode;
2228       for (map<pg_shard_t, bufferlist>::iterator j =
2229 	     res.returned.front().get<2>().begin();
2230 	   j != res.returned.front().get<2>().end();
// claim() moves the shard's data without copying.
2232 	to_decode[j->first.shard].claim(j->second);
2234       int r = ECUtil::decode(
// Trim the decoded stripe back down to the client's exact sub-range.
2246 	  read.get<0>() - adjusted.first,
2248 	  bl.length() - (read.get<0>() - adjusted.first)));
2250 	read.get<0>(), trimmed.length(), std::move(trimmed));
2251       res.returned.pop_front();
2254     status->complete_object(hoid, res.r, std::move(result));
// For each object: pick the minimal available shard set (redundant reads
// optional per `fast_read`-style policy decided by the caller), build a
// read_request_t with a CallClientContexts completion, and launch a single
// ReadOp covering all objects. An empty request set completes immediately.
2259 void ECBackend::objects_read_and_reconstruct(
2260   const map<hobject_t,
2261 	    std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
2264   GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
// Register the aggregate completion before issuing anything so ordering of
// client reads is preserved.
2266   in_progress_client_reads.emplace_back(
2267     reads.size(), std::move(func));
2268   if (!reads.size()) {
2273   set<int> want_to_read;
2274   get_want_to_read_shards(&want_to_read);
2276   map<hobject_t, read_request_t> for_read_op;
2277   for (auto &&to_read: reads) {
2278     set<pg_shard_t> shards;
2279     int r = get_min_avail_to_read_shards(
2287     CallClientContexts *c = new CallClientContexts(
2290       &(in_progress_client_reads.back()),
2303     CEPH_MSG_PRIO_DEFAULT,
// Redundant-read fallback after a decode failure: compute the shards not yet
// consulted for hoid (everything already in obj_to_source counts as read or
// errored), then reissue the same offsets to those shards, transplanting the
// original callback into a fresh request map swapped into the ReadOp.
2311 int ECBackend::send_all_remaining_reads(
2312   const hobject_t &hoid,
2315   set<int> already_read;
2316   const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
2317   for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
2318     already_read.insert(i->shard);
2319   dout(10) << __func__ << " have/error shards=" << already_read << dendl;
2320   set<pg_shard_t> shards;
2321   int r = get_remaining_shards(hoid, already_read, &shards);
2327   dout(10) << __func__ << " Read remaining shards " << shards << dendl;
2329   // TODOSAM: this doesn't seem right
2330   list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
2331     rop.to_read.find(hoid)->second.to_read;
// Reuse the original completion context for the retried read.
2332   GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
2333     rop.to_read.find(hoid)->second.cb;
2335   map<hobject_t, read_request_t> for_read_op;
2345   rop.to_read.swap(for_read_op);
// Fetch all xattrs for hoid from the local shard's object, then strip the
// EC-internal hinfo key(s) so callers only see user-visible attrs. (The
// erase inside the loop and error-return lines were elided — gaps at
// 2357-2360, 2365-2371.)
2350 int ECBackend::objects_get_attrs(
2351   const hobject_t &hoid,
2352   map<string, bufferlist> *out)
2354   int r = store->getattrs(
2356     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2361   for (map<string, bufferlist>::iterator i = out->begin();
2364     if (ECUtil::is_hinfo_key_string(i->first))
// Roll back an append by truncating the local shard's object to the chunk
// offset corresponding to the old logical size; old_size must be
// stripe-aligned for the conversion to be exact.
2372 void ECBackend::rollback_append(
2373   const hobject_t &hoid,
2375   ObjectStore::Transaction *t)
2377   assert(old_size % sinfo.get_stripe_width() == 0);
2380     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2381     sinfo.aligned_logical_offset_to_chunk_offset(
2385 void ECBackend::be_deep_scrub(
2386 const hobject_t &poid,
2388 ScrubMap::object &o,
2389 ThreadPool::TPHandle &handle) {
2390 bufferhash h(-1); // we always used -1
2392 uint64_t stride = cct->_conf->osd_deep_scrub_stride;
2393 if (stride % sinfo.get_chunk_size())
2394 stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
2397 uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
2401 handle.reset_tp_timeout();
2405 poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
2411 if (bl.length() % sinfo.get_chunk_size()) {
2417 if ((unsigned)r < stride)
2422 dout(0) << "_scan_list " << poid << " got "
2423 << r << " on read, read_error" << dendl;
2424 o.read_error = true;
2428 ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);
2430 dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
2431 o.read_error = true;
2432 o.digest_present = false;
2435 if (!get_parent()->get_pool().allows_ecoverwrites()) {
2436 assert(hinfo->has_chunk_hash());
2437 if (hinfo->get_total_chunk_size() != pos) {
2438 dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl;
2439 o.ec_size_mismatch = true;
2443 if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
2444 dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl;
2445 o.ec_hash_mismatch = true;
2449 /* We checked above that we match our own stored hash. We cannot
2450 * send a hash of the actual object, so instead we simply send
2451 * our locally stored hash of shard 0 on the assumption that if
2452 * we match our chunk hash and our recollection of the hash for
2453 * chunk 0 matches that of our peers, there is likely no corruption.
2455 o.digest = hinfo->get_chunk_hash(0);
2456 o.digest_present = true;
2458 /* Hack! We must be using partial overwrites, and partial overwrites
2459 * don't support deep-scrub yet
2462 o.digest_present = true;
2466 o.omap_digest = seed;
2467 o.omap_digest_present = true;