Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / ECBackend.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2013 Inktank Storage, Inc.
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef ECBACKEND_H
16 #define ECBACKEND_H
17
18 #include <boost/intrusive/set.hpp>
19 #include <boost/intrusive/list.hpp>
20
21 #include "OSD.h"
22 #include "PGBackend.h"
23 #include "erasure-code/ErasureCodeInterface.h"
24 #include "ECUtil.h"
25 #include "ECTransaction.h"
26 #include "ExtentCache.h"
27
28 //forward declaration
29 struct ECSubWrite;
30 struct ECSubWriteReply;
31 struct ECSubRead;
32 struct ECSubReadReply;
33
34 struct RecoveryMessages;
35 class ECBackend : public PGBackend {
36 public:
37   RecoveryHandle *open_recovery_op() override;
38
39   void run_recovery_op(
40     RecoveryHandle *h,
41     int priority
42     ) override;
43
44   int recover_object(
45     const hobject_t &hoid,
46     eversion_t v,
47     ObjectContextRef head,
48     ObjectContextRef obc,
49     RecoveryHandle *h
50     ) override;
51
52   bool _handle_message(
53     OpRequestRef op
54     ) override;
55   bool can_handle_while_inactive(
56     OpRequestRef op
57     ) override;
58   friend struct SubWriteApplied;
59   friend struct SubWriteCommitted;
60   void sub_write_applied(
61     ceph_tid_t tid,
62     eversion_t version,
63     const ZTracer::Trace &trace);
64   void sub_write_committed(
65     ceph_tid_t tid,
66     eversion_t version,
67     eversion_t last_complete,
68     const ZTracer::Trace &trace);
69   void handle_sub_write(
70     pg_shard_t from,
71     OpRequestRef msg,
72     ECSubWrite &op,
73     const ZTracer::Trace &trace,
74     Context *on_local_applied_sync = 0
75     );
76   void handle_sub_read(
77     pg_shard_t from,
78     const ECSubRead &op,
79     ECSubReadReply *reply,
80     const ZTracer::Trace &trace
81     );
82   void handle_sub_write_reply(
83     pg_shard_t from,
84     const ECSubWriteReply &op,
85     const ZTracer::Trace &trace
86     );
87   void handle_sub_read_reply(
88     pg_shard_t from,
89     ECSubReadReply &op,
90     RecoveryMessages *m,
91     const ZTracer::Trace &trace
92     );
93
94   /// @see ReadOp below
95   void check_recovery_sources(const OSDMapRef& osdmap) override;
96
97   void on_change() override;
98   void clear_recovery_state() override;
99
100   void on_flushed() override;
101
102   void dump_recovery_info(Formatter *f) const override;
103
104   void call_write_ordered(std::function<void(void)> &&cb) override;
105
106   void submit_transaction(
107     const hobject_t &hoid,
108     const object_stat_sum_t &delta_stats,
109     const eversion_t &at_version,
110     PGTransactionUPtr &&t,
111     const eversion_t &trim_to,
112     const eversion_t &roll_forward_to,
113     const vector<pg_log_entry_t> &log_entries,
114     boost::optional<pg_hit_set_history_t> &hset_history,
115     Context *on_local_applied_sync,
116     Context *on_all_applied,
117     Context *on_all_commit,
118     ceph_tid_t tid,
119     osd_reqid_t reqid,
120     OpRequestRef op
121     ) override;
122
123   int objects_read_sync(
124     const hobject_t &hoid,
125     uint64_t off,
126     uint64_t len,
127     uint32_t op_flags,
128     bufferlist *bl) override;
129
130   /**
131    * Async read mechanism
132    *
133    * Async reads use the same async read mechanism as does recovery.
134    * CallClientContexts is responsible for reconstructing the response
135    * buffer as well as for calling the callbacks.
136    *
137    * One tricky bit is that two reads may possibly not read from the same
138    * set of replicas.  This could result in two reads completing in the
139    * wrong (from the interface user's point of view) order.  Thus, we
140    * maintain a queue of in progress reads (@see in_progress_client_reads)
141    * to ensure that we always call the completion callback in order.
142    *
143    * Another subtely is that while we may read a degraded object, we will
144    * still only perform a client read from shards in the acting set.  This
145    * ensures that we won't ever have to restart a client initiated read in
146    * check_recovery_sources.
147    */
148   void objects_read_and_reconstruct(
149     const map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
150     > &reads,
151     bool fast_read,
152     GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func);
153
154   friend struct CallClientContexts;
155   struct ClientAsyncReadStatus {
156     unsigned objects_to_read;
157     GenContextURef<map<hobject_t,pair<int, extent_map> > &&> func;
158     map<hobject_t,pair<int, extent_map> > results;
159     explicit ClientAsyncReadStatus(
160       unsigned objects_to_read,
161       GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
162       : objects_to_read(objects_to_read), func(std::move(func)) {}
163     void complete_object(
164       const hobject_t &hoid,
165       int err,
166       extent_map &&buffers) {
167       assert(objects_to_read);
168       --objects_to_read;
169       assert(!results.count(hoid));
170       results.emplace(hoid, make_pair(err, std::move(buffers)));
171     }
172     bool is_complete() const {
173       return objects_to_read == 0;
174     }
175     void run() {
176       func.release()->complete(std::move(results));
177     }
178   };
179   list<ClientAsyncReadStatus> in_progress_client_reads;
180   void objects_read_async(
181     const hobject_t &hoid,
182     const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
183                     pair<bufferlist*, Context*> > > &to_read,
184     Context *on_complete,
185     bool fast_read = false) override;
186
187   template <typename Func>
188   void objects_read_async_no_cache(
189     const map<hobject_t,extent_set> &to_read,
190     Func &&on_complete) {
191     map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
192     for (auto &&hpair: to_read) {
193       auto &l = _to_read[hpair.first];
194       for (auto extent: hpair.second) {
195         l.emplace_back(extent.first, extent.second, 0);
196       }
197     }
198     objects_read_and_reconstruct(
199       _to_read,
200       false,
201       make_gen_lambda_context<
202       map<hobject_t,pair<int, extent_map> > &&, Func>(
203           std::forward<Func>(on_complete)));
204   }
205   void kick_reads() {
206     while (in_progress_client_reads.size() &&
207            in_progress_client_reads.front().is_complete()) {
208       in_progress_client_reads.front().run();
209       in_progress_client_reads.pop_front();
210     }
211   }
212
213 private:
214   friend struct ECRecoveryHandle;
215   uint64_t get_recovery_chunk_size() const {
216     return ROUND_UP_TO(cct->_conf->osd_recovery_max_chunk,
217                         sinfo.get_stripe_width());
218   }
219
220   void get_want_to_read_shards(set<int> *want_to_read) const {
221     const vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
222     for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
223       int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
224       want_to_read->insert(chunk);
225     }
226   }
227
228   /**
229    * Recovery
230    *
231    * Recovery uses the same underlying read mechanism as client reads
232    * with the slight difference that recovery reads may come from non
233    * acting shards.  Thus, check_recovery_sources may wind up calling
234    * cancel_pull for a read originating with RecoveryOp.
235    *
236    * The recovery process is expressed as a state machine:
237    * - IDLE: Nothing is currently in progress, reads will be started and
238    *         we will transition to READING
239    * - READING: We are awaiting a pending read op.  Once complete, we will
240    *            decode the buffers and proceed to WRITING
241    * - WRITING: We are awaiting a completed push.  Once complete, we will
242    *            either transition to COMPLETE or to IDLE to continue.
243    * - COMPLETE: complete
244    *
245    * We use the existing Push and PushReply messages and structures to
246    * handle actually shuffling the data over to the replicas.  recovery_info
247    * and recovery_progress are expressed in terms of the logical offset
248    * space except for data_included which is in terms of the chunked object
249    * space (to match the passed buffer).
250    *
251    * xattrs are requested on the first read and used to initialize the
252    * object_context if missing on completion of the first read.
253    *
254    * In order to batch up reads and writes, we batch Push, PushReply,
255    * Transaction, and reads in a RecoveryMessages object which is passed
256    * among the recovery methods.
257    */
258   struct RecoveryOp {
259     hobject_t hoid;
260     eversion_t v;
261     set<pg_shard_t> missing_on;
262     set<shard_id_t> missing_on_shards;
263
264     ObjectRecoveryInfo recovery_info;
265     ObjectRecoveryProgress recovery_progress;
266
267     enum state_t { IDLE, READING, WRITING, COMPLETE } state;
268
269     static const char* tostr(state_t state) {
270       switch (state) {
271       case ECBackend::RecoveryOp::IDLE:
272         return "IDLE";
273         break;
274       case ECBackend::RecoveryOp::READING:
275         return "READING";
276         break;
277       case ECBackend::RecoveryOp::WRITING:
278         return "WRITING";
279         break;
280       case ECBackend::RecoveryOp::COMPLETE:
281         return "COMPLETE";
282         break;
283       default:
284         ceph_abort();
285         return "";
286       }
287     }
288
289     // must be filled if state == WRITING
290     map<int, bufferlist> returned_data;
291     map<string, bufferlist> xattrs;
292     ECUtil::HashInfoRef hinfo;
293     ObjectContextRef obc;
294     set<pg_shard_t> waiting_on_pushes;
295
296     // valid in state READING
297     pair<uint64_t, uint64_t> extent_requested;
298
299     void dump(Formatter *f) const;
300
301     RecoveryOp() : state(IDLE) {}
302   };
303   friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs);
304   map<hobject_t, RecoveryOp> recovery_ops;
305
306   void continue_recovery_op(
307     RecoveryOp &op,
308     RecoveryMessages *m);
309   void dispatch_recovery_messages(RecoveryMessages &m, int priority);
310   friend struct OnRecoveryReadComplete;
311   void handle_recovery_read_complete(
312     const hobject_t &hoid,
313     boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
314     boost::optional<map<string, bufferlist> > attrs,
315     RecoveryMessages *m);
316   void handle_recovery_push(
317     const PushOp &op,
318     RecoveryMessages *m);
319   void handle_recovery_push_reply(
320     const PushReplyOp &op,
321     pg_shard_t from,
322     RecoveryMessages *m);
323
324 public:
325   /**
326    * Low level async read mechanism
327    *
328    * To avoid duplicating the logic for requesting and waiting for
329    * multiple object shards, there is a common async read mechanism
330    * taking a map of hobject_t->read_request_t which defines callbacks
331    * taking read_result_ts as arguments.
332    *
333    * tid_to_read_map gives open read ops.  check_recovery_sources uses
334    * shard_to_read_map and ReadOp::source_to_obj to restart reads
335    * involving down osds.
336    *
337    * The user is responsible for specifying replicas on which to read
338    * and for reassembling the buffer on the other side since client
339    * reads require the original object buffer while recovery only needs
340    * the missing pieces.
341    *
342    * Rather than handling reads on the primary directly, we simply send
343    * ourselves a message.  This avoids a dedicated primary path for that
344    * part.
345    */
346   struct read_result_t {
347     int r;
348     map<pg_shard_t, int> errors;
349     boost::optional<map<string, bufferlist> > attrs;
350     list<
351       boost::tuple<
352         uint64_t, uint64_t, map<pg_shard_t, bufferlist> > > returned;
353     read_result_t() : r(0) {}
354   };
355   struct read_request_t {
356     const list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
357     const set<pg_shard_t> need;
358     const bool want_attrs;
359     GenContext<pair<RecoveryMessages *, read_result_t& > &> *cb;
360     read_request_t(
361       const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
362       const set<pg_shard_t> &need,
363       bool want_attrs,
364       GenContext<pair<RecoveryMessages *, read_result_t& > &> *cb)
365       : to_read(to_read), need(need), want_attrs(want_attrs),
366         cb(cb) {}
367   };
368   friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
369
370   struct ReadOp {
371     int priority;
372     ceph_tid_t tid;
373     OpRequestRef op; // may be null if not on behalf of a client
374     // True if redundant reads are issued, false otherwise,
375     // this is useful to tradeoff some resources (redundant ops) for
376     // low latency read, especially on relatively idle cluster
377     bool do_redundant_reads;
378     // True if reading for recovery which could possibly reading only a subset
379     // of the available shards.
380     bool for_recovery;
381
382     ZTracer::Trace trace;
383
384     map<hobject_t, read_request_t> to_read;
385     map<hobject_t, read_result_t> complete;
386
387     map<hobject_t, set<pg_shard_t>> obj_to_source;
388     map<pg_shard_t, set<hobject_t> > source_to_obj;
389
390     void dump(Formatter *f) const;
391
392     set<pg_shard_t> in_progress;
393
394     ReadOp(
395       int priority,
396       ceph_tid_t tid,
397       bool do_redundant_reads,
398       bool for_recovery,
399       OpRequestRef op,
400       map<hobject_t, read_request_t> &&_to_read)
401       : priority(priority), tid(tid), op(op), do_redundant_reads(do_redundant_reads),
402         for_recovery(for_recovery), to_read(std::move(_to_read)) {
403       for (auto &&hpair: to_read) {
404         auto &returned = complete[hpair.first].returned;
405         for (auto &&extent: hpair.second.to_read) {
406           returned.push_back(
407             boost::make_tuple(
408               extent.get<0>(),
409               extent.get<1>(),
410               map<pg_shard_t, bufferlist>()));
411         }
412       }
413     }
414     ReadOp() = delete;
415     ReadOp(const ReadOp &) = default;
416     ReadOp(ReadOp &&) = default;
417   };
418   friend struct FinishReadOp;
419   void filter_read_op(
420     const OSDMapRef& osdmap,
421     ReadOp &op);
422   void complete_read_op(ReadOp &rop, RecoveryMessages *m);
423   friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
424   map<ceph_tid_t, ReadOp> tid_to_read_map;
425   map<pg_shard_t, set<ceph_tid_t> > shard_to_read_map;
426   void start_read_op(
427     int priority,
428     map<hobject_t, read_request_t> &to_read,
429     OpRequestRef op,
430     bool do_redundant_reads, bool for_recovery);
431
432   void do_read_op(ReadOp &rop);
433   int send_all_remaining_reads(
434     const hobject_t &hoid,
435     ReadOp &rop);
436
437
438   /**
439    * Client writes
440    *
441    * ECTransaction is responsible for generating a transaction for
442    * each shard to which we need to send the write.  As required
443    * by the PGBackend interface, the ECBackend write mechanism
444    * passes trim information with the write and last_complete back
445    * with the reply.
446    *
447    * As with client reads, there is a possibility of out-of-order
448    * completions. Thus, callbacks and completion are called in order
449    * on the writing list.
450    */
451   struct Op : boost::intrusive::list_base_hook<> {
452     /// From submit_transaction caller, decribes operation
453     hobject_t hoid;
454     object_stat_sum_t delta_stats;
455     eversion_t version;
456     eversion_t trim_to;
457     boost::optional<pg_hit_set_history_t> updated_hit_set_history;
458     vector<pg_log_entry_t> log_entries;
459     ceph_tid_t tid;
460     osd_reqid_t reqid;
461     ZTracer::Trace trace;
462
463     eversion_t roll_forward_to; /// Soon to be generated internally
464
465     /// Ancillary also provided from submit_transaction caller
466     map<hobject_t, ObjectContextRef> obc_map;
467
468     /// see call_write_ordered
469     std::list<std::function<void(void)> > on_write;
470
471     /// Generated internally
472     set<hobject_t> temp_added;
473     set<hobject_t> temp_cleared;
474
475     ECTransaction::WritePlan plan;
476     bool requires_rmw() const { return !plan.to_read.empty(); }
477     bool invalidates_cache() const { return plan.invalidates_cache; }
478
479     // must be true if requires_rmw(), must be false if invalidates_cache()
480     bool using_cache = false;
481
482     /// In progress read state;
483     map<hobject_t,extent_set> pending_read; // subset already being read
484     map<hobject_t,extent_set> remote_read;  // subset we must read
485     map<hobject_t,extent_map> remote_read_result;
486     bool read_in_progress() const {
487       return !remote_read.empty() && remote_read_result.empty();
488     }
489
490     /// In progress write state
491     set<pg_shard_t> pending_commit;
492     set<pg_shard_t> pending_apply;
493     bool write_in_progress() const {
494       return !pending_commit.empty() || !pending_apply.empty();
495     }
496
497     /// optional, may be null, for tracking purposes
498     OpRequestRef client_op;
499
500     /// pin for cache
501     ExtentCache::write_pin pin;
502
503     /// Callbacks
504     Context *on_local_applied_sync = nullptr;
505     Context *on_all_applied = nullptr;
506     Context *on_all_commit = nullptr;
507     ~Op() {
508       delete on_local_applied_sync;
509       delete on_all_applied;
510       delete on_all_commit;
511     }
512   };
513   using op_list = boost::intrusive::list<Op>;
514   friend ostream &operator<<(ostream &lhs, const Op &rhs);
515
516   ExtentCache cache;
517   map<ceph_tid_t, Op> tid_to_op_map; /// Owns Op structure
518
519   /**
520    * We model the possible rmw states as a set of waitlists.
521    * All writes at this time complete in order, so a write blocked
522    * at waiting_state blocks all writes behind it as well (same for
523    * other states).
524    *
525    * Future work: We can break this up into a per-object pipeline
526    * (almost).  First, provide an ordering token to submit_transaction
527    * and require that all operations within a single transaction take
528    * place on a subset of hobject_t space partitioned by that token
529    * (the hashid seem about right to me -- even works for temp objects
530    * if you recall that a temp object created for object head foo will
531    * only ever be referenced by other transactions on foo and aren't
532    * reused).  Next, factor this part into a class and maintain one per
533    * ordering token.  Next, fixup PrimaryLogPG's repop queue to be
534    * partitioned by ordering token.  Finally, refactor the op pipeline
535    * so that the log entries passed into submit_tranaction aren't
536    * versioned.  We can't assign versions to them until we actually
537    * submit the operation.  That's probably going to be the hard part.
538    */
539   class pipeline_state_t {
540     enum {
541       CACHE_VALID = 0,
542       CACHE_INVALID = 1
543     } pipeline_state = CACHE_VALID;
544   public:
545     bool caching_enabled() const {
546       return pipeline_state == CACHE_VALID;
547     }
548     bool cache_invalid() const {
549       return !caching_enabled();
550     }
551     void invalidate() {
552       pipeline_state = CACHE_INVALID;
553     }
554     void clear() {
555       pipeline_state = CACHE_VALID;
556     }
557     friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
558   } pipeline_state;
559
560
561   op_list waiting_state;        /// writes waiting on pipe_state
562   op_list waiting_reads;        /// writes waiting on partial stripe reads
563   op_list waiting_commit;       /// writes waiting on initial commit
564   eversion_t completed_to;
565   eversion_t committed_to;
566   void start_rmw(Op *op, PGTransactionUPtr &&t);
567   bool try_state_to_reads();
568   bool try_reads_to_commit();
569   bool try_finish_rmw();
570   void check_ops();
571
572   ErasureCodeInterfaceRef ec_impl;
573
574
575   /**
576    * ECRecPred
577    *
578    * Determines the whether _have is suffient to recover an object
579    */
580   class ECRecPred : public IsPGRecoverablePredicate {
581     set<int> want;
582     ErasureCodeInterfaceRef ec_impl;
583   public:
584     explicit ECRecPred(ErasureCodeInterfaceRef ec_impl) : ec_impl(ec_impl) {
585       for (unsigned i = 0; i < ec_impl->get_chunk_count(); ++i) {
586         want.insert(i);
587       }
588     }
589     bool operator()(const set<pg_shard_t> &_have) const override {
590       set<int> have;
591       for (set<pg_shard_t>::const_iterator i = _have.begin();
592            i != _have.end();
593            ++i) {
594         have.insert(i->shard);
595       }
596       set<int> min;
597       return ec_impl->minimum_to_decode(want, have, &min) == 0;
598     }
599   };
600   IsPGRecoverablePredicate *get_is_recoverable_predicate() override {
601     return new ECRecPred(ec_impl);
602   }
603
604   /**
605    * ECReadPred
606    *
607    * Determines the whether _have is suffient to read an object
608    */
609   class ECReadPred : public IsPGReadablePredicate {
610     pg_shard_t whoami;
611     ECRecPred rec_pred;
612   public:
613     ECReadPred(
614       pg_shard_t whoami,
615       ErasureCodeInterfaceRef ec_impl) : whoami(whoami), rec_pred(ec_impl) {}
616     bool operator()(const set<pg_shard_t> &_have) const override {
617       return _have.count(whoami) && rec_pred(_have);
618     }
619   };
620   IsPGReadablePredicate *get_is_readable_predicate() override {
621     return new ECReadPred(get_parent()->whoami_shard(), ec_impl);
622   }
623
624
625   const ECUtil::stripe_info_t sinfo;
626   /// If modified, ensure that the ref is held until the update is applied
627   SharedPtrRegistry<hobject_t, ECUtil::HashInfo> unstable_hashinfo_registry;
628   ECUtil::HashInfoRef get_hash_info(const hobject_t &hoid, bool checks = true,
629                                     const map<string,bufferptr> *attr = NULL);
630
631 public:
632   ECBackend(
633     PGBackend::Listener *pg,
634     coll_t coll,
635     ObjectStore::CollectionHandle &ch,
636     ObjectStore *store,
637     CephContext *cct,
638     ErasureCodeInterfaceRef ec_impl,
639     uint64_t stripe_width);
640
641   /// Returns to_read replicas sufficient to reconstruct want
642   int get_min_avail_to_read_shards(
643     const hobject_t &hoid,     ///< [in] object
644     const set<int> &want,      ///< [in] desired shards
645     bool for_recovery,         ///< [in] true if we may use non-acting replicas
646     bool do_redundant_reads,   ///< [in] true if we want to issue redundant reads to reduce latency
647     set<pg_shard_t> *to_read   ///< [out] shards to read
648     ); ///< @return error code, 0 on success
649
650   int get_remaining_shards(
651     const hobject_t &hoid,
652     const set<int> &avail,
653     set<pg_shard_t> *to_read);
654
655   int objects_get_attrs(
656     const hobject_t &hoid,
657     map<string, bufferlist> *out) override;
658
659   void rollback_append(
660     const hobject_t &hoid,
661     uint64_t old_size,
662     ObjectStore::Transaction *t) override;
663
664   bool scrub_supported() override { return true; }
665   bool auto_repair_supported() const override { return true; }
666
667   void be_deep_scrub(
668     const hobject_t &obj,
669     uint32_t seed,
670     ScrubMap::object &o,
671     ThreadPool::TPHandle &handle) override;
672   uint64_t be_get_ondisk_size(uint64_t logical_size) override {
673     return sinfo.logical_to_next_chunk_offset(logical_size);
674   }
675   void _failed_push(const hobject_t &hoid,
676     pair<RecoveryMessages *, ECBackend::read_result_t &> &in);
677 };
678 ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs);
679
680 #endif