Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / PrimaryLogPG.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 /*
3  * Ceph - scalable distributed file system
4  *
5  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
6  * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
7  *
8  * Author: Loic Dachary <loic@dachary.org>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software 
13  * Foundation.  See file COPYING.
14  * 
15  */
16
17 #ifndef CEPH_REPLICATEDPG_H
18 #define CEPH_REPLICATEDPG_H
19
20 #include <boost/tuple/tuple.hpp>
21 #include "include/assert.h" 
22 #include "PG.h"
23 #include "Watch.h"
24 #include "TierAgentState.h"
25 #include "messages/MOSDOpReply.h"
26 #include "common/Checksummer.h"
27 #include "common/sharedptr_registry.hpp"
28 #include "ReplicatedBackend.h"
29 #include "PGTransaction.h"
30
31 class CopyFromCallback;
32 class PromoteCallback;
33
34 class PrimaryLogPG;
35 class PGLSFilter;
36 class HitSet;
37 struct TierAgentState;
38 class MOSDOp;
39 class MOSDOpReply;
40 class OSDService;
41
42 void intrusive_ptr_add_ref(PrimaryLogPG *pg);
43 void intrusive_ptr_release(PrimaryLogPG *pg);
44 uint64_t get_with_id(PrimaryLogPG *pg);
45 void put_with_id(PrimaryLogPG *pg, uint64_t id);
46
47 #ifdef PG_DEBUG_REFS
48   typedef TrackedIntPtr<PrimaryLogPG> PrimaryLogPGRef;
49 #else
50   typedef boost::intrusive_ptr<PrimaryLogPG> PrimaryLogPGRef;
51 #endif
52
53 struct inconsistent_snapset_wrapper;
54
55 class PrimaryLogPG : public PG, public PGBackend::Listener {
56   friend class OSD;
57   friend class Watch;
58
59 public:
60   MEMPOOL_CLASS_HELPERS();
61
62   /*
63    * state associated with a copy operation
64    */
65   struct OpContext;
66   class CopyCallback;
67
68   /**
69    * CopyResults stores the object metadata of interest to a copy initiator.
70    */
71   struct CopyResults {
72     ceph::real_time mtime; ///< the copy source's mtime
73     uint64_t object_size; ///< the copied object's size
74     bool started_temp_obj; ///< true if the callback needs to delete temp object
75     hobject_t temp_oid;    ///< temp object (if any)
76
77     /**
78      * Function to fill in transaction; if non-empty the callback
79      * must execute it before any other accesses to the object
80      * (in order to complete the copy).
81      */
82     std::function<void(PGTransaction *)> fill_in_final_tx;
83
84     version_t user_version; ///< The copy source's user version
85     bool should_requeue;  ///< op should be requeued on cancel
86     vector<snapid_t> snaps;  ///< src's snaps (if clone)
87     snapid_t snap_seq;       ///< src's snap_seq (if head)
88     librados::snap_set_t snapset; ///< src snapset (if head)
89     bool mirror_snapset;
90     bool has_omap;
91     uint32_t flags;    // object_copy_data_t::FLAG_*
92     uint32_t source_data_digest, source_omap_digest;
93     uint32_t data_digest, omap_digest;
94     mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
95     map<string, bufferlist> attrs; // xattrs
96     uint64_t truncate_seq;
97     uint64_t truncate_size;
98     bool is_data_digest() {
99       return flags & object_copy_data_t::FLAG_DATA_DIGEST;
100     }
101     bool is_omap_digest() {
102       return flags & object_copy_data_t::FLAG_OMAP_DIGEST;
103     }
104     CopyResults()
105       : object_size(0), started_temp_obj(false),
106         user_version(0),
107         should_requeue(false), mirror_snapset(false),
108         has_omap(false),
109         flags(0),
110         source_data_digest(-1), source_omap_digest(-1),
111         data_digest(-1), omap_digest(-1),
112         truncate_seq(0), truncate_size(0)
113     {}
114   };
115
116   struct CopyOp {
117     CopyCallback *cb;
118     ObjectContextRef obc;
119     hobject_t src;
120     object_locator_t oloc;
121     unsigned flags;
122     bool mirror_snapset;
123
124     CopyResults results;
125
126     ceph_tid_t objecter_tid;
127     ceph_tid_t objecter_tid2;
128
129     object_copy_cursor_t cursor;
130     map<string,bufferlist> attrs;
131     bufferlist data;
132     bufferlist omap_header;
133     bufferlist omap_data;
134     int rval;
135
136     object_copy_cursor_t temp_cursor;
137
138     /*
139      * For CopyOp the process is:
140      * step1: read the data(attr/omap/data) from the source object
141      * step2: handle those data(w/ those data create a new object)
142      * src_obj_fadvise_flags used in step1;
143      * dest_obj_fadvise_flags used in step2
144      */
145     unsigned src_obj_fadvise_flags;
146     unsigned dest_obj_fadvise_flags;
147
148     CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s,
149            object_locator_t l,
150            version_t v,
151            unsigned f,
152            bool ms,
153            unsigned src_obj_fadvise_flags,
154            unsigned dest_obj_fadvise_flags)
155       : cb(cb_), obc(_obc), src(s), oloc(l), flags(f),
156         mirror_snapset(ms),
157         objecter_tid(0),
158         objecter_tid2(0),
159         rval(-1),
160         src_obj_fadvise_flags(src_obj_fadvise_flags),
161         dest_obj_fadvise_flags(dest_obj_fadvise_flags)
162     {
163       results.user_version = v;
164       results.mirror_snapset = mirror_snapset;
165     }
166   };
167   typedef ceph::shared_ptr<CopyOp> CopyOpRef;
168
169   /**
170    * The CopyCallback class defines an interface for completions to the
171    * copy_start code. Users of the copy infrastructure must implement
172    * one and give an instance of the class to start_copy.
173    *
174    * The implementer is responsible for making sure that the CopyCallback
175    * can associate itself with the correct copy operation.
176    */
177   typedef boost::tuple<int, CopyResults*> CopyCallbackResults;
178
179   friend class CopyFromCallback;
180   friend class CopyFromFinisher;
181   friend class PromoteCallback;
182
183   struct ProxyReadOp {
184     OpRequestRef op;
185     hobject_t soid;
186     ceph_tid_t objecter_tid;
187     vector<OSDOp> &ops;
188     version_t user_version;
189     int data_offset;
190     bool canceled;              ///< true if canceled
191
192     ProxyReadOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops)
193       : op(_op), soid(oid),
194         objecter_tid(0), ops(_ops),
195         user_version(0), data_offset(0),
196         canceled(false) { }
197   };
198   typedef ceph::shared_ptr<ProxyReadOp> ProxyReadOpRef;
199
200   struct ProxyWriteOp {
201     OpContext *ctx;
202     OpRequestRef op;
203     hobject_t soid;
204     ceph_tid_t objecter_tid;
205     vector<OSDOp> &ops;
206     version_t user_version;
207     bool sent_reply;
208     utime_t mtime;
209     bool canceled;
210     osd_reqid_t reqid;
211
212     ProxyWriteOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops, osd_reqid_t _reqid)
213       : ctx(NULL), op(_op), soid(oid),
214         objecter_tid(0), ops(_ops),
215         user_version(0), sent_reply(false),
216         canceled(false),
217         reqid(_reqid) { }
218   };
219   typedef ceph::shared_ptr<ProxyWriteOp> ProxyWriteOpRef;
220
221   struct FlushOp {
222     ObjectContextRef obc;       ///< obc we are flushing
223     OpRequestRef op;            ///< initiating op
224     list<OpRequestRef> dup_ops; ///< bandwagon jumpers
225     version_t flushed_version;  ///< user version we are flushing
226     ceph_tid_t objecter_tid;    ///< copy-from request tid
227     int rval;                   ///< copy-from result
228     bool blocking;              ///< whether we are blocking updates
229     bool removal;               ///< we are removing the backend object
230     boost::optional<std::function<void()>> on_flush; ///< callback, may be null
231
232     FlushOp()
233       : flushed_version(0), objecter_tid(0), rval(0),
234         blocking(false), removal(false) {}
235     ~FlushOp() { assert(!on_flush); }
236   };
237   typedef ceph::shared_ptr<FlushOp> FlushOpRef;
238
239   boost::scoped_ptr<PGBackend> pgbackend;
240   PGBackend *get_pgbackend() override {
241     return pgbackend.get();
242   }
243
244   /// Listener methods
245   DoutPrefixProvider *get_dpp() override {
246     return this;
247   }
248
249   void on_local_recover(
250     const hobject_t &oid,
251     const ObjectRecoveryInfo &recovery_info,
252     ObjectContextRef obc,
253     bool is_delete,
254     ObjectStore::Transaction *t
255     ) override;
256   void on_peer_recover(
257     pg_shard_t peer,
258     const hobject_t &oid,
259     const ObjectRecoveryInfo &recovery_info
260     ) override;
261   void begin_peer_recover(
262     pg_shard_t peer,
263     const hobject_t oid) override;
264   void on_global_recover(
265     const hobject_t &oid,
266     const object_stat_sum_t &stat_diff,
267     bool is_delete) override;
268   void failed_push(const list<pg_shard_t> &from, const hobject_t &soid) override;
269   void primary_failed(const hobject_t &soid) override;
270   bool primary_error(const hobject_t& soid, eversion_t v) override;
271   void cancel_pull(const hobject_t &soid) override;
272   void apply_stats(
273     const hobject_t &soid,
274     const object_stat_sum_t &delta_stats) override;
275   void on_primary_error(const hobject_t &oid, eversion_t v) override;
276   void remove_missing_object(const hobject_t &oid,
277                              eversion_t v,
278                              Context *on_complete) override;
279
280   template<class T> class BlessedGenContext;
281   class BlessedContext;
282   Context *bless_context(Context *c) override;
283
284   GenContext<ThreadPool::TPHandle&> *bless_gencontext(
285     GenContext<ThreadPool::TPHandle&> *c) override;
286     
287   void send_message(int to_osd, Message *m) override {
288     osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch());
289   }
290   void queue_transaction(ObjectStore::Transaction&& t,
291                          OpRequestRef op) override {
292     osd->store->queue_transaction(osr.get(), std::move(t), 0, 0, 0, op);
293   }
294   void queue_transactions(vector<ObjectStore::Transaction>& tls,
295                           OpRequestRef op) override {
296     osd->store->queue_transactions(osr.get(), tls, 0, 0, 0, op, NULL);
297   }
298   epoch_t get_epoch() const override {
299     return get_osdmap()->get_epoch();
300   }
301   epoch_t get_interval_start_epoch() const override {
302     return info.history.same_interval_since;
303   }
304   epoch_t get_last_peering_reset_epoch() const override {
305     return get_last_peering_reset();
306   }
307   const set<pg_shard_t> &get_actingbackfill_shards() const override {
308     return actingbackfill;
309   }
310   const set<pg_shard_t> &get_acting_shards() const override {
311     return actingset;
312   }
313   const set<pg_shard_t> &get_backfill_shards() const override {
314     return backfill_targets;
315   }
316
317   std::string gen_dbg_prefix() const override { return gen_prefix(); }
318   
319   const map<hobject_t, set<pg_shard_t>>
320     &get_missing_loc_shards() const override {
321     return missing_loc.get_missing_locs();
322   }
323   const map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
324     return peer_missing;
325   }
326   using PGBackend::Listener::get_shard_missing;
327   const map<pg_shard_t, pg_info_t> &get_shard_info() const override {
328     return peer_info;
329   }
330   using PGBackend::Listener::get_shard_info;  
331   const pg_missing_tracker_t &get_local_missing() const override {
332     return pg_log.get_missing();
333   }
334   const PGLog &get_log() const override {
335     return pg_log;
336   }
337   bool pgb_is_primary() const override {
338     return is_primary();
339   }
340   OSDMapRef pgb_get_osdmap() const override {
341     return get_osdmap();
342   }
343   const pg_info_t &get_info() const override {
344     return info;
345   }
346   const pg_pool_t &get_pool() const override {
347     return pool.info;
348   }
349
350   ObjectContextRef get_obc(
351     const hobject_t &hoid,
352     const map<string, bufferlist> &attrs) override {
353     return get_object_context(hoid, true, &attrs);
354   }
355
356   bool try_lock_for_read(
357     const hobject_t &hoid,
358     ObcLockManager &manager) override {
359     if (is_missing_object(hoid))
360       return false;
361     auto obc = get_object_context(hoid, false, nullptr);
362     if (!obc)
363       return false;
364     return manager.try_get_read_lock(hoid, obc);
365   }
366
367   void release_locks(ObcLockManager &manager) override {
368     release_object_locks(manager);
369   }
370
371   void pgb_set_object_snap_mapping(
372     const hobject_t &soid,
373     const set<snapid_t> &snaps,
374     ObjectStore::Transaction *t) override {
375     return update_object_snap_mapping(t, soid, snaps);
376   }
377   void pgb_clear_object_snap_mapping(
378     const hobject_t &soid,
379     ObjectStore::Transaction *t) override {
380     return clear_object_snap_mapping(t, soid);
381   }
382
383   void log_operation(
384     const vector<pg_log_entry_t> &logv,
385     const boost::optional<pg_hit_set_history_t> &hset_history,
386     const eversion_t &trim_to,
387     const eversion_t &roll_forward_to,
388     bool transaction_applied,
389     ObjectStore::Transaction &t) override {
390     if (hset_history) {
391       info.hit_set = *hset_history;
392     }
393     append_log(logv, trim_to, roll_forward_to, t, transaction_applied);
394   }
395
396   struct C_OSD_OnApplied;
397   void op_applied(
398     const eversion_t &applied_version) override;
399
400   bool should_send_op(
401     pg_shard_t peer,
402     const hobject_t &hoid) override {
403     if (peer == get_primary())
404       return true;
405     assert(peer_info.count(peer));
406     bool should_send =
407       hoid.pool != (int64_t)info.pgid.pool() ||
408       hoid <= last_backfill_started ||
409       hoid <= peer_info[peer].last_backfill;
410     if (!should_send)
411       assert(is_backfill_targets(peer));
412     return should_send;
413   }
414   
415   void update_peer_last_complete_ondisk(
416     pg_shard_t fromosd,
417     eversion_t lcod) override {
418     peer_last_complete_ondisk[fromosd] = lcod;
419   }
420
421   void update_last_complete_ondisk(
422     eversion_t lcod) override {
423     last_complete_ondisk = lcod;
424   }
425
426   void update_stats(
427     const pg_stat_t &stat) override {
428     info.stats = stat;
429   }
430
431   void schedule_recovery_work(
432     GenContext<ThreadPool::TPHandle&> *c) override;
433
434   pg_shard_t whoami_shard() const override {
435     return pg_whoami;
436   }
437   spg_t primary_spg_t() const override {
438     return spg_t(info.pgid.pgid, primary.shard);
439   }
440   pg_shard_t primary_shard() const override {
441     return primary;
442   }
443   uint64_t min_peer_features() const override {
444     return get_min_peer_features();
445   }
446
447   void send_message_osd_cluster(
448     int peer, Message *m, epoch_t from_epoch) override;
449   void send_message_osd_cluster(
450     Message *m, Connection *con) override;
451   void send_message_osd_cluster(
452     Message *m, const ConnectionRef& con) override;
453   ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) override;
454   entity_name_t get_cluster_msgr_name() override {
455     return osd->get_cluster_msgr_name();
456   }
457
458   PerfCounters *get_logger() override;
459
460   ceph_tid_t get_tid() override { return osd->get_tid(); }
461
462   LogClientTemp clog_error() override { return osd->clog->error(); }
463   LogClientTemp clog_warn() override { return osd->clog->warn(); }
464
465   struct watch_disconnect_t {
466     uint64_t cookie;
467     entity_name_t name;
468     bool send_disconnect;
469     watch_disconnect_t(uint64_t c, entity_name_t n, bool sd)
470       : cookie(c), name(n), send_disconnect(sd) {}
471   };
472   void complete_disconnect_watches(
473     ObjectContextRef obc,
474     const list<watch_disconnect_t> &to_disconnect);
475
476   struct OpFinisher {
477     virtual ~OpFinisher() {
478     }
479
480     virtual int execute() = 0;
481   };
482
483   /*
484    * Capture all object state associated with an in-progress read or write.
485    */
486   struct OpContext {
487     OpRequestRef op;
488     osd_reqid_t reqid;
489     vector<OSDOp> *ops;
490
491     const ObjectState *obs; // Old objectstate
492     const SnapSet *snapset; // Old snapset
493
494     ObjectState new_obs;  // resulting ObjectState
495     SnapSet new_snapset;  // resulting SnapSet (in case of a write)
496     //pg_stat_t new_stats;  // resulting Stats
497     object_stat_sum_t delta_stats;
498
499     bool modify;          // (force) modification (even if op_t is empty)
500     bool user_modify;     // user-visible modification
501     bool undirty;         // user explicitly un-dirtying this object
502     bool cache_evict;     ///< true if this is a cache eviction
503     bool ignore_cache;    ///< true if IGNORE_CACHE flag is set
504     bool ignore_log_op_stats;  // don't log op stats
505     bool update_log_only; ///< this is a write that returned an error - just record in pg log for dup detection
506
507     // side effects
508     list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
509     list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
510     list<notify_info_t> notifies;
511     struct NotifyAck {
512       boost::optional<uint64_t> watch_cookie;
513       uint64_t notify_id;
514       bufferlist reply_bl;
515       explicit NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
516       NotifyAck(uint64_t notify_id, uint64_t cookie, bufferlist& rbl)
517         : watch_cookie(cookie), notify_id(notify_id) {
518         reply_bl.claim(rbl);
519       }
520     };
521     list<NotifyAck> notify_acks;
522
523     uint64_t bytes_written, bytes_read;
524
525     utime_t mtime;
526     SnapContext snapc;           // writer snap context
527     eversion_t at_version;       // pg's current version pointer
528     version_t user_at_version;   // pg's current user version pointer
529
530     int current_osd_subop_num;
531
532     PGTransactionUPtr op_t;
533     vector<pg_log_entry_t> log;
534     boost::optional<pg_hit_set_history_t> updated_hset_history;
535
536     interval_set<uint64_t> modified_ranges;
537     ObjectContextRef obc;
538     ObjectContextRef clone_obc;    // if we created a clone
539     ObjectContextRef snapset_obc;  // if we created/deleted a snapdir
540
541     // FIXME: we may want to kill this msgr hint off at some point!
542     boost::optional<int> data_off = boost::none;
543
544     MOSDOpReply *reply;
545
546     PrimaryLogPG *pg;
547
548     int num_read;    ///< count read ops
549     int num_write;   ///< count update ops
550
551     mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > extra_reqids;
552
553     hobject_t new_temp_oid, discard_temp_oid;  ///< temp objects we should start/stop tracking
554
555     list<std::function<void()>> on_applied;
556     list<std::function<void()>> on_committed;
557     list<std::function<void()>> on_finish;
558     list<std::function<void()>> on_success;
559     template <typename F>
560     void register_on_finish(F &&f) {
561       on_finish.emplace_back(std::forward<F>(f));
562     }
563     template <typename F>
564     void register_on_success(F &&f) {
565       on_success.emplace_back(std::forward<F>(f));
566     }
567     template <typename F>
568     void register_on_applied(F &&f) {
569       on_applied.emplace_back(std::forward<F>(f));
570     }
571     template <typename F>
572     void register_on_commit(F &&f) {
573       on_committed.emplace_back(std::forward<F>(f));
574     }
575
576     bool sent_reply;
577
578     // pending async reads <off, len, op_flags> -> <outbl, outr>
579     list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
580               pair<bufferlist*, Context*> > > pending_async_reads;
581     int inflightreads;
582     friend struct OnReadComplete;
583     void start_async_reads(PrimaryLogPG *pg);
584     void finish_read(PrimaryLogPG *pg);
585     bool async_reads_complete() {
586       return inflightreads == 0;
587     }
588
589     ObjectContext::RWState::State lock_type;
590     ObcLockManager lock_manager;
591
592     std::map<int, std::unique_ptr<OpFinisher>> op_finishers;
593
594     OpContext(const OpContext& other);
595     const OpContext& operator=(const OpContext& other);
596
597     OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>* _ops,
598               ObjectContextRef& obc,
599               PrimaryLogPG *_pg) :
600       op(_op), reqid(_reqid), ops(_ops),
601       obs(&obc->obs),
602       snapset(0),
603       new_obs(obs->oi, obs->exists),
604       modify(false), user_modify(false), undirty(false), cache_evict(false),
605       ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
606       bytes_written(0), bytes_read(0), user_at_version(0),
607       current_osd_subop_num(0),
608       obc(obc),
609       reply(NULL), pg(_pg),
610       num_read(0),
611       num_write(0),
612       sent_reply(false),
613       inflightreads(0),
614       lock_type(ObjectContext::RWState::RWNONE) {
615       if (obc->ssc) {
616         new_snapset = obc->ssc->snapset;
617         snapset = &obc->ssc->snapset;
618       }
619     }
620     OpContext(OpRequestRef _op, osd_reqid_t _reqid,
621               vector<OSDOp>* _ops, PrimaryLogPG *_pg) :
622       op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
623       modify(false), user_modify(false), undirty(false), cache_evict(false),
624       ignore_cache(false), ignore_log_op_stats(false), update_log_only(false),
625       bytes_written(0), bytes_read(0), user_at_version(0),
626       current_osd_subop_num(0),
627       reply(NULL), pg(_pg),
628       num_read(0),
629       num_write(0),
630       inflightreads(0),
631       lock_type(ObjectContext::RWState::RWNONE) {}
632     void reset_obs(ObjectContextRef obc) {
633       new_obs = ObjectState(obc->obs.oi, obc->obs.exists);
634       if (obc->ssc) {
635         new_snapset = obc->ssc->snapset;
636         snapset = &obc->ssc->snapset;
637       }
638     }
639     ~OpContext() {
640       assert(!op_t);
641       if (reply)
642         reply->put();
643       for (list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
644                      pair<bufferlist*, Context*> > >::iterator i =
645              pending_async_reads.begin();
646            i != pending_async_reads.end();
647            pending_async_reads.erase(i++)) {
648         delete i->second.second;
649       }
650     }
651     uint64_t get_features() {
652       if (op && op->get_req()) {
653         return op->get_req()->get_connection()->get_features();
654       }
655       return -1ull;
656     }
657   };
658   using OpContextUPtr = std::unique_ptr<OpContext>;
659   friend struct OpContext;
660
661   /*
662    * State on the PG primary associated with the replicated mutation
663    */
664   class RepGather {
665   public:
666     hobject_t hoid;
667     OpRequestRef op;
668     xlist<RepGather*>::item queue_item;
669     int nref;
670
671     eversion_t v;
672     int r = 0;
673
674     ceph_tid_t rep_tid;
675
676     bool rep_aborted, rep_done;
677
678     bool all_applied;
679     bool all_committed;
680     const bool applies_with_commit;
681     
682     utime_t   start;
683     
684     eversion_t          pg_local_last_complete;
685
686     ObcLockManager lock_manager;
687
688     list<std::function<void()>> on_applied;
689     list<std::function<void()>> on_committed;
690     list<std::function<void()>> on_success;
691     list<std::function<void()>> on_finish;
692     
693     RepGather(
694       OpContext *c, ceph_tid_t rt,
695       eversion_t lc,
696       bool applies_with_commit) :
697       hoid(c->obc->obs.oi.soid),
698       op(c->op),
699       queue_item(this),
700       nref(1),
701       rep_tid(rt), 
702       rep_aborted(false), rep_done(false),
703       all_applied(false), all_committed(false),
704       applies_with_commit(applies_with_commit),
705       pg_local_last_complete(lc),
706       lock_manager(std::move(c->lock_manager)),
707       on_applied(std::move(c->on_applied)),
708       on_committed(std::move(c->on_committed)),
709       on_success(std::move(c->on_success)),
710       on_finish(std::move(c->on_finish)) {}
711
712     RepGather(
713       ObcLockManager &&manager,
714       OpRequestRef &&o,
715       boost::optional<std::function<void(void)> > &&on_complete,
716       ceph_tid_t rt,
717       eversion_t lc,
718       bool applies_with_commit,
719       int r) :
720       op(o),
721       queue_item(this),
722       nref(1),
723       r(r),
724       rep_tid(rt),
725       rep_aborted(false), rep_done(false),
726       all_applied(false), all_committed(false),
727       applies_with_commit(applies_with_commit),
728       pg_local_last_complete(lc),
729       lock_manager(std::move(manager)) {
730       if (on_complete) {
731         on_success.push_back(std::move(*on_complete));
732       }
733     }
734
735     RepGather *get() {
736       nref++;
737       return this;
738     }
739     void put() {
740       assert(nref > 0);
741       if (--nref == 0) {
742         assert(on_applied.empty());
743         delete this;
744         //generic_dout(0) << "deleting " << this << dendl;
745       }
746     }
747   };
748
749
750 protected:
751
752   /**
753    * Grabs locks for OpContext, should be cleaned up in close_op_ctx
754    *
755    * @param ctx [in,out] ctx to get locks for
756    * @return true on success, false if we are queued
757    */
758   bool get_rw_locks(bool write_ordered, OpContext *ctx) {
759     /* If snapset_obc, !obc->obs->exists and we will always take the
760      * snapdir lock *before* the head lock.  Since all callers will do
761      * this (read or write) if we get the first we will be guaranteed
762      * to get the second.
763      */
764     if (write_ordered && ctx->op->may_read()) {
765       ctx->lock_type = ObjectContext::RWState::RWEXCL;
766     } else if (write_ordered) {
767       ctx->lock_type = ObjectContext::RWState::RWWRITE;
768     } else {
769       assert(ctx->op->may_read());
770       ctx->lock_type = ObjectContext::RWState::RWREAD;
771     }
772
773     if (ctx->snapset_obc) {
774       assert(!ctx->obc->obs.exists);
775       if (!ctx->lock_manager.get_lock_type(
776             ctx->lock_type,
777             ctx->snapset_obc->obs.oi.soid,
778             ctx->snapset_obc,
779             ctx->op)) {
780         ctx->lock_type = ObjectContext::RWState::RWNONE;
781         return false;
782       }
783     }
784     if (ctx->lock_manager.get_lock_type(
785           ctx->lock_type,
786           ctx->obc->obs.oi.soid,
787           ctx->obc,
788           ctx->op)) {
789       return true;
790     } else {
791       assert(!ctx->snapset_obc);
792       ctx->lock_type = ObjectContext::RWState::RWNONE;
793       return false;
794     }
795   }
796
797   /**
798    * Cleans up OpContext
799    *
800    * @param ctx [in] ctx to clean up
801    */
802   void close_op_ctx(OpContext *ctx);
803
804   /**
805    * Releases locks
806    *
807    * @param manager [in] manager with locks to release
808    */
809   void release_object_locks(
810     ObcLockManager &lock_manager) {
811     list<pair<hobject_t, list<OpRequestRef> > > to_req;
812     bool requeue_recovery = false;
813     bool requeue_snaptrim = false;
814     lock_manager.put_locks(
815       &to_req,
816       &requeue_recovery,
817       &requeue_snaptrim);
818     if (requeue_recovery)
819       queue_recovery();
820     if (requeue_snaptrim)
821       snap_trimmer_machine.process_event(TrimWriteUnblocked());
822
823     if (!to_req.empty()) {
824       // requeue at front of scrub blocking queue if we are blocked by scrub
825       for (auto &&p: to_req) {
826         if (scrubber.write_blocked_by_scrub(p.first.get_head())) {
827           waiting_for_scrub.splice(
828             waiting_for_scrub.begin(),
829             p.second,
830             p.second.begin(),
831             p.second.end());
832         } else {
833           requeue_ops(p.second);
834         }
835       }
836     }
837   }
838
839   // replica ops
840   // [primary|tail]
841   xlist<RepGather*> repop_queue;
842
843   friend class C_OSD_RepopApplied;
844   friend class C_OSD_RepopCommit;
845   void repop_all_applied(RepGather *repop);
846   void repop_all_committed(RepGather *repop);
847   void eval_repop(RepGather*);
848   void issue_repop(RepGather *repop, OpContext *ctx);
849   RepGather *new_repop(
850     OpContext *ctx,
851     ObjectContextRef obc,
852     ceph_tid_t rep_tid);
853   boost::intrusive_ptr<RepGather> new_repop(
854     eversion_t version,
855     int r,
856     ObcLockManager &&manager,
857     OpRequestRef &&op,
858     boost::optional<std::function<void(void)> > &&on_complete);
859   void remove_repop(RepGather *repop);
860
861   OpContextUPtr simple_opc_create(ObjectContextRef obc);
862   void simple_opc_submit(OpContextUPtr ctx);
863
864   /**
865    * Merge entries atomically into all actingbackfill osds
866    * adjusting missing and recovery state as necessary.
867    *
868    * Also used to store error log entries for dup detection.
869    */
870   void submit_log_entries(
871     const mempool::osd_pglog::list<pg_log_entry_t> &entries,
872     ObcLockManager &&manager,
873     boost::optional<std::function<void(void)> > &&on_complete,
874     OpRequestRef op = OpRequestRef(),
875     int r = 0);
876   struct LogUpdateCtx {
877     boost::intrusive_ptr<RepGather> repop;
878     set<pg_shard_t> waiting_on;
879   };
880   void cancel_log_updates();
881   map<ceph_tid_t, LogUpdateCtx> log_entry_update_waiting_on;
882
883
884   // hot/cold tracking
885   HitSetRef hit_set;        ///< currently accumulating HitSet
886   utime_t hit_set_start_stamp;    ///< time the current HitSet started recording
887
888
889   void hit_set_clear();     ///< discard any HitSet state
890   void hit_set_setup();     ///< initialize HitSet state
891   void hit_set_create();    ///< create a new HitSet
892   void hit_set_persist();   ///< persist hit info
893   bool hit_set_apply_log(); ///< apply log entries to update in-memory HitSet
894   void hit_set_trim(OpContextUPtr &ctx, unsigned max); ///< discard old HitSets
895   void hit_set_in_memory_trim(uint32_t max_in_memory); ///< discard old in memory HitSets
896   void hit_set_remove_all();
897
898   hobject_t get_hit_set_current_object(utime_t stamp);
899   hobject_t get_hit_set_archive_object(utime_t start,
900                                        utime_t end,
901                                        bool using_gmt);
902
903   // agent
904   boost::scoped_ptr<TierAgentState> agent_state;
905
906   void agent_setup();       ///< initialize agent state
907   bool agent_work(int max) override ///< entry point to do some agent work
908   {
909     return agent_work(max, max);
910   }
911   bool agent_work(int max, int agent_flush_quota) override;
912   bool agent_maybe_flush(ObjectContextRef& obc);  ///< maybe flush
913   bool agent_maybe_evict(ObjectContextRef& obc, bool after_flush);  ///< maybe evict
914
915   void agent_load_hit_sets();  ///< load HitSets, if needed
916
917   /// estimate object atime and temperature
918   ///
919   /// @param oid [in] object name
920   /// @param temperature [out] relative temperature (# consider both access time and frequency)
921   void agent_estimate_temp(const hobject_t& oid, int *temperature);
922
923   /// stop the agent
924   void agent_stop() override;
925   void agent_delay() override;
926
927   /// clear agent state
928   void agent_clear() override;
929
930   /// choose (new) agent mode(s), returns true if op is requeued
931   bool agent_choose_mode(bool restart = false, OpRequestRef op = OpRequestRef());
932   void agent_choose_mode_restart() override;
933
934   /// true if we can send an ondisk/commit for v
935   bool already_complete(eversion_t v);
936   /// true if we can send an ack for v
937   bool already_ack(eversion_t v);
938
939   // projected object info
940   SharedLRU<hobject_t, ObjectContext> object_contexts;
941   // map from oid.snapdir() to SnapSetContext *
942   map<hobject_t, SnapSetContext*> snapset_contexts;
943   Mutex snapset_contexts_lock;
944
945   // debug order that client ops are applied
946   map<hobject_t, map<client_t, ceph_tid_t>> debug_op_order;
947
948   void populate_obc_watchers(ObjectContextRef obc);
949   void check_blacklisted_obc_watchers(ObjectContextRef obc);
950   void check_blacklisted_watchers() override;
951   void get_watchers(list<obj_watch_item_t> &pg_watchers) override;
952   void get_obc_watchers(ObjectContextRef obc, list<obj_watch_item_t> &pg_watchers);
953 public:
954   void handle_watch_timeout(WatchRef watch);
955 protected:
956
957   ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc);
958   ObjectContextRef get_object_context(
959     const hobject_t& soid,
960     bool can_create,
961     const map<string, bufferlist> *attrs = 0
962     );
963
964   void context_registry_on_change();
965   void object_context_destructor_callback(ObjectContext *obc);
966   class C_PG_ObjectContext;
967
968   int find_object_context(const hobject_t& oid,
969                           ObjectContextRef *pobc,
970                           bool can_create,
971                           bool map_snapid_to_clone=false,
972                           hobject_t *missing_oid=NULL);
973
974   void add_object_context_to_pg_stat(ObjectContextRef obc, pg_stat_t *stat);
975
976   void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc);
977
978   SnapSetContext *get_snapset_context(
979     const hobject_t& oid,
980     bool can_create,
981     const map<string, bufferlist> *attrs = 0,
982     bool oid_existed = true //indicate this oid whether exsited in backend
983     );
984   void register_snapset_context(SnapSetContext *ssc) {
985     Mutex::Locker l(snapset_contexts_lock);
986     _register_snapset_context(ssc);
987   }
988   void _register_snapset_context(SnapSetContext *ssc) {
989     assert(snapset_contexts_lock.is_locked());
990     if (!ssc->registered) {
991       assert(snapset_contexts.count(ssc->oid) == 0);
992       ssc->registered = true;
993       snapset_contexts[ssc->oid] = ssc;
994     }
995   }
996   void put_snapset_context(SnapSetContext *ssc);
997
998   map<hobject_t, ObjectContextRef> recovering;
999
1000   /*
1001    * Backfill
1002    *
1003    * peer_info[backfill_target].last_backfill == info.last_backfill on the peer.
1004    *
1005    * objects prior to peer_info[backfill_target].last_backfill
1006    *   - are on the peer
1007    *   - are included in the peer stats
1008    *
1009    * objects \in (last_backfill, last_backfill_started]
1010    *   - are on the peer or are in backfills_in_flight
1011    *   - are not included in pg stats (yet)
1012    *   - have their stats in pending_backfill_updates on the primary
1013    */
1014   set<hobject_t> backfills_in_flight;
1015   map<hobject_t, pg_stat_t> pending_backfill_updates;
1016
1017   void dump_recovery_info(Formatter *f) const override {
1018     f->open_array_section("backfill_targets");
1019     for (set<pg_shard_t>::const_iterator p = backfill_targets.begin();
1020         p != backfill_targets.end(); ++p)
1021       f->dump_stream("replica") << *p;
1022     f->close_section();
1023     f->open_array_section("waiting_on_backfill");
1024     for (set<pg_shard_t>::const_iterator p = waiting_on_backfill.begin();
1025         p != waiting_on_backfill.end(); ++p)
1026       f->dump_stream("osd") << *p;
1027     f->close_section();
1028     f->dump_stream("last_backfill_started") << last_backfill_started;
1029     {
1030       f->open_object_section("backfill_info");
1031       backfill_info.dump(f);
1032       f->close_section();
1033     }
1034     {
1035       f->open_array_section("peer_backfill_info");
1036       for (map<pg_shard_t, BackfillInterval>::const_iterator pbi =
1037              peer_backfill_info.begin();
1038           pbi != peer_backfill_info.end(); ++pbi) {
1039         f->dump_stream("osd") << pbi->first;
1040         f->open_object_section("BackfillInterval");
1041           pbi->second.dump(f);
1042         f->close_section();
1043       }
1044       f->close_section();
1045     }
1046     {
1047       f->open_array_section("backfills_in_flight");
1048       for (set<hobject_t>::const_iterator i = backfills_in_flight.begin();
1049            i != backfills_in_flight.end();
1050            ++i) {
1051         f->dump_stream("object") << *i;
1052       }
1053       f->close_section();
1054     }
1055     {
1056       f->open_array_section("recovering");
1057       for (map<hobject_t, ObjectContextRef>::const_iterator i = recovering.begin();
1058            i != recovering.end();
1059            ++i) {
1060         f->dump_stream("object") << i->first;
1061       }
1062       f->close_section();
1063     }
1064     {
1065       f->open_object_section("pg_backend");
1066       pgbackend->dump_recovery_info(f);
1067       f->close_section();
1068     }
1069   }
1070
1071   /// last backfill operation started
1072   hobject_t last_backfill_started;
1073   bool new_backfill;
1074
1075   int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
1076                                  PGBackend::RecoveryHandle *h);
1077   int prep_object_replica_deletes(const hobject_t& soid, eversion_t v,
1078                                   PGBackend::RecoveryHandle *h);
1079
1080   void finish_degraded_object(const hobject_t& oid);
1081
1082   // Cancels/resets pulls from peer
1083   void check_recovery_sources(const OSDMapRef& map) override ;
1084
1085   int recover_missing(
1086     const hobject_t& oid,
1087     eversion_t v,
1088     int priority,
1089     PGBackend::RecoveryHandle *h);
1090
1091   // low level ops
1092
1093   void _make_clone(
1094     OpContext *ctx,
1095     PGTransaction* t,
1096     ObjectContextRef obc,
1097     const hobject_t& head, const hobject_t& coid,
1098     object_info_t *poi);
1099   void execute_ctx(OpContext *ctx);
1100   void finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc=true);
1101   void reply_ctx(OpContext *ctx, int err);
1102   void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv);
1103   void make_writeable(OpContext *ctx);
1104   void log_op_stats(OpContext *ctx);
1105
1106   void write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi,
1107                                    interval_set<uint64_t>& modified, uint64_t offset,
1108                                    uint64_t length, bool write_full=false);
1109   void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st);
1110
1111
1112   enum class cache_result_t {
1113     NOOP,
1114     BLOCKED_FULL,
1115     BLOCKED_PROMOTE,
1116     HANDLED_PROXY,
1117     HANDLED_REDIRECT,
1118   };
1119   cache_result_t maybe_handle_cache_detail(OpRequestRef op,
1120                                            bool write_ordered,
1121                                            ObjectContextRef obc, int r,
1122                                            hobject_t missing_oid,
1123                                            bool must_promote,
1124                                            bool in_hit_set,
1125                                            ObjectContextRef *promote_obc);
1126   cache_result_t maybe_handle_manifest_detail(OpRequestRef op,
1127                                                      bool write_ordered,
1128                                                      ObjectContextRef obc);
1129   bool maybe_handle_manifest(OpRequestRef op,
1130                               bool write_ordered,
1131                               ObjectContextRef obc) {
1132     return cache_result_t::NOOP != maybe_handle_manifest_detail(
1133       op,
1134       write_ordered,
1135       obc);
1136   }
1137
1138   /**
1139    * This helper function is called from do_op if the ObjectContext lookup fails.
1140    * @returns true if the caching code is handling the Op, false otherwise.
1141    */
1142   bool maybe_handle_cache(OpRequestRef op,
1143                           bool write_ordered,
1144                           ObjectContextRef obc, int r,
1145                           const hobject_t& missing_oid,
1146                           bool must_promote,
1147                           bool in_hit_set = false) {
1148     return cache_result_t::NOOP != maybe_handle_cache_detail(
1149       op,
1150       write_ordered,
1151       obc,
1152       r,
1153       missing_oid,
1154       must_promote,
1155       in_hit_set,
1156       nullptr);
1157   }
1158
1159   /**
1160    * This helper function checks if a promotion is needed.
1161    */
1162   bool maybe_promote(ObjectContextRef obc,
1163                      const hobject_t& missing_oid,
1164                      const object_locator_t& oloc,
1165                      bool in_hit_set,
1166                      uint32_t recency,
1167                      OpRequestRef promote_op,
1168                      ObjectContextRef *promote_obc = nullptr);
1169   /**
1170    * This helper function tells the client to redirect their request elsewhere.
1171    */
1172   void do_cache_redirect(OpRequestRef op);
1173   /**
1174    * This function attempts to start a promote.  Either it succeeds,
1175    * or places op on a wait list.  If op is null, failure means that
1176    * this is a noop.  If a future user wants to be able to distinguish
1177    * these cases, a return value should be added.
1178    */
1179   void promote_object(
1180     ObjectContextRef obc,            ///< [optional] obc
1181     const hobject_t& missing_object, ///< oid (if !obc)
1182     const object_locator_t& oloc,    ///< locator for obc|oid
1183     OpRequestRef op,                 ///< [optional] client op
1184     ObjectContextRef *promote_obc = nullptr ///< [optional] new obc for object
1185     );
1186
1187   int prepare_transaction(OpContext *ctx);
1188   list<pair<OpRequestRef, OpContext*> > in_progress_async_reads;
1189   void complete_read_ctx(int result, OpContext *ctx);
1190   
1191   // pg on-disk content
1192   void check_local() override;
1193
1194   void _clear_recovery_state() override;
1195
1196   bool start_recovery_ops(
1197     uint64_t max,
1198     ThreadPool::TPHandle &handle, uint64_t *started) override;
1199
1200   uint64_t recover_primary(uint64_t max, ThreadPool::TPHandle &handle);
1201   uint64_t recover_replicas(uint64_t max, ThreadPool::TPHandle &handle);
1202   hobject_t earliest_peer_backfill() const;
1203   bool all_peer_done() const;
1204   /**
1205    * @param work_started will be set to true if recover_backfill got anywhere
1206    * @returns the number of operations started
1207    */
1208   uint64_t recover_backfill(uint64_t max, ThreadPool::TPHandle &handle,
1209                             bool *work_started);
1210
1211   /**
1212    * scan a (hash) range of objects in the current pg
1213    *
1214    * @begin first item should be >= this value
1215    * @min return at least this many items, unless we are done
1216    * @max return no more than this many items
1217    * @bi [out] resulting map of objects to eversion_t's
1218    */
1219   void scan_range(
1220     int min, int max, BackfillInterval *bi,
1221     ThreadPool::TPHandle &handle
1222     );
1223
1224   /// Update a hash range to reflect changes since the last scan
1225   void update_range(
1226     BackfillInterval *bi,        ///< [in,out] interval to update
1227     ThreadPool::TPHandle &handle ///< [in] tp handle
1228     );
1229
1230   int prep_backfill_object_push(
1231     hobject_t oid, eversion_t v, ObjectContextRef obc,
1232     vector<pg_shard_t> peers,
1233     PGBackend::RecoveryHandle *h);
1234   void send_remove_op(const hobject_t& oid, eversion_t v, pg_shard_t peer);
1235
1236
1237   class C_OSD_OndiskWriteUnlock;
1238   class C_OSD_AppliedRecoveredObject;
1239   class C_OSD_CommittedPushedObject;
1240   class C_OSD_AppliedRecoveredObjectReplica;
1241   void sub_op_remove(OpRequestRef op);
1242
1243   void _applied_recovered_object(ObjectContextRef obc);
1244   void _applied_recovered_object_replica();
1245   void _committed_pushed_object(epoch_t epoch, eversion_t lc);
1246   void recover_got(hobject_t oid, eversion_t v);
1247
1248   // -- copyfrom --
1249   map<hobject_t, CopyOpRef> copy_ops;
1250
1251   int do_copy_get(OpContext *ctx, bufferlist::iterator& bp, OSDOp& op,
1252                   ObjectContextRef& obc);
1253   int finish_copy_get();
1254
1255   void fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
1256                               OSDOp& osd_op);
1257
1258   /**
1259    * To copy an object, call start_copy.
1260    *
1261    * @param cb: The CopyCallback to be activated when the copy is complete
1262    * @param obc: The ObjectContext we are copying into
1263    * @param src: The source object
1264    * @param oloc: the source object locator
1265    * @param version: the version of the source object to copy (0 for any)
1266    */
1267   void start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
1268                   object_locator_t oloc, version_t version, unsigned flags,
1269                   bool mirror_snapset, unsigned src_obj_fadvise_flags,
1270                   unsigned dest_obj_fadvise_flags);
1271   void process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r);
1272   void _write_copy_chunk(CopyOpRef cop, PGTransaction *t);
1273   uint64_t get_copy_chunk_size() const {
1274     uint64_t size = cct->_conf->osd_copyfrom_max_chunk;
1275     if (pool.info.requires_aligned_append()) {
1276       uint64_t alignment = pool.info.required_alignment();
1277       if (size % alignment) {
1278         size += alignment - (size % alignment);
1279       }
1280     }
1281     return size;
1282   }
1283   void _copy_some(ObjectContextRef obc, CopyOpRef cop);
1284   void finish_copyfrom(CopyFromCallback *cb);
1285   void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
1286   void cancel_copy(CopyOpRef cop, bool requeue);
1287   void cancel_copy_ops(bool requeue);
1288
1289   friend struct C_Copyfrom;
1290
1291   // -- flush --
1292   map<hobject_t, FlushOpRef> flush_ops;
1293
1294   /// start_flush takes ownership of on_flush iff ret == -EINPROGRESS
1295   int start_flush(
1296     OpRequestRef op, ObjectContextRef obc,
1297     bool blocking, hobject_t *pmissing,
1298     boost::optional<std::function<void()>> &&on_flush);
1299   void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
1300   int try_flush_mark_clean(FlushOpRef fop);
1301   void cancel_flush(FlushOpRef fop, bool requeue);
1302   void cancel_flush_ops(bool requeue);
1303
1304   /// @return false if clone is has been evicted
1305   bool is_present_clone(hobject_t coid);
1306
1307   friend struct C_Flush;
1308
1309   // -- scrub --
1310   bool _range_available_for_scrub(
1311     const hobject_t &begin, const hobject_t &end) override;
1312   void scrub_snapshot_metadata(
1313     ScrubMap &map,
1314     const std::map<hobject_t, pair<uint32_t, uint32_t>> &missing_digest) override;
1315   void _scrub_clear_state() override;
1316   void _scrub_finish() override;
1317   object_stat_collection_t scrub_cstat;
1318
1319   void _split_into(pg_t child_pgid, PG *child,
1320                    unsigned split_bits) override;
1321   void apply_and_flush_repops(bool requeue);
1322
1323   void calc_trim_to() override;
1324   int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
1325   int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);
1326
1327   // -- checksum --
1328   int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::iterator *bl_it);
1329   int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type,
1330                       bufferlist::iterator *init_value_bl_it,
1331                       const bufferlist &read_bl);
1332
1333   friend class C_ChecksumRead;
1334
1335   int do_extent_cmp(OpContext *ctx, OSDOp& osd_op);
1336   int finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl);
1337
1338   friend class C_ExtentCmpRead;
1339
1340   int do_read(OpContext *ctx, OSDOp& osd_op);
1341   int do_sparse_read(OpContext *ctx, OSDOp& osd_op);
1342   int do_writesame(OpContext *ctx, OSDOp& osd_op);
1343
1344   bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
1345   int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);
1346
1347   map<hobject_t, list<OpRequestRef>> in_progress_proxy_ops;
1348   void kick_proxy_ops_blocked(hobject_t& soid);
1349   void cancel_proxy_ops(bool requeue);
1350
1351   // -- proxyread --
1352   map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
1353
1354   void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
1355   void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
1356   void cancel_proxy_read(ProxyReadOpRef prdop);
1357
1358   friend struct C_ProxyRead;
1359
1360   // -- proxywrite --
1361   map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;
1362
1363   void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc = NULL);
1364   void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
1365   void cancel_proxy_write(ProxyWriteOpRef pwop);
1366
1367   friend struct C_ProxyWrite_Commit;
1368
1369 public:
1370   PrimaryLogPG(OSDService *o, OSDMapRef curmap,
1371                const PGPool &_pool, spg_t p);
1372   ~PrimaryLogPG() override {}
1373
1374   int do_command(
1375     cmdmap_t cmdmap,
1376     ostream& ss,
1377     bufferlist& idata,
1378     bufferlist& odata,
1379     ConnectionRef conn,
1380     ceph_tid_t tid) override;
1381
1382   void do_request(
1383     OpRequestRef& op,
1384     ThreadPool::TPHandle &handle) override;
1385   void do_op(OpRequestRef& op) override;
1386   void record_write_error(OpRequestRef op, const hobject_t &soid,
1387                           MOSDOpReply *orig_reply, int r);
1388   void do_pg_op(OpRequestRef op);
1389   void do_sub_op(OpRequestRef op) override;
1390   void do_sub_op_reply(OpRequestRef op) override;
1391   void do_scan(
1392     OpRequestRef op,
1393     ThreadPool::TPHandle &handle) override;
1394   void do_backfill(OpRequestRef op) override;
1395   void do_backfill_remove(OpRequestRef op);
1396
1397   void handle_backoff(OpRequestRef& op);
1398
1399   int trim_object(bool first, const hobject_t &coid, OpContextUPtr *ctxp);
1400   void snap_trimmer(epoch_t e) override;
1401   void kick_snap_trim() override;
1402   void snap_trimmer_scrub_complete() override;
1403   int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
1404
1405   int _get_tmap(OpContext *ctx, bufferlist *header, bufferlist *vals);
1406   int do_tmap2omap(OpContext *ctx, unsigned flags);
1407   int do_tmapup(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op);
1408   int do_tmapup_slow(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op, bufferlist& bl);
1409
1410   void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
1411 private:
1412   int do_scrub_ls(MOSDOp *op, OSDOp *osd_op);
1413   hobject_t earliest_backfill() const;
1414   bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;
1415
1416   uint64_t temp_seq; ///< last id for naming temp objects
1417   /// generate a new temp object name
1418   hobject_t generate_temp_object(const hobject_t& target);
1419   /// generate a new temp object name (for recovery)
1420   hobject_t get_temp_recovery_object(const hobject_t& target,
1421                                      eversion_t version) override;
1422   int get_recovery_op_priority() const {
1423       int pri = 0;
1424       pool.info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
1425       return  pri > 0 ? pri : cct->_conf->osd_recovery_op_priority;
1426   }
1427   void log_missing(unsigned missing,
1428                         const boost::optional<hobject_t> &head,
1429                         LogChannelRef clog,
1430                         const spg_t &pgid,
1431                         const char *func,
1432                         const char *mode,
1433                         bool allow_incomplete_clones);
1434   unsigned process_clones_to(const boost::optional<hobject_t> &head,
1435     const boost::optional<SnapSet> &snapset,
1436     LogChannelRef clog,
1437     const spg_t &pgid,
1438     const char *mode,
1439     bool allow_incomplete_clones,
1440     boost::optional<snapid_t> target,
1441     vector<snapid_t>::reverse_iterator *curclone,
1442     inconsistent_snapset_wrapper &snap_error);
1443
1444 public:
1445   coll_t get_coll() {
1446     return coll;
1447   }
1448   void split_colls(
1449     spg_t child,
1450     int split_bits,
1451     int seed,
1452     const pg_pool_t *pool,
1453     ObjectStore::Transaction *t) override {
1454     coll_t target = coll_t(child);
1455     PG::_create(*t, child, split_bits);
1456     t->split_collection(
1457       coll,
1458       split_bits,
1459       seed,
1460       target);
1461     PG::_init(*t, child, pool);
1462   }
1463 private:
1464
1465   struct DoSnapWork : boost::statechart::event< DoSnapWork > {
1466     DoSnapWork() : boost::statechart::event < DoSnapWork >() {}
1467   };
1468   struct KickTrim : boost::statechart::event< KickTrim > {
1469     KickTrim() : boost::statechart::event < KickTrim >() {}
1470   };
1471   struct RepopsComplete : boost::statechart::event< RepopsComplete > {
1472     RepopsComplete() : boost::statechart::event < RepopsComplete >() {}
1473   };
1474   struct ScrubComplete : boost::statechart::event< ScrubComplete > {
1475     ScrubComplete() : boost::statechart::event < ScrubComplete >() {}
1476   };
1477   struct TrimWriteUnblocked : boost::statechart::event< TrimWriteUnblocked > {
1478     TrimWriteUnblocked() : boost::statechart::event < TrimWriteUnblocked >() {}
1479   };
1480   struct Reset : boost::statechart::event< Reset > {
1481     Reset() : boost::statechart::event< Reset >() {}
1482   };
1483   struct SnapTrimReserved : boost::statechart::event< SnapTrimReserved > {
1484     SnapTrimReserved() : boost::statechart::event< SnapTrimReserved >() {}
1485   };
1486   struct SnapTrimTimerReady : boost::statechart::event< SnapTrimTimerReady > {
1487     SnapTrimTimerReady() : boost::statechart::event< SnapTrimTimerReady >() {}
1488   };
1489
1490   struct NotTrimming;
1491   struct SnapTrimmer : public boost::statechart::state_machine< SnapTrimmer, NotTrimming > {
1492     PrimaryLogPG *pg;
1493     explicit SnapTrimmer(PrimaryLogPG *pg) : pg(pg) {}
1494     void log_enter(const char *state_name);
1495     void log_exit(const char *state_name, utime_t duration);
1496     bool can_trim() {
1497       return pg->is_clean() && !pg->scrubber.active && !pg->snap_trimq.empty();
1498     }
1499   } snap_trimmer_machine;
1500
1501   struct WaitReservation;
1502   struct Trimming : boost::statechart::state< Trimming, SnapTrimmer, WaitReservation >, NamedState {
1503     typedef boost::mpl::list <
1504       boost::statechart::custom_reaction< KickTrim >,
1505       boost::statechart::transition< Reset, NotTrimming >
1506       > reactions;
1507
1508     set<hobject_t> in_flight;
1509     snapid_t snap_to_trim;
1510
1511     explicit Trimming(my_context ctx)
1512       : my_base(ctx),
1513         NamedState(context< SnapTrimmer >().pg, "Trimming") {
1514       context< SnapTrimmer >().log_enter(state_name);
1515       assert(context< SnapTrimmer >().can_trim());
1516       assert(in_flight.empty());
1517     }
1518     void exit() {
1519       context< SnapTrimmer >().log_exit(state_name, enter_time);
1520       auto *pg = context< SnapTrimmer >().pg;
1521       pg->osd->snap_reserver.cancel_reservation(pg->get_pgid());
1522       pg->state_clear(PG_STATE_SNAPTRIM);
1523       pg->publish_stats_to_osd();
1524     }
1525     boost::statechart::result react(const KickTrim&) {
1526       return discard_event();
1527     }
1528   };
1529
1530   /* SnapTrimmerStates */
1531   struct WaitTrimTimer : boost::statechart::state< WaitTrimTimer, Trimming >, NamedState {
1532     typedef boost::mpl::list <
1533       boost::statechart::custom_reaction< SnapTrimTimerReady >
1534       > reactions;
1535     Context *wakeup = nullptr;
1536     explicit WaitTrimTimer(my_context ctx)
1537       : my_base(ctx),
1538         NamedState(context< SnapTrimmer >().pg, "Trimming/WaitTrimTimer") {
1539       context< SnapTrimmer >().log_enter(state_name);
1540       assert(context<Trimming>().in_flight.empty());
1541       struct OnTimer : Context {
1542         PrimaryLogPGRef pg;
1543         epoch_t epoch;
1544         OnTimer(PrimaryLogPGRef pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
1545         void finish(int) override {
1546           pg->lock();
1547           if (!pg->pg_has_reset_since(epoch))
1548             pg->snap_trimmer_machine.process_event(SnapTrimTimerReady());
1549           pg->unlock();
1550         }
1551       };
1552       auto *pg = context< SnapTrimmer >().pg;
1553       if (pg->cct->_conf->osd_snap_trim_sleep > 0) {
1554         Mutex::Locker l(pg->osd->snap_sleep_lock);
1555         wakeup = pg->osd->snap_sleep_timer.add_event_after(
1556           pg->cct->_conf->osd_snap_trim_sleep,
1557           new OnTimer{pg, pg->get_osdmap()->get_epoch()});
1558       } else {
1559         post_event(SnapTrimTimerReady());
1560       }
1561     }
1562     void exit() {
1563       context< SnapTrimmer >().log_exit(state_name, enter_time);
1564       auto *pg = context< SnapTrimmer >().pg;
1565       if (wakeup) {
1566         Mutex::Locker l(pg->osd->snap_sleep_lock);
1567         pg->osd->snap_sleep_timer.cancel_event(wakeup);
1568         wakeup = nullptr;
1569       }
1570     }
1571     boost::statechart::result react(const SnapTrimTimerReady &) {
1572       wakeup = nullptr;
1573       if (!context< SnapTrimmer >().can_trim()) {
1574         post_event(KickTrim());
1575         return transit< NotTrimming >();
1576       } else {
1577         return transit< AwaitAsyncWork >();
1578       }
1579     }
1580   };
1581
1582   struct WaitRWLock : boost::statechart::state< WaitRWLock, Trimming >, NamedState {
1583     typedef boost::mpl::list <
1584       boost::statechart::custom_reaction< TrimWriteUnblocked >
1585       > reactions;
1586     explicit WaitRWLock(my_context ctx)
1587       : my_base(ctx),
1588         NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRWLock") {
1589       context< SnapTrimmer >().log_enter(state_name);
1590       assert(context<Trimming>().in_flight.empty());
1591     }
1592     void exit() {
1593       context< SnapTrimmer >().log_exit(state_name, enter_time);
1594     }
1595     boost::statechart::result react(const TrimWriteUnblocked&) {
1596       if (!context< SnapTrimmer >().can_trim()) {
1597         post_event(KickTrim());
1598         return transit< NotTrimming >();
1599       } else {
1600         return transit< AwaitAsyncWork >();
1601       }
1602     }
1603   };
1604
1605   struct WaitRepops : boost::statechart::state< WaitRepops, Trimming >, NamedState {
1606     typedef boost::mpl::list <
1607       boost::statechart::custom_reaction< RepopsComplete >
1608       > reactions;
1609     explicit WaitRepops(my_context ctx)
1610       : my_base(ctx),
1611         NamedState(context< SnapTrimmer >().pg, "Trimming/WaitRepops") {
1612       context< SnapTrimmer >().log_enter(state_name);
1613       assert(!context<Trimming>().in_flight.empty());
1614     }
1615     void exit() {
1616       context< SnapTrimmer >().log_exit(state_name, enter_time);
1617     }
1618     boost::statechart::result react(const RepopsComplete&) {
1619       if (!context< SnapTrimmer >().can_trim()) {
1620         post_event(KickTrim());
1621         return transit< NotTrimming >();
1622       } else {
1623         return transit< WaitTrimTimer >();
1624       }
1625     }
1626   };
1627
1628   struct AwaitAsyncWork : boost::statechart::state< AwaitAsyncWork, Trimming >, NamedState {
1629     typedef boost::mpl::list <
1630       boost::statechart::custom_reaction< DoSnapWork >
1631       > reactions;
1632     explicit AwaitAsyncWork(my_context ctx);
1633     void exit() {
1634       context< SnapTrimmer >().log_exit(state_name, enter_time);
1635     }
1636     boost::statechart::result react(const DoSnapWork&);
1637   };
1638
1639   struct WaitReservation : boost::statechart::state< WaitReservation, Trimming >, NamedState {
1640     /* WaitReservation is a sub-state of trimming simply so that exiting Trimming
1641      * always cancels the reservation */
1642     typedef boost::mpl::list <
1643       boost::statechart::custom_reaction< SnapTrimReserved >
1644       > reactions;
1645     struct ReservationCB : public Context {
1646       PrimaryLogPGRef pg;
1647       bool canceled;
1648       ReservationCB(PrimaryLogPG *pg) : pg(pg), canceled(false) {}
1649       void finish(int) override {
1650         pg->lock();
1651         if (!canceled)
1652           pg->snap_trimmer_machine.process_event(SnapTrimReserved());
1653         pg->unlock();
1654       }
1655       void cancel() {
1656         assert(pg->is_locked());
1657         assert(!canceled);
1658         canceled = true;
1659       }
1660     };
1661     ReservationCB *pending = nullptr;
1662
1663     explicit WaitReservation(my_context ctx)
1664       : my_base(ctx),
1665         NamedState(context< SnapTrimmer >().pg, "Trimming/WaitReservation") {
1666       context< SnapTrimmer >().log_enter(state_name);
1667       assert(context<Trimming>().in_flight.empty());
1668       auto *pg = context< SnapTrimmer >().pg;
1669       pending = new ReservationCB(pg);
1670       pg->osd->snap_reserver.request_reservation(
1671         pg->get_pgid(),
1672         pending,
1673         0);
1674       pg->state_set(PG_STATE_SNAPTRIM_WAIT);
1675       pg->publish_stats_to_osd();
1676     }
1677     boost::statechart::result react(const SnapTrimReserved&);
1678     void exit() {
1679       context< SnapTrimmer >().log_exit(state_name, enter_time);
1680       if (pending)
1681         pending->cancel();
1682       pending = nullptr;
1683       auto *pg = context< SnapTrimmer >().pg;
1684       pg->state_clear(PG_STATE_SNAPTRIM_WAIT);
1685       pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
1686       pg->publish_stats_to_osd();
1687     }
1688   };
1689
1690   struct WaitScrub : boost::statechart::state< WaitScrub, SnapTrimmer >, NamedState {
1691     typedef boost::mpl::list <
1692       boost::statechart::custom_reaction< ScrubComplete >,
1693       boost::statechart::custom_reaction< KickTrim >,
1694       boost::statechart::transition< Reset, NotTrimming >
1695       > reactions;
1696     explicit WaitScrub(my_context ctx)
1697       : my_base(ctx),
1698         NamedState(context< SnapTrimmer >().pg, "Trimming/WaitScrub") {
1699       context< SnapTrimmer >().log_enter(state_name);
1700     }
1701     void exit() {
1702       context< SnapTrimmer >().log_exit(state_name, enter_time);
1703     }
1704     boost::statechart::result react(const ScrubComplete&) {
1705       post_event(KickTrim());
1706       return transit< NotTrimming >();
1707     }
1708     boost::statechart::result react(const KickTrim&) {
1709       return discard_event();
1710     }
1711   };
1712
1713   struct NotTrimming : boost::statechart::state< NotTrimming, SnapTrimmer >, NamedState {
1714     typedef boost::mpl::list <
1715       boost::statechart::custom_reaction< KickTrim >,
1716       boost::statechart::transition< Reset, NotTrimming >
1717       > reactions;
1718     explicit NotTrimming(my_context ctx);
1719     void exit();
1720     boost::statechart::result react(const KickTrim&);
1721   };
1722
1723   int _verify_no_head_clones(const hobject_t& soid,
1724                              const SnapSet& ss);
1725   // return true if we're creating a local object, false for a
1726   // whiteout or no change.
1727   void maybe_create_new_object(OpContext *ctx, bool ignore_transaction=false);
1728   int _delete_oid(OpContext *ctx, bool no_whiteout, bool try_no_whiteout);
1729   int _rollback_to(OpContext *ctx, ceph_osd_op& op);
1730 public:
1731   bool is_missing_object(const hobject_t& oid) const;
1732   bool is_unreadable_object(const hobject_t &oid) const {
1733     return is_missing_object(oid) ||
1734       !missing_loc.readable_with_acting(oid, actingset);
1735   }
1736   void maybe_kick_recovery(const hobject_t &soid);
1737   void wait_for_unreadable_object(const hobject_t& oid, OpRequestRef op);
1738   void wait_for_all_missing(OpRequestRef op);
1739
1740   bool is_degraded_or_backfilling_object(const hobject_t& oid);
1741   void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
1742
1743   void block_write_on_full_cache(
1744     const hobject_t& oid, OpRequestRef op);
1745   void block_for_clean(
1746     const hobject_t& oid, OpRequestRef op);
1747   void block_write_on_snap_rollback(
1748     const hobject_t& oid, ObjectContextRef obc, OpRequestRef op);
1749   void block_write_on_degraded_snap(const hobject_t& oid, OpRequestRef op);
1750
1751   bool maybe_await_blocked_snapset(const hobject_t &soid, OpRequestRef op);
1752   void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
1753   void kick_object_context_blocked(ObjectContextRef obc);
1754
1755   void maybe_force_recovery();
1756
1757   void mark_all_unfound_lost(
1758     int what,
1759     ConnectionRef con,
1760     ceph_tid_t tid);
1761   eversion_t pick_newest_available(const hobject_t& oid);
1762
1763   void do_update_log_missing(
1764     OpRequestRef &op);
1765
1766   void do_update_log_missing_reply(
1767     OpRequestRef &op);
1768
1769   void on_role_change() override;
1770   void on_pool_change() override;
1771   void _on_new_interval() override;
1772   void clear_async_reads();
1773   void on_change(ObjectStore::Transaction *t) override;
1774   void on_activate() override;
1775   void on_flushed() override;
1776   void on_removal(ObjectStore::Transaction *t) override;
1777   void on_shutdown() override;
1778   bool check_failsafe_full(ostream &ss) override;
1779   bool check_osdmap_full(const set<pg_shard_t> &missing_on) override;
1780   int rep_repair_primary_object(const hobject_t& soid, OpRequestRef op);
1781
1782   // attr cache handling
1783   void setattr_maybe_cache(
1784     ObjectContextRef obc,
1785     OpContext *op,
1786     PGTransaction *t,
1787     const string &key,
1788     bufferlist &val);
1789   void setattrs_maybe_cache(
1790     ObjectContextRef obc,
1791     OpContext *op,
1792     PGTransaction *t,
1793     map<string, bufferlist> &attrs);
1794   void rmattr_maybe_cache(
1795     ObjectContextRef obc,
1796     OpContext *op,
1797     PGTransaction *t,
1798     const string &key);
1799   int getattr_maybe_cache(
1800     ObjectContextRef obc,
1801     const string &key,
1802     bufferlist *val);
1803   int getattrs_maybe_cache(
1804     ObjectContextRef obc,
1805     map<string, bufferlist> *out,
1806     bool user_only = false);
1807 };
1808
1809 inline ostream& operator<<(ostream& out, const PrimaryLogPG::RepGather& repop)
1810 {
1811   out << "repgather(" << &repop
1812       << " " << repop.v
1813       << " rep_tid=" << repop.rep_tid 
1814       << " committed?=" << repop.all_committed
1815       << " applied?=" << repop.all_applied
1816       << " r=" << repop.r
1817       << ")";
1818   return out;
1819 }
1820
1821 inline ostream& operator<<(ostream& out,
1822                            const PrimaryLogPG::ProxyWriteOpRef& pwop)
1823 {
1824   out << "proxywrite(" << &pwop
1825       << " " << pwop->user_version
1826       << " pwop_tid=" << pwop->objecter_tid;
1827   if (pwop->ctx->op)
1828     out << " op=" << *(pwop->ctx->op->get_req());
1829   out << ")";
1830   return out;
1831 }
1832
1833 void intrusive_ptr_add_ref(PrimaryLogPG::RepGather *repop);
1834 void intrusive_ptr_release(PrimaryLogPG::RepGather *repop);
1835
1836
1837 #endif