src/ceph/src/osd/ReplicatedBackend.h
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 Inktank Storage, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef REPBACKEND_H
#define REPBACKEND_H

#include "OSD.h"
#include "PGBackend.h"
#include "include/memory.h"

struct C_ReplicatedBackend_OnPullComplete;
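/**
 * ReplicatedBackend is the PGBackend implementation used by replicated
 * pools (erasure-coded pools use ECBackend instead).  It does two jobs
 * on behalf of the owning PG:
 *
 *  - object recovery: the primary pulls objects it is missing from peers
 *    and pushes objects that replicas are missing, batching the work in
 *    an RPGHandle until run_recovery_op() sends it out; and
 *  - client IO: submit_transaction() replicates a prepared write to the
 *    other shards and completes it once every shard has committed and
 *    applied the update.
 */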
class ReplicatedBackend : public PGBackend {
  struct RPGHandle : public PGBackend::RecoveryHandle {
    map<pg_shard_t, vector<PushOp> > pushes;
    map<pg_shard_t, vector<PullOp> > pulls;
  };
  friend struct C_ReplicatedBackend_OnPullComplete;
public:
  ReplicatedBackend(
    PGBackend::Listener *pg,
    coll_t coll,
    ObjectStore::CollectionHandle &ch,
    ObjectStore *store,
    CephContext *cct);

  /// @see PGBackend::open_recovery_op
  RPGHandle *_open_recovery_op() {
    return new RPGHandle();
  }
  PGBackend::RecoveryHandle *open_recovery_op() override {
    return _open_recovery_op();
  }

  /// @see PGBackend::run_recovery_op
  void run_recovery_op(
    PGBackend::RecoveryHandle *h,
    int priority) override;

  /// @see PGBackend::recover_object
  int recover_object(
    const hobject_t &hoid,
    eversion_t v,
    ObjectContextRef head,
    ObjectContextRef obc,
    RecoveryHandle *h
    ) override;

  void check_recovery_sources(const OSDMapRef& osdmap) override;

  bool can_handle_while_inactive(OpRequestRef op) override;

  /// @see PGBackend::handle_message
  bool _handle_message(
    OpRequestRef op
    ) override;

  void on_change() override;
  void clear_recovery_state() override;
  void on_flushed() override;

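  /**
   * With replication an object is recoverable as long as at least one
   * shard still has a copy; the predicate simply checks that the set of
   * holders is non-empty.
   */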
  class RPCRecPred : public IsPGRecoverablePredicate {
  public:
    bool operator()(const set<pg_shard_t> &have) const override {
      return !have.empty();
    }
  };
  IsPGRecoverablePredicate *get_is_recoverable_predicate() override {
    return new RPCRecPred;
  }

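  /**
   * Replicated reads are served from the local copy, so an object is
   * readable only if the local shard (whoami) is among the shards that
   * have it.
   */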
  class RPCReadPred : public IsPGReadablePredicate {
    pg_shard_t whoami;
  public:
    explicit RPCReadPred(pg_shard_t whoami) : whoami(whoami) {}
    bool operator()(const set<pg_shard_t> &have) const override {
      return have.count(whoami);
    }
  };
  IsPGReadablePredicate *get_is_readable_predicate() override {
    return new RPCReadPred(get_parent()->whoami_shard());
  }

  void dump_recovery_info(Formatter *f) const override {
    {
      f->open_array_section("pull_from_peer");
      for (map<pg_shard_t, set<hobject_t> >::const_iterator i = pull_from_peer.begin();
           i != pull_from_peer.end();
           ++i) {
        f->open_object_section("pulling_from");
        f->dump_stream("pull_from") << i->first;
        {
          f->open_array_section("pulls");
          for (set<hobject_t>::const_iterator j = i->second.begin();
               j != i->second.end();
               ++j) {
            f->open_object_section("pull_info");
            assert(pulling.count(*j));
            pulling.find(*j)->second.dump(f);
            f->close_section();
          }
          f->close_section();
        }
        f->close_section();
      }
      f->close_section();
    }
    {
      f->open_array_section("pushing");
      for (map<hobject_t, map<pg_shard_t, PushInfo>>::const_iterator i =
             pushing.begin();
           i != pushing.end();
           ++i) {
        f->open_object_section("object");
        f->dump_stream("pushing") << i->first;
        {
          f->open_array_section("pushing_to");
          for (map<pg_shard_t, PushInfo>::const_iterator j = i->second.begin();
               j != i->second.end();
               ++j) {
            f->open_object_section("push_progress");
            f->dump_stream("pushing_to") << j->first;
            {
              f->open_object_section("push_info");
              j->second.dump(f);
              f->close_section();
            }
            f->close_section();
          }
          f->close_section();
        }
        f->close_section();
      }
      f->close_section();
    }
  }

  int objects_read_sync(
    const hobject_t &hoid,
    uint64_t off,
    uint64_t len,
    uint32_t op_flags,
    bufferlist *bl) override;

  void objects_read_async(
    const hobject_t &hoid,
    const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
               pair<bufferlist*, Context*> > > &to_read,
               Context *on_complete,
               bool fast_read = false) override;

private:
  // push
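  /**
   * Per-peer state for an object currently being pushed to a replica:
   * how far the transfer has progressed, the recovery info and object
   * context being sent, and the locks held for the duration of the push.
   */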
  struct PushInfo {
    ObjectRecoveryProgress recovery_progress;
    ObjectRecoveryInfo recovery_info;
    ObjectContextRef obc;
    object_stat_sum_t stat;
    ObcLockManager lock_manager;

    void dump(Formatter *f) const {
      {
        f->open_object_section("recovery_progress");
        recovery_progress.dump(f);
        f->close_section();
      }
      {
        f->open_object_section("recovery_info");
        recovery_info.dump(f);
        f->close_section();
      }
    }
  };
  map<hobject_t, map<pg_shard_t, PushInfo>> pushing;

  // pull
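  /**
   * State for an object this OSD is pulling from a peer, tracked in
   * pulling until recovery_progress reports the transfer complete.
   */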
  struct PullInfo {
    pg_shard_t from;
    hobject_t soid;
    ObjectRecoveryProgress recovery_progress;
    ObjectRecoveryInfo recovery_info;
    ObjectContextRef head_ctx;
    ObjectContextRef obc;
    object_stat_sum_t stat;
    bool cache_dont_need;
    ObcLockManager lock_manager;

    void dump(Formatter *f) const {
      {
        f->open_object_section("recovery_progress");
        recovery_progress.dump(f);
        f->close_section();
      }
      {
        f->open_object_section("recovery_info");
        recovery_info.dump(f);
        f->close_section();
      }
    }

    bool is_complete() const {
      return recovery_progress.is_complete(recovery_info);
    }
  };

  map<hobject_t, PullInfo> pulling;

  // Reverse mapping from OSD peer to the objects being pulled from that peer
  map<pg_shard_t, set<hobject_t> > pull_from_peer;
  void clear_pull(
    map<hobject_t, PullInfo>::iterator piter,
    bool clear_pull_from_peer = true);
  void clear_pull_from(
    map<hobject_t, PullInfo>::iterator piter);

  void _do_push(OpRequestRef op);
  void _do_pull_response(OpRequestRef op);
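  // A push message arriving at the primary is the reply to a pull it
  // requested earlier; on a replica it is the primary pushing data, so
  // the two cases are dispatched differently here.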
  void do_push(OpRequestRef op) {
    if (is_primary()) {
      _do_pull_response(op);
    } else {
      _do_push(op);
    }
  }
  void do_pull(OpRequestRef op);
  void do_push_reply(OpRequestRef op);

  bool handle_push_reply(pg_shard_t peer, const PushReplyOp &op, PushOp *reply);
  void handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply);

  struct pull_complete_info {
    hobject_t hoid;
    object_stat_sum_t stat;
  };
  bool handle_pull_response(
    pg_shard_t from, const PushOp &op, PullOp *response,
    list<pull_complete_info> *to_continue,
    ObjectStore::Transaction *t);
  void handle_push(pg_shard_t from, const PushOp &op, PushReplyOp *response,
                   ObjectStore::Transaction *t);

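  // Restrict the received extents and data to the portions that still
  // fall within copy_subset, discarding anything that is not needed.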
  static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
                               const interval_set<uint64_t> &intervals_received,
                               bufferlist data_received,
                               interval_set<uint64_t> *intervals_usable,
                               bufferlist *data_usable);
  void _failed_pull(pg_shard_t from, const hobject_t &soid);

  void send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes);
  void prep_push_op_blank(const hobject_t& soid, PushOp *op);
  void send_pulls(
    int priority,
    map<pg_shard_t, vector<PullOp> > &pulls);

  int build_push_op(const ObjectRecoveryInfo &recovery_info,
                    const ObjectRecoveryProgress &progress,
                    ObjectRecoveryProgress *out_progress,
                    PushOp *out_op,
                    object_stat_sum_t *stat = 0,
                    bool cache_dont_need = true);
  void submit_push_data(const ObjectRecoveryInfo &recovery_info,
                        bool first,
                        bool complete,
                        bool cache_dont_need,
                        const interval_set<uint64_t> &intervals_included,
                        bufferlist data_included,
                        bufferlist omap_header,
                        const map<string, bufferlist> &attrs,
                        const map<string, bufferlist> &omap_entries,
                        ObjectStore::Transaction *t);
  void submit_push_complete(const ObjectRecoveryInfo &recovery_info,
                            ObjectStore::Transaction *t);

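  // calc_clone_subsets/calc_head_subsets figure out which extents of the
  // object being recovered are already present in neighbouring clones on
  // the recovery target, so those ranges can be cloned in place rather
  // than re-sent over the wire.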
  void calc_clone_subsets(
    SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
    const hobject_t &last_backfill,
    interval_set<uint64_t>& data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    ObcLockManager &lock_manager);
  void prepare_pull(
    eversion_t v,
    const hobject_t& soid,
    ObjectContextRef headctx,
    RPGHandle *h);
  int start_pushes(
    const hobject_t &soid,
    ObjectContextRef obj,
    RPGHandle *h);
  int prep_push_to_replica(
    ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
    PushOp *pop, bool cache_dont_need = true);
  int prep_push(
    ObjectContextRef obc,
    const hobject_t& oid, pg_shard_t dest,
    PushOp *op,
    bool cache_dont_need);
  int prep_push(
    ObjectContextRef obc,
    const hobject_t& soid, pg_shard_t peer,
    eversion_t version,
    interval_set<uint64_t> &data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    PushOp *op,
    bool cache,
    ObcLockManager &&lock_manager);
  void calc_head_subsets(
    ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
    const pg_missing_t& missing,
    const hobject_t &last_backfill,
    interval_set<uint64_t>& data_subset,
    map<hobject_t, interval_set<uint64_t>>& clone_subsets,
    ObcLockManager &lock_manager);
  ObjectRecoveryInfo recalc_subsets(
    const ObjectRecoveryInfo& recovery_info,
    SnapSetContext *ssc,
    ObcLockManager &lock_manager);

  /**
   * Client IO
   */
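  /**
   * Tracks a replicated write from submit_transaction() until every shard
   * has both committed and applied it; the on_commit/on_applied contexts
   * fire as the corresponding waiting sets drain.
   */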
  struct InProgressOp {
    ceph_tid_t tid;
    set<pg_shard_t> waiting_for_commit;
    set<pg_shard_t> waiting_for_applied;
    Context *on_commit;
    Context *on_applied;
    OpRequestRef op;
    eversion_t v;
    InProgressOp(
      ceph_tid_t tid, Context *on_commit, Context *on_applied,
      OpRequestRef op, eversion_t v)
      : tid(tid), on_commit(on_commit), on_applied(on_applied),
        op(op), v(v) {}
    bool done() const {
      return waiting_for_commit.empty() &&
        waiting_for_applied.empty();
    }
  };
  map<ceph_tid_t, InProgressOp> in_progress_ops;
public:
  friend class C_OSD_OnOpCommit;
  friend class C_OSD_OnOpApplied;

  void call_write_ordered(std::function<void(void)> &&cb) override {
    // ReplicatedBackend submits writes inline in submit_transaction, so
    // we can just call the callback.
    cb();
  }

  void submit_transaction(
    const hobject_t &hoid,
    const object_stat_sum_t &delta_stats,
    const eversion_t &at_version,
    PGTransactionUPtr &&t,
    const eversion_t &trim_to,
    const eversion_t &roll_forward_to,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    Context *on_local_applied_sync,
    Context *on_all_applied,
    Context *on_all_commit,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    OpRequestRef op
    ) override;

private:
  Message * generate_subop(
    const hobject_t &soid,
    const eversion_t &at_version,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    eversion_t pg_trim_to,
    eversion_t pg_roll_forward_to,
    hobject_t new_temp_oid,
    hobject_t discard_temp_oid,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    ObjectStore::Transaction &op_t,
    pg_shard_t peer,
    const pg_info_t &pinfo);
  void issue_op(
    const hobject_t &soid,
    const eversion_t &at_version,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    eversion_t pg_trim_to,
    eversion_t pg_roll_forward_to,
    hobject_t new_temp_oid,
    hobject_t discard_temp_oid,
    const vector<pg_log_entry_t> &log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    InProgressOp *op,
    ObjectStore::Transaction &op_t);
  void op_applied(InProgressOp *op);
  void op_commit(InProgressOp *op);
  void do_repop_reply(OpRequestRef op);
  void do_repop(OpRequestRef op);

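  /**
   * Replica-side state for a write (repop) received from the primary; the
   * applied and committed flags track the replica's local transactions so
   * the primary can be acked appropriately.
   */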
  struct RepModify {
    OpRequestRef op;
    bool applied, committed;
    int ackerosd;
    eversion_t last_complete;
    epoch_t epoch_started;

    ObjectStore::Transaction opt, localt;

    RepModify() : applied(false), committed(false), ackerosd(-1),
                  epoch_started(0) {}
  };
  typedef ceph::shared_ptr<RepModify> RepModifyRef;

  struct C_OSD_RepModifyApply;
  struct C_OSD_RepModifyCommit;

  void repop_applied(RepModifyRef rm);
  void repop_commit(RepModifyRef rm);
  bool scrub_supported() override { return true; }
  bool auto_repair_supported() const override { return false; }


  void be_deep_scrub(
    const hobject_t &obj,
    uint32_t seed,
    ScrubMap::object &o,
    ThreadPool::TPHandle &handle) override;
  uint64_t be_get_ondisk_size(uint64_t logical_size) override { return logical_size; }
};

#endif