Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / OSD.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef CEPH_OSD_H
16 #define CEPH_OSD_H
17
18 #include "PG.h"
19
20 #include "msg/Dispatcher.h"
21
22 #include "common/Mutex.h"
23 #include "common/RWLock.h"
24 #include "common/Timer.h"
25 #include "common/WorkQueue.h"
26 #include "common/AsyncReserver.h"
27 #include "common/ceph_context.h"
28 #include "common/zipkin_trace.h"
29
30 #include "mgr/MgrClient.h"
31
32 #include "os/ObjectStore.h"
33 #include "OSDCap.h" 
34  
35 #include "auth/KeyRing.h"
36 #include "osd/ClassHandler.h"
37
38 #include "include/CompatSet.h"
39
40 #include "OpRequest.h"
41 #include "Session.h"
42
43 #include "osd/PGQueueable.h"
44
45 #include <atomic>
46 #include <map>
47 #include <memory>
48 #include "include/memory.h"
49 using namespace std;
50
51 #include "include/unordered_map.h"
52
53 #include "common/shared_cache.hpp"
54 #include "common/simple_cache.hpp"
55 #include "common/sharedptr_registry.hpp"
56 #include "common/WeightedPriorityQueue.h"
57 #include "common/PrioritizedQueue.h"
58 #include "osd/mClockOpClassQueue.h"
59 #include "osd/mClockClientQueue.h"
60 #include "messages/MOSDOp.h"
61 #include "include/Spinlock.h"
62 #include "common/EventTrace.h"
63
64 #define CEPH_OSD_PROTOCOL    10 /* cluster internal */
65
66
67 enum {
68   l_osd_first = 10000,
69   l_osd_op_wip,
70   l_osd_op,
71   l_osd_op_inb,
72   l_osd_op_outb,
73   l_osd_op_lat,
74   l_osd_op_process_lat,
75   l_osd_op_prepare_lat,
76   l_osd_op_r,
77   l_osd_op_r_outb,
78   l_osd_op_r_lat,
79   l_osd_op_r_lat_outb_hist,
80   l_osd_op_r_process_lat,
81   l_osd_op_r_prepare_lat,
82   l_osd_op_w,
83   l_osd_op_w_inb,
84   l_osd_op_w_lat,
85   l_osd_op_w_lat_inb_hist,
86   l_osd_op_w_process_lat,
87   l_osd_op_w_prepare_lat,
88   l_osd_op_rw,
89   l_osd_op_rw_inb,
90   l_osd_op_rw_outb,
91   l_osd_op_rw_lat,
92   l_osd_op_rw_lat_inb_hist,
93   l_osd_op_rw_lat_outb_hist,
94   l_osd_op_rw_process_lat,
95   l_osd_op_rw_prepare_lat,
96
97   l_osd_op_before_queue_op_lat,
98   l_osd_op_before_dequeue_op_lat,
99
100   l_osd_sop,
101   l_osd_sop_inb,
102   l_osd_sop_lat,
103   l_osd_sop_w,
104   l_osd_sop_w_inb,
105   l_osd_sop_w_lat,
106   l_osd_sop_pull,
107   l_osd_sop_pull_lat,
108   l_osd_sop_push,
109   l_osd_sop_push_inb,
110   l_osd_sop_push_lat,
111
112   l_osd_pull,
113   l_osd_push,
114   l_osd_push_outb,
115
116   l_osd_rop,
117
118   l_osd_loadavg,
119   l_osd_buf,
120   l_osd_history_alloc_bytes,
121   l_osd_history_alloc_num,
122   l_osd_cached_crc,
123   l_osd_cached_crc_adjusted,
124   l_osd_missed_crc,
125
126   l_osd_pg,
127   l_osd_pg_primary,
128   l_osd_pg_replica,
129   l_osd_pg_stray,
130   l_osd_hb_to,
131   l_osd_map,
132   l_osd_mape,
133   l_osd_mape_dup,
134
135   l_osd_waiting_for_map,
136
137   l_osd_map_cache_hit,
138   l_osd_map_cache_miss,
139   l_osd_map_cache_miss_low,
140   l_osd_map_cache_miss_low_avg,
141   l_osd_map_bl_cache_hit,
142   l_osd_map_bl_cache_miss,
143
144   l_osd_stat_bytes,
145   l_osd_stat_bytes_used,
146   l_osd_stat_bytes_avail,
147
148   l_osd_copyfrom,
149
150   l_osd_tier_promote,
151   l_osd_tier_flush,
152   l_osd_tier_flush_fail,
153   l_osd_tier_try_flush,
154   l_osd_tier_try_flush_fail,
155   l_osd_tier_evict,
156   l_osd_tier_whiteout,
157   l_osd_tier_dirty,
158   l_osd_tier_clean,
159   l_osd_tier_delay,
160   l_osd_tier_proxy_read,
161   l_osd_tier_proxy_write,
162
163   l_osd_agent_wake,
164   l_osd_agent_skip,
165   l_osd_agent_flush,
166   l_osd_agent_evict,
167
168   l_osd_object_ctx_cache_hit,
169   l_osd_object_ctx_cache_total,
170
171   l_osd_op_cache_hit,
172   l_osd_tier_flush_lat,
173   l_osd_tier_promote_lat,
174   l_osd_tier_r_lat,
175
176   l_osd_pg_info,
177   l_osd_pg_fastinfo,
178   l_osd_pg_biginfo,
179
180   l_osd_last,
181 };
182
183 // RecoveryState perf counters
184 enum {
185   rs_first = 20000,
186   rs_initial_latency,
187   rs_started_latency,
188   rs_reset_latency,
189   rs_start_latency,
190   rs_primary_latency,
191   rs_peering_latency,
192   rs_backfilling_latency,
193   rs_waitremotebackfillreserved_latency,
194   rs_waitlocalbackfillreserved_latency,
195   rs_notbackfilling_latency,
196   rs_repnotrecovering_latency,
197   rs_repwaitrecoveryreserved_latency,
198   rs_repwaitbackfillreserved_latency,
199   rs_reprecovering_latency,
200   rs_activating_latency,
201   rs_waitlocalrecoveryreserved_latency,
202   rs_waitremoterecoveryreserved_latency,
203   rs_recovering_latency,
204   rs_recovered_latency,
205   rs_clean_latency,
206   rs_active_latency,
207   rs_replicaactive_latency,
208   rs_stray_latency,
209   rs_getinfo_latency,
210   rs_getlog_latency,
211   rs_waitactingchange_latency,
212   rs_incomplete_latency,
213   rs_down_latency,
214   rs_getmissing_latency,
215   rs_waitupthru_latency,
216   rs_notrecovering_latency,
217   rs_last,
218 };
219
220 class Messenger;
221 class Message;
222 class MonClient;
223 class PerfCounters;
224 class ObjectStore;
225 class FuseStore;
226 class OSDMap;
227 class MLog;
228 class Objecter;
229
230 class Watch;
231 class PrimaryLogPG;
232
233 class AuthAuthorizeHandlerRegistry;
234
235 class TestOpsSocketHook;
236 struct C_CompleteSplits;
237 struct C_OpenPGs;
238 class LogChannel;
239 class CephContext;
240 typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
241 class MOSDOp;
242
243 class DeletingState {
244   Mutex lock;
245   Cond cond;
246   enum {
247     QUEUED,
248     CLEARING_DIR,
249     CLEARING_WAITING,
250     DELETING_DIR,
251     DELETED_DIR,
252     CANCELED,
253   } status;
254   bool stop_deleting;
255 public:
256   const spg_t pgid;
257   const PGRef old_pg_state;
258   explicit DeletingState(const pair<spg_t, PGRef> &in) :
259     lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
260     pgid(in.first), old_pg_state(in.second) {
261     }
262
263   /// transition status to CLEARING_WAITING
264   bool pause_clearing() {
265     Mutex::Locker l(lock);
266     assert(status == CLEARING_DIR);
267     if (stop_deleting) {
268       status = CANCELED;
269       cond.Signal();
270       return false;
271     }
272     status = CLEARING_WAITING;
273     return true;
274   } ///< @return false if we should cancel deletion
275
276   /// start or resume the clearing - transition the status to CLEARING_DIR
277   bool start_or_resume_clearing() {
278     Mutex::Locker l(lock);
279     assert(
280       status == QUEUED ||
281       status == DELETED_DIR ||
282       status == CLEARING_WAITING);
283     if (stop_deleting) {
284       status = CANCELED;
285       cond.Signal();
286       return false;
287     }
288     status = CLEARING_DIR;
289     return true;
290   } ///< @return false if we should cancel the deletion
291
292   /// transition status to CLEARING_DIR
293   bool resume_clearing() {
294     Mutex::Locker l(lock);
295     assert(status == CLEARING_WAITING);
296     if (stop_deleting) {
297       status = CANCELED;
298       cond.Signal();
299       return false;
300     }
301     status = CLEARING_DIR;
302     return true;
303   } ///< @return false if we should cancel deletion
304
305   /// transition status to deleting
306   bool start_deleting() {
307     Mutex::Locker l(lock);
308     assert(status == CLEARING_DIR);
309     if (stop_deleting) {
310       status = CANCELED;
311       cond.Signal();
312       return false;
313     }
314     status = DELETING_DIR;
315     return true;
316   } ///< @return false if we should cancel deletion
317
318   /// signal collection removal queued
319   void finish_deleting() {
320     Mutex::Locker l(lock);
321     assert(status == DELETING_DIR);
322     status = DELETED_DIR;
323     cond.Signal();
324   }
325
326   /// try to halt the deletion
327   bool try_stop_deletion() {
328     Mutex::Locker l(lock);
329     stop_deleting = true;
330     /**
331      * If we are in DELETING_DIR or CLEARING_DIR, there are in progress
332      * operations we have to wait for before continuing on.  States
333      * CLEARING_WAITING and QUEUED indicate that the remover will check
334      * stop_deleting before queueing any further operations.  CANCELED
335      * indicates that the remover has already halted.  DELETED_DIR
336      * indicates that the deletion has been fully queued.
337      */
338     while (status == DELETING_DIR || status == CLEARING_DIR)
339       cond.Wait(lock);
340     return status != DELETED_DIR;
341   } ///< @return true if we don't need to recreate the collection
342 };
343 typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
344
345 class OSD;
346
347 class OSDService {
348 public:
349   OSD *osd;
350   CephContext *cct;
351   SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
352   ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
353   SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
354   const int whoami;
355   ObjectStore *&store;
356   LogClient &log_client;
357   LogChannelRef clog;
358   PGRecoveryStats &pg_recovery_stats;
359 private:
360   Messenger *&cluster_messenger;
361   Messenger *&client_messenger;
362 public:
363   PerfCounters *&logger;
364   PerfCounters *&recoverystate_perf;
365   MonClient   *&monc;
366   ThreadPool::BatchWorkQueue<PG> &peering_wq;
367   GenContextWQ recovery_gen_wq;
368   ClassHandler  *&class_handler;
369
370   void enqueue_back(spg_t pgid, PGQueueable qi);
371   void enqueue_front(spg_t pgid, PGQueueable qi);
372
373   void maybe_inject_dispatch_delay() {
374     if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
375       if (rand() % 10000 <
376           g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
377         utime_t t;
378         t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
379         t.sleep();
380       }
381     }
382   }
383
384 private:
385   // -- map epoch lower bound --
386   Mutex pg_epoch_lock;
387   multiset<epoch_t> pg_epochs;
388   map<spg_t,epoch_t> pg_epoch;
389
390 public:
391   void pg_add_epoch(spg_t pgid, epoch_t epoch) {
392     Mutex::Locker l(pg_epoch_lock);
393     map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
394     assert(t == pg_epoch.end());
395     pg_epoch[pgid] = epoch;
396     pg_epochs.insert(epoch);
397   }
398   void pg_update_epoch(spg_t pgid, epoch_t epoch) {
399     Mutex::Locker l(pg_epoch_lock);
400     map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
401     assert(t != pg_epoch.end());
402     pg_epochs.erase(pg_epochs.find(t->second));
403     t->second = epoch;
404     pg_epochs.insert(epoch);
405   }
406   void pg_remove_epoch(spg_t pgid) {
407     Mutex::Locker l(pg_epoch_lock);
408     map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
409     if (t != pg_epoch.end()) {
410       pg_epochs.erase(pg_epochs.find(t->second));
411       pg_epoch.erase(t);
412     }
413   }
414   epoch_t get_min_pg_epoch() {
415     Mutex::Locker l(pg_epoch_lock);
416     if (pg_epochs.empty())
417       return 0;
418     else
419       return *pg_epochs.begin();
420   }
421
422 private:
423   // -- superblock --
424   Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
425   OSDSuperblock superblock;
426
427 public:
428   OSDSuperblock get_superblock() {
429     Mutex::Locker l(publish_lock);
430     return superblock;
431   }
432   void publish_superblock(const OSDSuperblock &block) {
433     Mutex::Locker l(publish_lock);
434     superblock = block;
435   }
436
437   int get_nodeid() const { return whoami; }
438
439   std::atomic<epoch_t> max_oldest_map;
440 private:
441   OSDMapRef osdmap;
442
443 public:
444   OSDMapRef get_osdmap() {
445     Mutex::Locker l(publish_lock);
446     return osdmap;
447   }
448   epoch_t get_osdmap_epoch() {
449     Mutex::Locker l(publish_lock);
450     return osdmap ? osdmap->get_epoch() : 0;
451   }
452   void publish_map(OSDMapRef map) {
453     Mutex::Locker l(publish_lock);
454     osdmap = map;
455   }
456
457   /*
458    * osdmap - current published map
459    * next_osdmap - pre_published map that is about to be published.
460    *
461    * We use the next_osdmap to send messages and initiate connections,
462    * but only if the target is the same instance as the one in the map
463    * epoch the current user is working from (i.e., the result is
464    * equivalent to what is in next_osdmap).
465    *
466    * This allows the helpers to start ignoring osds that are about to
467    * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
468    * down, without worrying about reopening connections from threads
469    * working from old maps.
470    */
471 private:
472   OSDMapRef next_osdmap;
473   Cond pre_publish_cond;
474
475 public:
476   void pre_publish_map(OSDMapRef map) {
477     Mutex::Locker l(pre_publish_lock);
478     next_osdmap = std::move(map);
479   }
480
481   void activate_map();
482   /// map epochs reserved below
483   map<epoch_t, unsigned> map_reservations;
484
485   /// gets ref to next_osdmap and registers the epoch as reserved
486   OSDMapRef get_nextmap_reserved() {
487     Mutex::Locker l(pre_publish_lock);
488     if (!next_osdmap)
489       return OSDMapRef();
490     epoch_t e = next_osdmap->get_epoch();
491     map<epoch_t, unsigned>::iterator i =
492       map_reservations.insert(make_pair(e, 0)).first;
493     i->second++;
494     return next_osdmap;
495   }
496   /// releases reservation on map
497   void release_map(OSDMapRef osdmap) {
498     Mutex::Locker l(pre_publish_lock);
499     map<epoch_t, unsigned>::iterator i =
500       map_reservations.find(osdmap->get_epoch());
501     assert(i != map_reservations.end());
502     assert(i->second > 0);
503     if (--(i->second) == 0) {
504       map_reservations.erase(i);
505     }
506     pre_publish_cond.Signal();
507   }
508   /// blocks until there are no reserved maps prior to next_osdmap
509   void await_reserved_maps() {
510     Mutex::Locker l(pre_publish_lock);
511     assert(next_osdmap);
512     while (true) {
513       map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
514       if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
515         break;
516       } else {
517         pre_publish_cond.Wait(pre_publish_lock);
518       }
519     }
520   }
521
522 private:
523   Mutex peer_map_epoch_lock;
524   map<int, epoch_t> peer_map_epoch;
525 public:
526   epoch_t get_peer_epoch(int p);
527   epoch_t note_peer_epoch(int p, epoch_t e);
528   void forget_peer_epoch(int p, epoch_t e);
529
530   void send_map(class MOSDMap *m, Connection *con);
531   void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
532   MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
533                                        OSDSuperblock& superblock);
534   bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
535                         const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
536   void share_map(entity_name_t name, Connection *con, epoch_t epoch,
537                  OSDMapRef& osdmap, epoch_t *sent_epoch_p);
538   void share_map_peer(int peer, Connection *con,
539                       OSDMapRef map = OSDMapRef());
540
541   ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
542   pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
543   void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
544   void send_message_osd_cluster(Message *m, Connection *con) {
545     con->send_message(m);
546   }
547   void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
548     con->send_message(m);
549   }
550   void send_message_osd_client(Message *m, Connection *con) {
551     con->send_message(m);
552   }
553   void send_message_osd_client(Message *m, const ConnectionRef& con) {
554     con->send_message(m);
555   }
556   entity_name_t get_cluster_msgr_name() {
557     return cluster_messenger->get_myname();
558   }
559
560 private:
561   // -- scrub scheduling --
562   Mutex sched_scrub_lock;
563   int scrubs_pending;
564   int scrubs_active;
565
566 public:
567   struct ScrubJob {
568     CephContext* cct;
569     /// pg to be scrubbed
570     spg_t pgid;
571     /// a time scheduled for scrub. but the scrub could be delayed if system
572     /// load is too high or it fails to fall in the scrub hours
573     utime_t sched_time;
574     /// the hard upper bound of scrub time
575     utime_t deadline;
576     ScrubJob() : cct(nullptr) {}
577     explicit ScrubJob(CephContext* cct, const spg_t& pg,
578                       const utime_t& timestamp,
579                       double pool_scrub_min_interval = 0,
580                       double pool_scrub_max_interval = 0, bool must = true);
581     /// order the jobs by sched_time
582     bool operator<(const ScrubJob& rhs) const;
583   };
584   set<ScrubJob> sched_scrub_pg;
585
586   /// @returns the scrub_reg_stamp used for unregister the scrub job
587   utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
588                        double pool_scrub_max_interval, bool must) {
589     ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
590                    must);
591     Mutex::Locker l(sched_scrub_lock);
592     sched_scrub_pg.insert(scrub);
593     return scrub.sched_time;
594   }
595   void unreg_pg_scrub(spg_t pgid, utime_t t) {
596     Mutex::Locker l(sched_scrub_lock);
597     size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
598     assert(removed);
599   }
600   bool first_scrub_stamp(ScrubJob *out) {
601     Mutex::Locker l(sched_scrub_lock);
602     if (sched_scrub_pg.empty())
603       return false;
604     set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
605     *out = *iter;
606     return true;
607   }
608   bool next_scrub_stamp(const ScrubJob& next,
609                         ScrubJob *out) {
610     Mutex::Locker l(sched_scrub_lock);
611     if (sched_scrub_pg.empty())
612       return false;
613     set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
614     if (iter == sched_scrub_pg.cend())
615       return false;
616     ++iter;
617     if (iter == sched_scrub_pg.cend())
618       return false;
619     *out = *iter;
620     return true;
621   }
622
623   void dumps_scrub(Formatter *f) {
624     assert(f != nullptr);
625     Mutex::Locker l(sched_scrub_lock);
626
627     f->open_array_section("scrubs");
628     for (const auto &i: sched_scrub_pg) {
629       f->open_object_section("scrub");
630       f->dump_stream("pgid") << i.pgid;
631       f->dump_stream("sched_time") << i.sched_time;
632       f->dump_stream("deadline") << i.deadline;
633       f->dump_bool("forced", i.sched_time == i.deadline);
634       f->close_section();
635     }
636     f->close_section();
637   }
638
639   bool can_inc_scrubs_pending();
640   bool inc_scrubs_pending();
641   void inc_scrubs_active(bool reserved);
642   void dec_scrubs_pending();
643   void dec_scrubs_active();
644
645   void reply_op_error(OpRequestRef op, int err);
646   void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
647   void handle_misdirected_op(PG *pg, OpRequestRef op);
648
649
650 private:
651   // -- agent shared state --
652   Mutex agent_lock;
653   Cond agent_cond;
654   map<uint64_t, set<PGRef> > agent_queue;
655   set<PGRef>::iterator agent_queue_pos;
656   bool agent_valid_iterator;
657   int agent_ops;
658   int flush_mode_high_count; //once have one pg with FLUSH_MODE_HIGH then flush objects with high speed
659   set<hobject_t> agent_oids;
660   bool agent_active;
661   struct AgentThread : public Thread {
662     OSDService *osd;
663     explicit AgentThread(OSDService *o) : osd(o) {}
664     void *entry() override {
665       osd->agent_entry();
666       return NULL;
667     }
668   } agent_thread;
669   bool agent_stop_flag;
670   Mutex agent_timer_lock;
671   SafeTimer agent_timer;
672
673 public:
674   void agent_entry();
675   void agent_stop();
676
677   void _enqueue(PG *pg, uint64_t priority) {
678     if (!agent_queue.empty() &&
679         agent_queue.rbegin()->first < priority)
680       agent_valid_iterator = false;  // inserting higher-priority queue
681     set<PGRef>& nq = agent_queue[priority];
682     if (nq.empty())
683       agent_cond.Signal();
684     nq.insert(pg);
685   }
686
687   void _dequeue(PG *pg, uint64_t old_priority) {
688     set<PGRef>& oq = agent_queue[old_priority];
689     set<PGRef>::iterator p = oq.find(pg);
690     assert(p != oq.end());
691     if (p == agent_queue_pos)
692       ++agent_queue_pos;
693     oq.erase(p);
694     if (oq.empty()) {
695       if (agent_queue.rbegin()->first == old_priority)
696         agent_valid_iterator = false;
697       agent_queue.erase(old_priority);
698     }
699   }
700
701   /// enable agent for a pg
702   void agent_enable_pg(PG *pg, uint64_t priority) {
703     Mutex::Locker l(agent_lock);
704     _enqueue(pg, priority);
705   }
706
707   /// adjust priority for an enagled pg
708   void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
709     Mutex::Locker l(agent_lock);
710     assert(new_priority != old_priority);
711     _enqueue(pg, new_priority);
712     _dequeue(pg, old_priority);
713   }
714
715   /// disable agent for a pg
716   void agent_disable_pg(PG *pg, uint64_t old_priority) {
717     Mutex::Locker l(agent_lock);
718     _dequeue(pg, old_priority);
719   }
720
721   /// note start of an async (evict) op
722   void agent_start_evict_op() {
723     Mutex::Locker l(agent_lock);
724     ++agent_ops;
725   }
726
727   /// note finish or cancellation of an async (evict) op
728   void agent_finish_evict_op() {
729     Mutex::Locker l(agent_lock);
730     assert(agent_ops > 0);
731     --agent_ops;
732     agent_cond.Signal();
733   }
734
735   /// note start of an async (flush) op
736   void agent_start_op(const hobject_t& oid) {
737     Mutex::Locker l(agent_lock);
738     ++agent_ops;
739     assert(agent_oids.count(oid) == 0);
740     agent_oids.insert(oid);
741   }
742
743   /// note finish or cancellation of an async (flush) op
744   void agent_finish_op(const hobject_t& oid) {
745     Mutex::Locker l(agent_lock);
746     assert(agent_ops > 0);
747     --agent_ops;
748     assert(agent_oids.count(oid) == 1);
749     agent_oids.erase(oid);
750     agent_cond.Signal();
751   }
752
753   /// check if we are operating on an object
754   bool agent_is_active_oid(const hobject_t& oid) {
755     Mutex::Locker l(agent_lock);
756     return agent_oids.count(oid);
757   }
758
759   /// get count of active agent ops
760   int agent_get_num_ops() {
761     Mutex::Locker l(agent_lock);
762     return agent_ops;
763   }
764
765   void agent_inc_high_count() {
766     Mutex::Locker l(agent_lock);
767     flush_mode_high_count ++;
768   }
769
770   void agent_dec_high_count() {
771     Mutex::Locker l(agent_lock);
772     flush_mode_high_count --;
773   }
774
775 private:
776   /// throttle promotion attempts
777   std::atomic_uint promote_probability_millis{1000}; ///< probability thousands. one word.
778   PromoteCounter promote_counter;
779   utime_t last_recalibrate;
780   unsigned long promote_max_objects, promote_max_bytes;
781
782 public:
783   bool promote_throttle() {
784     // NOTE: lockless!  we rely on the probability being a single word.
785     promote_counter.attempt();
786     if ((unsigned)rand() % 1000 > promote_probability_millis)
787       return true;  // yes throttle (no promote)
788     if (promote_max_objects &&
789         promote_counter.objects > promote_max_objects)
790       return true;  // yes throttle
791     if (promote_max_bytes &&
792         promote_counter.bytes > promote_max_bytes)
793       return true;  // yes throttle
794     return false;   //  no throttle (promote)
795   }
796   void promote_finish(uint64_t bytes) {
797     promote_counter.finish(bytes);
798   }
799   void promote_throttle_recalibrate();
800
801   // -- Objecter, for tiering reads/writes from/to other OSDs --
802   Objecter *objecter;
803   Finisher objecter_finisher;
804
805   // -- Watch --
806   Mutex watch_lock;
807   SafeTimer watch_timer;
808   uint64_t next_notif_id;
809   uint64_t get_next_id(epoch_t cur_epoch) {
810     Mutex::Locker l(watch_lock);
811     return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
812   }
813
814   // -- Recovery/Backfill Request Scheduling --
815   Mutex recovery_request_lock;
816   SafeTimer recovery_request_timer;
817
818   // For async recovery sleep
819   bool recovery_needs_sleep = true;
820   utime_t recovery_schedule_time = utime_t();
821
822   Mutex recovery_sleep_lock;
823   SafeTimer recovery_sleep_timer;
824
825   // -- tids --
826   // for ops i issue
827   std::atomic_uint last_tid{0};
828   ceph_tid_t get_tid() {
829     return (ceph_tid_t)last_tid++;
830   }
831
832   // -- backfill_reservation --
833   Finisher reserver_finisher;
834   AsyncReserver<spg_t> local_reserver;
835   AsyncReserver<spg_t> remote_reserver;
836
837   // -- pg_temp --
838 private:
839   Mutex pg_temp_lock;
840   map<pg_t, vector<int> > pg_temp_wanted;
841   map<pg_t, vector<int> > pg_temp_pending;
842   void _sent_pg_temp();
843 public:
844   void queue_want_pg_temp(pg_t pgid, vector<int>& want);
845   void remove_want_pg_temp(pg_t pgid);
846   void requeue_pg_temp();
847   void send_pg_temp();
848
849   void send_pg_created(pg_t pgid);
850
851   void queue_for_peering(PG *pg);
852
853   Mutex snap_sleep_lock;
854   SafeTimer snap_sleep_timer;
855
856   Mutex scrub_sleep_lock;
857   SafeTimer scrub_sleep_timer;
858
859   AsyncReserver<spg_t> snap_reserver;
860   void queue_for_snap_trim(PG *pg);
861
862   void queue_for_scrub(PG *pg, bool with_high_priority) {
863     unsigned scrub_queue_priority = pg->scrubber.priority;
864     if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
865       scrub_queue_priority = cct->_conf->osd_client_op_priority;
866     }
867     enqueue_back(
868       pg->info.pgid,
869       PGQueueable(
870         PGScrub(pg->get_osdmap()->get_epoch()),
871         cct->_conf->osd_scrub_cost,
872         scrub_queue_priority,
873         ceph_clock_now(),
874         entity_inst_t(),
875         pg->get_osdmap()->get_epoch()));
876   }
877
878 private:
879   // -- pg recovery and associated throttling --
880   Mutex recovery_lock;
881   list<pair<epoch_t, PGRef> > awaiting_throttle;
882
883   utime_t defer_recovery_until;
884   uint64_t recovery_ops_active;
885   uint64_t recovery_ops_reserved;
886   bool recovery_paused;
887 #ifdef DEBUG_RECOVERY_OIDS
888   map<spg_t, set<hobject_t> > recovery_oids;
889 #endif
890   bool _recover_now(uint64_t *available_pushes);
891   void _maybe_queue_recovery();
892   void _queue_for_recovery(
893     pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
894     assert(recovery_lock.is_locked_by_me());
895     enqueue_back(
896       p.second->info.pgid,
897       PGQueueable(
898         PGRecovery(p.first, reserved_pushes),
899         cct->_conf->osd_recovery_cost,
900         cct->_conf->osd_recovery_priority,
901         ceph_clock_now(),
902         entity_inst_t(),
903         p.first));
904   }
905 public:
906   void start_recovery_op(PG *pg, const hobject_t& soid);
907   void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
908   bool is_recovery_active();
909   void release_reserved_pushes(uint64_t pushes) {
910     Mutex::Locker l(recovery_lock);
911     assert(recovery_ops_reserved >= pushes);
912     recovery_ops_reserved -= pushes;
913     _maybe_queue_recovery();
914   }
915   void defer_recovery(float defer_for) {
916     defer_recovery_until = ceph_clock_now();
917     defer_recovery_until += defer_for;
918   }
919   void pause_recovery() {
920     Mutex::Locker l(recovery_lock);
921     recovery_paused = true;
922   }
923   bool recovery_is_paused() {
924     Mutex::Locker l(recovery_lock);
925     return recovery_paused;
926   }
927   void unpause_recovery() {
928     Mutex::Locker l(recovery_lock);
929     recovery_paused = false;
930     _maybe_queue_recovery();
931   }
932   void kick_recovery_queue() {
933     Mutex::Locker l(recovery_lock);
934     _maybe_queue_recovery();
935   }
936   void clear_queued_recovery(PG *pg) {
937     Mutex::Locker l(recovery_lock);
938     for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
939          i != awaiting_throttle.end();
940       ) {
941       if (i->second.get() == pg) {
942         awaiting_throttle.erase(i);
943         return;
944       } else {
945         ++i;
946       }
947     }
948   }
949   // delayed pg activation
950   void queue_for_recovery(PG *pg) {
951     Mutex::Locker l(recovery_lock);
952
953     if (pg->get_state() & (PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL)) {
954       awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
955     } else {
956       awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
957     }
958     _maybe_queue_recovery();
959   }
960   void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
961     Mutex::Locker l(recovery_lock);
962     _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
963   }
964
965   void adjust_pg_priorities(const vector<PGRef>& pgs, int newflags);
966
967   // osd map cache (past osd maps)
968   Mutex map_cache_lock;
969   SharedLRU<epoch_t, const OSDMap> map_cache;
970   SimpleLRU<epoch_t, bufferlist> map_bl_cache;
971   SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
972
973   OSDMapRef try_get_map(epoch_t e);
974   OSDMapRef get_map(epoch_t e) {
975     OSDMapRef ret(try_get_map(e));
976     assert(ret);
977     return ret;
978   }
979   OSDMapRef add_map(OSDMap *o) {
980     Mutex::Locker l(map_cache_lock);
981     return _add_map(o);
982   }
983   OSDMapRef _add_map(OSDMap *o);
984
985   void add_map_bl(epoch_t e, bufferlist& bl) {
986     Mutex::Locker l(map_cache_lock);
987     return _add_map_bl(e, bl);
988   }
989   void pin_map_bl(epoch_t e, bufferlist &bl);
990   void _add_map_bl(epoch_t e, bufferlist& bl);
991   bool get_map_bl(epoch_t e, bufferlist& bl) {
992     Mutex::Locker l(map_cache_lock);
993     return _get_map_bl(e, bl);
994   }
995   bool _get_map_bl(epoch_t e, bufferlist& bl);
996
997   void add_map_inc_bl(epoch_t e, bufferlist& bl) {
998     Mutex::Locker l(map_cache_lock);
999     return _add_map_inc_bl(e, bl);
1000   }
1001   void pin_map_inc_bl(epoch_t e, bufferlist &bl);
1002   void _add_map_inc_bl(epoch_t e, bufferlist& bl);
1003   bool get_inc_map_bl(epoch_t e, bufferlist& bl);
1004
1005   void clear_map_bl_cache_pins(epoch_t e);
1006
1007   void need_heartbeat_peer_update();
1008
1009   void pg_stat_queue_enqueue(PG *pg);
1010   void pg_stat_queue_dequeue(PG *pg);
1011
1012   void init();
1013   void final_init();  
1014   void start_shutdown();
1015   void shutdown_reserver();
1016   void shutdown();
1017
1018 private:
1019   // split
1020   Mutex in_progress_split_lock;
1021   map<spg_t, spg_t> pending_splits; // child -> parent
1022   map<spg_t, set<spg_t> > rev_pending_splits; // parent -> [children]
1023   set<spg_t> in_progress_splits;       // child
1024
1025 public:
1026   void _start_split(spg_t parent, const set<spg_t> &children);
1027   void start_split(spg_t parent, const set<spg_t> &children) {
1028     Mutex::Locker l(in_progress_split_lock);
1029     return _start_split(parent, children);
1030   }
1031   void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs);
1032   void complete_split(const set<spg_t> &pgs);
1033   void cancel_pending_splits_for_parent(spg_t parent);
1034   void _cancel_pending_splits_for_parent(spg_t parent);
1035   bool splitting(spg_t pgid);
1036   void expand_pg_num(OSDMapRef old_map,
1037                      OSDMapRef new_map);
1038   void _maybe_split_pgid(OSDMapRef old_map,
1039                          OSDMapRef new_map,
1040                          spg_t pgid);
1041   void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap);
1042
1043   // -- stats --
1044   Mutex stat_lock;
1045   osd_stat_t osd_stat;
1046   uint32_t seq = 0;
1047
1048   void update_osd_stat(vector<int>& hb_peers);
1049   osd_stat_t set_osd_stat(const struct store_statfs_t &stbuf,
1050                           vector<int>& hb_peers,
1051                           int num_pgs);
1052   osd_stat_t get_osd_stat() {
1053     Mutex::Locker l(stat_lock);
1054     ++seq;
1055     osd_stat.up_from = up_epoch;
1056     osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
1057     return osd_stat;
1058   }
1059   uint64_t get_osd_stat_seq() {
1060     Mutex::Locker l(stat_lock);
1061     return osd_stat.seq;
1062   }
1063
1064   // -- OSD Full Status --
1065 private:
1066   friend TestOpsSocketHook;
1067   mutable Mutex full_status_lock;
1068   enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state;  // ascending
1069   const char *get_full_state_name(s_names s) const {
1070     switch (s) {
1071     case NONE: return "none";
1072     case NEARFULL: return "nearfull";
1073     case BACKFILLFULL: return "backfillfull";
1074     case FULL: return "full";
1075     case FAILSAFE: return "failsafe";
1076     default: return "???";
1077     }
1078   }
1079   s_names get_full_state(string type) const {
1080     if (type == "none")
1081       return NONE;
1082     else if (type == "failsafe")
1083       return FAILSAFE;
1084     else if (type == "full")
1085       return FULL;
1086     else if (type == "backfillfull")
1087       return BACKFILLFULL;
1088     else if (type == "nearfull")
1089       return NEARFULL;
1090     else
1091       return INVALID;
1092   }
1093   double cur_ratio;  ///< current utilization
1094   mutable int64_t injectfull = 0;
1095   s_names injectfull_state = NONE;
1096   float get_failsafe_full_ratio();
1097   void check_full_status(float ratio);
1098   bool _check_full(s_names type, ostream &ss) const;
1099 public:
1100   bool check_failsafe_full(ostream &ss) const;
1101   bool check_full(ostream &ss) const;
1102   bool check_backfill_full(ostream &ss) const;
1103   bool check_nearfull(ostream &ss) const;
1104   bool is_failsafe_full() const;
1105   bool is_full() const;
1106   bool is_backfillfull() const;
1107   bool is_nearfull() const;
1108   bool need_fullness_update();  ///< osdmap state needs update
1109   void set_injectfull(s_names type, int64_t count);
1110   bool check_osdmap_full(const set<pg_shard_t> &missing_on);
1111
1112
1113   // -- epochs --
1114 private:
1115   mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
1116   epoch_t boot_epoch;  // _first_ epoch we were marked up (after this process started)
1117   epoch_t up_epoch;    // _most_recent_ epoch we were marked up
1118   epoch_t bind_epoch;  // epoch we last did a bind to new ip:ports
1119 public:
1120   /**
1121    * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
1122    * can be NULL if you don't care about them.
1123    */
1124   void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
1125                        epoch_t *_bind_epoch) const;
1126   /**
1127    * Set the boot, up, and bind epochs. Any NULL params will not be set.
1128    */
1129   void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
1130                   const epoch_t *_bind_epoch);
1131   epoch_t get_boot_epoch() const {
1132     epoch_t ret;
1133     retrieve_epochs(&ret, NULL, NULL);
1134     return ret;
1135   }
1136   epoch_t get_up_epoch() const {
1137     epoch_t ret;
1138     retrieve_epochs(NULL, &ret, NULL);
1139     return ret;
1140   }
1141   epoch_t get_bind_epoch() const {
1142     epoch_t ret;
1143     retrieve_epochs(NULL, NULL, &ret);
1144     return ret;
1145   }
1146
1147   void request_osdmap_update(epoch_t e);
1148
1149   // -- stopping --
1150   Mutex is_stopping_lock;
1151   Cond is_stopping_cond;
1152   enum {
1153     NOT_STOPPING,
1154     PREPARING_TO_STOP,
1155     STOPPING };
1156   std::atomic_int state{NOT_STOPPING};
1157   int get_state() {
1158     return state;
1159   }
1160   void set_state(int s) {
1161     state = s;
1162   }
1163   bool is_stopping() const {
1164     return state == STOPPING;
1165   }
1166   bool is_preparing_to_stop() const {
1167     return state == PREPARING_TO_STOP;
1168   }
1169   bool prepare_to_stop();
1170   void got_stop_ack();
1171
1172
1173 #ifdef PG_DEBUG_REFS
1174   Mutex pgid_lock;
1175   map<spg_t, int> pgid_tracker;
1176   map<spg_t, PG*> live_pgs;
1177   void add_pgid(spg_t pgid, PG *pg);
1178   void remove_pgid(spg_t pgid, PG *pg);
1179   void dump_live_pgids();
1180 #endif
1181
1182   explicit OSDService(OSD *osd);
1183   ~OSDService();
1184 };
1185
1186 class OSD : public Dispatcher,
1187             public md_config_obs_t {
1188   /** OSD **/
1189   Mutex osd_lock;                       // global lock
1190   SafeTimer tick_timer;    // safe timer (osd_lock)
1191
1192   // Tick timer for those stuff that do not need osd_lock
1193   Mutex tick_timer_lock;
1194   SafeTimer tick_timer_without_osd_lock;
1195 public:
1196   // config observer bits
1197   const char** get_tracked_conf_keys() const override;
1198   void handle_conf_change(const struct md_config_t *conf,
1199                           const std::set <std::string> &changed) override;
1200   void update_log_config();
1201   void check_config();
1202
1203 protected:
1204
1205   static const double OSD_TICK_INTERVAL; // tick interval for tick_timer and tick_timer_without_osd_lock
1206
1207   AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
1208   AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;
1209
1210   Messenger   *cluster_messenger;
1211   Messenger   *client_messenger;
1212   Messenger   *objecter_messenger;
1213   MonClient   *monc; // check the "monc helpers" list before accessing directly
1214   MgrClient   mgrc;
1215   PerfCounters      *logger;
1216   PerfCounters      *recoverystate_perf;
1217   ObjectStore *store;
1218 #ifdef HAVE_LIBFUSE
1219   FuseStore *fuse_store = nullptr;
1220 #endif
1221   LogClient log_client;
1222   LogChannelRef clog;
1223
1224   int whoami;
1225   std::string dev_path, journal_path;
1226
1227   bool store_is_rotational = true;
1228   bool journal_is_rotational = true;
1229
1230   ZTracer::Endpoint trace_endpoint;
1231   void create_logger();
1232   void create_recoverystate_perf();
1233   void tick();
1234   void tick_without_osd_lock();
1235   void _dispatch(Message *m);
1236   void dispatch_op(OpRequestRef op);
1237
1238   void check_osdmap_features(ObjectStore *store);
1239
1240   // asok
1241   friend class OSDSocketHook;
1242   class OSDSocketHook *asok_hook;
1243   bool asok_command(string admin_command, cmdmap_t& cmdmap, string format, ostream& ss);
1244
1245 public:
1246   ClassHandler  *class_handler = nullptr;
1247   int get_nodeid() { return whoami; }
1248   
1249   static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
1250     char foo[20];
1251     snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
1252     return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1253   }
1254   static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
1255     char foo[22];
1256     snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
1257     return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
1258   }
1259
1260   static ghobject_t make_snapmapper_oid() {
1261     return ghobject_t(hobject_t(
1262       sobject_t(
1263         object_t("snapmapper"),
1264         0)));
1265   }
1266
1267   static ghobject_t make_pg_log_oid(spg_t pg) {
1268     stringstream ss;
1269     ss << "pglog_" << pg;
1270     string s;
1271     getline(ss, s);
1272     return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1273   }
1274   
1275   static ghobject_t make_pg_biginfo_oid(spg_t pg) {
1276     stringstream ss;
1277     ss << "pginfo_" << pg;
1278     string s;
1279     getline(ss, s);
1280     return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
1281   }
1282   static ghobject_t make_infos_oid() {
1283     hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
1284     return ghobject_t(oid);
1285   }
1286   static void recursive_remove_collection(CephContext* cct,
1287                                           ObjectStore *store,
1288                                           spg_t pgid,
1289                                           coll_t tmp);
1290
1291   /**
1292    * get_osd_initial_compat_set()
1293    *
1294    * Get the initial feature set for this OSD.  Features
1295    * here are automatically upgraded.
1296    *
1297    * Return value: Initial osd CompatSet
1298    */
1299   static CompatSet get_osd_initial_compat_set();
1300
1301   /**
1302    * get_osd_compat_set()
1303    *
1304    * Get all features supported by this OSD
1305    *
1306    * Return value: CompatSet of all supported features
1307    */
1308   static CompatSet get_osd_compat_set();
1309   
1310
1311 private:
1312   class C_Tick;
1313   class C_Tick_WithoutOSDLock;
1314
1315   // -- superblock --
1316   OSDSuperblock superblock;
1317
1318   void write_superblock();
1319   void write_superblock(ObjectStore::Transaction& t);
1320   int read_superblock();
1321
1322   void clear_temp_objects();
1323
1324   CompatSet osd_compat;
1325
1326   // -- state --
1327 public:
1328   typedef enum {
1329     STATE_INITIALIZING = 1,
1330     STATE_PREBOOT,
1331     STATE_BOOTING,
1332     STATE_ACTIVE,
1333     STATE_STOPPING,
1334     STATE_WAITING_FOR_HEALTHY
1335   } osd_state_t;
1336
1337   static const char *get_state_name(int s) {
1338     switch (s) {
1339     case STATE_INITIALIZING: return "initializing";
1340     case STATE_PREBOOT: return "preboot";
1341     case STATE_BOOTING: return "booting";
1342     case STATE_ACTIVE: return "active";
1343     case STATE_STOPPING: return "stopping";
1344     case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
1345     default: return "???";
1346     }
1347   }
1348
1349 private:
1350   std::atomic_int state{STATE_INITIALIZING};
1351   bool waiting_for_luminous_mons = false;
1352
1353 public:
1354   int get_state() const {
1355     return state;
1356   }
1357   void set_state(int s) {
1358     state = s;
1359   }
1360   bool is_initializing() const {
1361     return state == STATE_INITIALIZING;
1362   }
1363   bool is_preboot() const {
1364     return state == STATE_PREBOOT;
1365   }
1366   bool is_booting() const {
1367     return state == STATE_BOOTING;
1368   }
1369   bool is_active() const {
1370     return state == STATE_ACTIVE;
1371   }
1372   bool is_stopping() const {
1373     return state == STATE_STOPPING;
1374   }
1375   bool is_waiting_for_healthy() const {
1376     return state == STATE_WAITING_FOR_HEALTHY;
1377   }
1378
1379 private:
1380
1381   ThreadPool peering_tp;
1382   ShardedThreadPool osd_op_tp;
1383   ThreadPool disk_tp;
1384   ThreadPool command_tp;
1385
1386   void set_disk_tp_priority();
1387   void get_latest_osdmap();
1388
1389   // -- sessions --
1390 private:
1391   void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
1392   void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
1393
1394   Mutex session_waiting_lock;
1395   set<Session*> session_waiting_for_map;
1396
1397   /// Caller assumes refs for included Sessions
1398   void get_sessions_waiting_for_map(set<Session*> *out) {
1399     Mutex::Locker l(session_waiting_lock);
1400     out->swap(session_waiting_for_map);
1401   }
1402   void register_session_waiting_on_map(Session *session) {
1403     Mutex::Locker l(session_waiting_lock);
1404     if (session_waiting_for_map.insert(session).second) {
1405       session->get();
1406     }
1407   }
1408   void clear_session_waiting_on_map(Session *session) {
1409     Mutex::Locker l(session_waiting_lock);
1410     set<Session*>::iterator i = session_waiting_for_map.find(session);
1411     if (i != session_waiting_for_map.end()) {
1412       (*i)->put();
1413       session_waiting_for_map.erase(i);
1414     }
1415   }
1416   void dispatch_sessions_waiting_on_map() {
1417     set<Session*> sessions_to_check;
1418     get_sessions_waiting_for_map(&sessions_to_check);
1419     for (set<Session*>::iterator i = sessions_to_check.begin();
1420          i != sessions_to_check.end();
1421          sessions_to_check.erase(i++)) {
1422       (*i)->session_dispatch_lock.Lock();
1423       dispatch_session_waiting(*i, osdmap);
1424       (*i)->session_dispatch_lock.Unlock();
1425       (*i)->put();
1426     }
1427   }
1428   void session_handle_reset(Session *session) {
1429     Mutex::Locker l(session->session_dispatch_lock);
1430     clear_session_waiting_on_map(session);
1431
1432     session->clear_backoffs();
1433
1434     /* Messages have connection refs, we need to clear the
1435      * connection->session->message->connection
1436      * cycles which result.
1437      * Bug #12338
1438      */
1439     session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
1440   }
1441
1442 private:
1443   /**
1444    * @defgroup monc helpers
1445    * @{
1446    * Right now we only have the one
1447    */
1448
1449   /**
1450    * Ask the Monitors for a sequence of OSDMaps.
1451    *
1452    * @param epoch The epoch to start with when replying
1453    * @param force_request True if this request forces a new subscription to
1454    * the monitors; false if an outstanding request that encompasses it is
1455    * sufficient.
1456    */
1457   void osdmap_subscribe(version_t epoch, bool force_request);
1458   /** @} monc helpers */
1459
1460   Mutex osdmap_subscribe_lock;
1461   epoch_t latest_subscribed_epoch{0};
1462
1463   // -- heartbeat --
1464   /// information about a heartbeat peer
1465   struct HeartbeatInfo {
1466     int peer;           ///< peer
1467     ConnectionRef con_front;   ///< peer connection (front)
1468     ConnectionRef con_back;    ///< peer connection (back)
1469     utime_t first_tx;   ///< time we sent our first ping request
1470     utime_t last_tx;    ///< last time we sent a ping request
1471     utime_t last_rx_front;  ///< last time we got a ping reply on the front side
1472     utime_t last_rx_back;   ///< last time we got a ping reply on the back side
1473     epoch_t epoch;      ///< most recent epoch we wanted this peer
1474
1475     bool is_unhealthy(utime_t cutoff) const {
1476       return
1477         ! ((last_rx_front > cutoff ||
1478             (last_rx_front == utime_t() && (last_tx == utime_t() ||
1479                                             first_tx > cutoff))) &&
1480            (last_rx_back > cutoff ||
1481             (last_rx_back == utime_t() && (last_tx == utime_t() ||
1482                                            first_tx > cutoff))));
1483     }
1484     bool is_healthy(utime_t cutoff) const {
1485       return last_rx_front > cutoff && last_rx_back > cutoff;
1486     }
1487
1488   };
1489   /// state attached to outgoing heartbeat connections
1490   struct HeartbeatSession : public RefCountedObject {
1491     int peer;
1492     explicit HeartbeatSession(int p) : peer(p) {}
1493   };
1494   Mutex heartbeat_lock;
1495   map<int, int> debug_heartbeat_drops_remaining;
1496   Cond heartbeat_cond;
1497   bool heartbeat_stop;
1498   std::atomic_bool heartbeat_need_update;   
1499   map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
1500   utime_t last_mon_heartbeat;
1501   Messenger *hb_front_client_messenger;
1502   Messenger *hb_back_client_messenger;
1503   Messenger *hb_front_server_messenger;
1504   Messenger *hb_back_server_messenger;
1505   utime_t last_heartbeat_resample;   ///< last time we chose random peers in waiting-for-healthy state
1506   double daily_loadavg;
1507   
1508   void _add_heartbeat_peer(int p);
1509   void _remove_heartbeat_peer(int p);
1510   bool heartbeat_reset(Connection *con);
1511   void maybe_update_heartbeat_peers();
1512   void reset_heartbeat_peers();
1513   bool heartbeat_peers_need_update() {
1514     return heartbeat_need_update.load();
1515   }
1516   void heartbeat_set_peers_need_update() {
1517     heartbeat_need_update.store(true);
1518   }
1519   void heartbeat_clear_peers_need_update() {
1520     heartbeat_need_update.store(false);
1521   }
1522   void heartbeat();
1523   void heartbeat_check();
1524   void heartbeat_entry();
1525   void need_heartbeat_peer_update();
1526
1527   void heartbeat_kick() {
1528     Mutex::Locker l(heartbeat_lock);
1529     heartbeat_cond.Signal();
1530   }
1531
1532   struct T_Heartbeat : public Thread {
1533     OSD *osd;
1534     explicit T_Heartbeat(OSD *o) : osd(o) {}
1535     void *entry() override {
1536       osd->heartbeat_entry();
1537       return 0;
1538     }
1539   } heartbeat_thread;
1540
1541 public:
1542   bool heartbeat_dispatch(Message *m);
1543
1544   struct HeartbeatDispatcher : public Dispatcher {
1545     OSD *osd;
1546     explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
1547
1548     bool ms_can_fast_dispatch_any() const override { return true; }
1549     bool ms_can_fast_dispatch(const Message *m) const override {
1550       switch (m->get_type()) {
1551         case CEPH_MSG_PING:
1552         case MSG_OSD_PING:
1553           return true;
1554         default:
1555           return false;
1556         }
1557     }
1558     void ms_fast_dispatch(Message *m) override {
1559       osd->heartbeat_dispatch(m);
1560     }
1561     bool ms_dispatch(Message *m) override {
1562       return osd->heartbeat_dispatch(m);
1563     }
1564     bool ms_handle_reset(Connection *con) override {
1565       return osd->heartbeat_reset(con);
1566     }
1567     void ms_handle_remote_reset(Connection *con) override {}
1568     bool ms_handle_refused(Connection *con) override {
1569       return osd->ms_handle_refused(con);
1570     }
1571     bool ms_verify_authorizer(Connection *con, int peer_type,
1572                               int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
1573                               bool& isvalid, CryptoKey& session_key) override {
1574       isvalid = true;
1575       return true;
1576     }
1577   } heartbeat_dispatcher;
1578
1579 private:
1580   // -- waiters --
1581   list<OpRequestRef> finished;
1582   
1583   void take_waiters(list<OpRequestRef>& ls) {
1584     assert(osd_lock.is_locked());
1585     finished.splice(finished.end(), ls);
1586   }
1587   void do_waiters();
1588   
1589   // -- op tracking --
1590   OpTracker op_tracker;
1591   void check_ops_in_flight();
1592   void test_ops(std::string command, std::string args, ostream& ss);
1593   friend class TestOpsSocketHook;
1594   TestOpsSocketHook *test_ops_hook;
1595   friend struct C_CompleteSplits;
1596   friend struct C_OpenPGs;
1597
1598   // -- op queue --
1599   enum class io_queue {
1600     prioritized,
1601     weightedpriority,
1602     mclock_opclass,
1603     mclock_client,
1604   };
1605   friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
1606
1607   const io_queue op_queue;
1608   const unsigned int op_prio_cutoff;
1609
1610   /*
1611    * The ordered op delivery chain is:
1612    *
1613    *   fast dispatch -> pqueue back
1614    *                    pqueue front <-> to_process back
1615    *                                     to_process front  -> RunVis(item)
1616    *                                                      <- queue_front()
1617    *
1618    * The pqueue is per-shard, and to_process is per pg_slot.  Items can be
1619    * pushed back up into to_process and/or pqueue while order is preserved.
1620    *
1621    * Multiple worker threads can operate on each shard.
1622    *
1623    * Under normal circumstances, num_running == to_proces.size().  There are
1624    * two times when that is not true: (1) when waiting_for_pg == true and
1625    * to_process is accumulating requests that are waiting for the pg to be
1626    * instantiated; in that case they will all get requeued together by
1627    * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
1628    * and already requeued the items.
1629    */
1630   friend class PGQueueable;
1631
1632   class ShardedOpWQ
1633     : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
1634   {
1635     struct ShardData {
1636       Mutex sdata_lock;
1637       Cond sdata_cond;
1638
1639       Mutex sdata_op_ordering_lock;   ///< protects all members below
1640
1641       OSDMapRef waiting_for_pg_osdmap;
1642       struct pg_slot {
1643         PGRef pg;                     ///< cached pg reference [optional]
1644         list<PGQueueable> to_process; ///< order items for this slot
1645         int num_running = 0;          ///< _process threads doing pg lookup/lock
1646
1647         /// true if pg does/did not exist. if so all new items go directly to
1648         /// to_process.  cleared by prune_pg_waiters.
1649         bool waiting_for_pg = false;
1650
1651         /// incremented by wake_pg_waiters; indicates racing _process threads
1652         /// should bail out (their op has been requeued)
1653         uint64_t requeue_seq = 0;
1654       };
1655
1656       /// map of slots for each spg_t.  maintains ordering of items dequeued
1657       /// from pqueue while _process thread drops shard lock to acquire the
1658       /// pg lock.  slots are removed only by prune_pg_waiters.
1659       unordered_map<spg_t,pg_slot> pg_slots;
1660
1661       /// priority queue
1662       std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
1663
1664       void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
1665         unsigned priority = item.second.get_priority();
1666         unsigned cost = item.second.get_cost();
1667         if (priority >= cutoff)
1668           pqueue->enqueue_strict_front(
1669             item.second.get_owner(),
1670             priority, item);
1671         else
1672           pqueue->enqueue_front(
1673             item.second.get_owner(),
1674             priority, cost, item);
1675       }
1676
1677       ShardData(
1678         string lock_name, string ordering_lock,
1679         uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
1680         io_queue opqueue)
1681         : sdata_lock(lock_name.c_str(), false, true, false, cct),
1682           sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
1683                                  false, cct) {
1684         if (opqueue == io_queue::weightedpriority) {
1685           pqueue = std::unique_ptr
1686             <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
1687               new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
1688                 max_tok_per_prio, min_cost));
1689         } else if (opqueue == io_queue::prioritized) {
1690           pqueue = std::unique_ptr
1691             <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
1692               new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
1693                 max_tok_per_prio, min_cost));
1694         } else if (opqueue == io_queue::mclock_opclass) {
1695           pqueue = std::unique_ptr
1696             <ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct));
1697         } else if (opqueue == io_queue::mclock_client) {
1698           pqueue = std::unique_ptr
1699             <ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct));
1700         }
1701       }
1702     }; // struct ShardData
1703
1704     vector<ShardData*> shard_list;
1705     OSD *osd;
1706     uint32_t num_shards;
1707
1708   public:
1709     ShardedOpWQ(uint32_t pnum_shards,
1710                 OSD *o,
1711                 time_t ti,
1712                 time_t si,
1713                 ShardedThreadPool* tp)
1714       : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
1715         osd(o),
1716         num_shards(pnum_shards) {
1717       for (uint32_t i = 0; i < num_shards; i++) {
1718         char lock_name[32] = {0};
1719         snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
1720         char order_lock[32] = {0};
1721         snprintf(order_lock, sizeof(order_lock), "%s.%d",
1722                  "OSD:ShardedOpWQ:order:", i);
1723         ShardData* one_shard = new ShardData(
1724           lock_name, order_lock,
1725           osd->cct->_conf->osd_op_pq_max_tokens_per_priority, 
1726           osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
1727         shard_list.push_back(one_shard);
1728       }
1729     }
1730     ~ShardedOpWQ() override {
1731       while (!shard_list.empty()) {
1732         delete shard_list.back();
1733         shard_list.pop_back();
1734       }
1735     }
1736
1737     /// wake any pg waiters after a PG is created/instantiated
1738     void wake_pg_waiters(spg_t pgid);
1739
1740     /// prune ops (and possiblye pg_slots) for pgs that shouldn't be here
1741     void prune_pg_waiters(OSDMapRef osdmap, int whoami);
1742
1743     /// clear cached PGRef on pg deletion
1744     void clear_pg_pointer(spg_t pgid);
1745
1746     /// clear pg_slots on shutdown
1747     void clear_pg_slots();
1748
1749     /// try to do some work
1750     void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
1751
1752     /// enqueue a new item
1753     void _enqueue(pair <spg_t, PGQueueable> item) override;
1754
1755     /// requeue an old item (at the front of the line)
1756     void _enqueue_front(pair <spg_t, PGQueueable> item) override;
1757       
1758     void return_waiting_threads() override {
1759       for(uint32_t i = 0; i < num_shards; i++) {
1760         ShardData* sdata = shard_list[i];
1761         assert (NULL != sdata); 
1762         sdata->sdata_lock.Lock();
1763         sdata->sdata_cond.Signal();
1764         sdata->sdata_lock.Unlock();
1765       }
1766     }
1767
1768     void dump(Formatter *f) {
1769       for(uint32_t i = 0; i < num_shards; i++) {
1770         ShardData* sdata = shard_list[i];
1771         char lock_name[32] = {0};
1772         snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
1773         assert (NULL != sdata);
1774         sdata->sdata_op_ordering_lock.Lock();
1775         f->open_object_section(lock_name);
1776         sdata->pqueue->dump(f);
1777         f->close_section();
1778         sdata->sdata_op_ordering_lock.Unlock();
1779       }
1780     }
1781
1782     /// Must be called on ops queued back to front
1783     struct Pred {
1784       spg_t pgid;
1785       list<OpRequestRef> *out_ops;
1786       uint64_t reserved_pushes_to_free;
1787       Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
1788         : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
1789       void accumulate(const PGQueueable &op) {
1790         reserved_pushes_to_free += op.get_reserved_pushes();
1791         if (out_ops) {
1792           boost::optional<OpRequestRef> mop = op.maybe_get_op();
1793           if (mop)
1794             out_ops->push_front(*mop);
1795         }
1796       }
1797       bool operator()(const pair<spg_t, PGQueueable> &op) {
1798         if (op.first == pgid) {
1799           accumulate(op.second);
1800           return true;
1801         } else {
1802           return false;
1803         }
1804       }
1805       uint64_t get_reserved_pushes_to_free() const {
1806         return reserved_pushes_to_free;
1807       }
1808     };
1809
1810     bool is_shard_empty(uint32_t thread_index) override {
1811       uint32_t shard_index = thread_index % num_shards; 
1812       ShardData* sdata = shard_list[shard_index];
1813       assert(NULL != sdata);
1814       Mutex::Locker l(sdata->sdata_op_ordering_lock);
1815       return sdata->pqueue->empty();
1816     }
1817   } op_shardedwq;
1818
1819
1820   void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
1821   void dequeue_op(
1822     PGRef pg, OpRequestRef op,
1823     ThreadPool::TPHandle &handle);
1824
1825   // -- peering queue --
1826   struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
1827     list<PG*> peering_queue;
1828     OSD *osd;
1829     set<PG*> in_use;
1830     PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
1831       : ThreadPool::BatchWorkQueue<PG>(
1832         "OSD::PeeringWQ", ti, si, tp), osd(o) {}
1833
1834     void _dequeue(PG *pg) override {
1835       for (list<PG*>::iterator i = peering_queue.begin();
1836            i != peering_queue.end();
1837            ) {
1838         if (*i == pg) {
1839           peering_queue.erase(i++);
1840           pg->put("PeeringWQ");
1841         } else {
1842           ++i;
1843         }
1844       }
1845     }
1846     bool _enqueue(PG *pg) override {
1847       pg->get("PeeringWQ");
1848       peering_queue.push_back(pg);
1849       return true;
1850     }
1851     bool _empty() override {
1852       return peering_queue.empty();
1853     }
1854     void _dequeue(list<PG*> *out) override;
1855     void _process(
1856       const list<PG *> &pgs,
1857       ThreadPool::TPHandle &handle) override {
1858       assert(!pgs.empty());
1859       osd->process_peering_events(pgs, handle);
1860       for (list<PG *>::const_iterator i = pgs.begin();
1861            i != pgs.end();
1862            ++i) {
1863         (*i)->put("PeeringWQ");
1864       }
1865     }
1866     void _process_finish(const list<PG *> &pgs) override {
1867       for (list<PG*>::const_iterator i = pgs.begin();
1868            i != pgs.end();
1869            ++i) {
1870         in_use.erase(*i);
1871       }
1872     }
1873     void _clear() override {
1874       assert(peering_queue.empty());
1875     }
1876   } peering_wq;
1877
1878   void process_peering_events(
1879     const list<PG*> &pg,
1880     ThreadPool::TPHandle &handle);
1881
1882   friend class PG;
1883   friend class PrimaryLogPG;
1884
1885
1886  protected:
1887
1888   // -- osd map --
1889   OSDMapRef       osdmap;
1890   OSDMapRef get_osdmap() {
1891     return osdmap;
1892   }
1893   epoch_t get_osdmap_epoch() const {
1894     return osdmap ? osdmap->get_epoch() : 0;
1895   }
1896
1897   utime_t         had_map_since;
1898   RWLock          map_lock;
1899   list<OpRequestRef>  waiting_for_osdmap;
1900   deque<utime_t> osd_markdown_log;
1901
1902   friend struct send_map_on_destruct;
1903
1904   void wait_for_new_map(OpRequestRef op);
1905   void handle_osd_map(class MOSDMap *m);
1906   void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
1907   void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
1908   void note_down_osd(int osd);
1909   void note_up_osd(int osd);
1910   friend class C_OnMapCommit;
1911
1912   bool advance_pg(
1913     epoch_t advance_to, PG *pg,
1914     ThreadPool::TPHandle &handle,
1915     PG::RecoveryCtx *rctx,
1916     set<PGRef> *split_pgs
1917   );
1918   void consume_map();
1919   void activate_map();
1920
1921   // osd map cache (past osd maps)
1922   OSDMapRef get_map(epoch_t e) {
1923     return service.get_map(e);
1924   }
1925   OSDMapRef add_map(OSDMap *o) {
1926     return service.add_map(o);
1927   }
1928   void add_map_bl(epoch_t e, bufferlist& bl) {
1929     return service.add_map_bl(e, bl);
1930   }
1931   void pin_map_bl(epoch_t e, bufferlist &bl) {
1932     return service.pin_map_bl(e, bl);
1933   }
1934   bool get_map_bl(epoch_t e, bufferlist& bl) {
1935     return service.get_map_bl(e, bl);
1936   }
1937   void add_map_inc_bl(epoch_t e, bufferlist& bl) {
1938     return service.add_map_inc_bl(e, bl);
1939   }
1940   void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
1941     return service.pin_map_inc_bl(e, bl);
1942   }
1943
1944 protected:
1945   // -- placement groups --
1946   RWLock pg_map_lock; // this lock orders *above* individual PG _locks
1947   ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
1948
1949   std::mutex pending_creates_lock;
1950   std::set<pg_t> pending_creates_from_osd;
1951   unsigned pending_creates_from_mon = 0;
1952
1953   map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
1954   PGRecoveryStats pg_recovery_stats;
1955
1956   PGPool _get_pool(int id, OSDMapRef createmap);
1957
1958   PG   *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
1959   PG   *_lookup_lock_pg(spg_t pgid);
1960
1961 public:
1962   PG   *lookup_lock_pg(spg_t pgid);
1963
1964   int get_num_pgs() {
1965     RWLock::RLocker l(pg_map_lock);
1966     return pg_map.size();
1967   }
1968
1969 protected:
1970   PG   *_open_lock_pg(OSDMapRef createmap,
1971                       spg_t pg, bool no_lockdep_check=false);
1972   enum res_result {
1973     RES_PARENT,    // resurrected a parent
1974     RES_SELF,      // resurrected self
1975     RES_NONE       // nothing relevant deleting
1976   };
1977   res_result _try_resurrect_pg(
1978     OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
1979
1980   PG   *_create_lock_pg(
1981     OSDMapRef createmap,
1982     spg_t pgid,
1983     bool hold_map_lock,
1984     bool backfill,
1985     int role,
1986     vector<int>& up, int up_primary,
1987     vector<int>& acting, int acting_primary,
1988     pg_history_t history,
1989     const PastIntervals& pi,
1990     ObjectStore::Transaction& t);
1991
1992   PG* _make_pg(OSDMapRef createmap, spg_t pgid);
1993   void add_newly_split_pg(PG *pg,
1994                           PG::RecoveryCtx *rctx);
1995
1996   int handle_pg_peering_evt(
1997     spg_t pgid,
1998     const pg_history_t& orig_history,
1999     const PastIntervals& pi,
2000     epoch_t epoch,
2001     PG::CephPeeringEvtRef evt);
2002   bool maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create);
2003   void resume_creating_pg();
2004
2005   void load_pgs();
2006   void build_past_intervals_parallel();
2007
2008   /// build initial pg history and intervals on create
2009   void build_initial_pg_history(
2010     spg_t pgid,
2011     epoch_t created,
2012     utime_t created_stamp,
2013     pg_history_t *h,
2014     PastIntervals *pi);
2015
2016   /// project pg history from from to now
2017   bool project_pg_history(
2018     spg_t pgid, pg_history_t& h, epoch_t from,
2019     const vector<int>& lastup,
2020     int lastupprimary,
2021     const vector<int>& lastacting,
2022     int lastactingprimary
2023     ); ///< @return false if there was a map gap between from and now
2024
2025   // this must be called with pg->lock held on any pg addition to pg_map
2026   void wake_pg_waiters(PGRef pg) {
2027     assert(pg->is_locked());
2028     op_shardedwq.wake_pg_waiters(pg->info.pgid);
2029   }
2030   epoch_t last_pg_create_epoch;
2031
2032   void handle_pg_create(OpRequestRef op);
2033
2034   void split_pgs(
2035     PG *parent,
2036     const set<spg_t> &childpgids, set<PGRef> *out_pgs,
2037     OSDMapRef curmap,
2038     OSDMapRef nextmap,
2039     PG::RecoveryCtx *rctx);
2040
2041   // == monitor interaction ==
2042   Mutex mon_report_lock;
2043   utime_t last_mon_report;
2044   utime_t last_pg_stats_sent;
2045
2046   /* if our monitor dies, we want to notice it and reconnect.
2047    *  So we keep track of when it last acked our stat updates,
2048    *  and if too much time passes (and we've been sending
2049    *  more updates) then we can call it dead and reconnect
2050    *  elsewhere.
2051    */
2052   utime_t last_pg_stats_ack;
2053   float stats_ack_timeout;
2054   set<uint64_t> outstanding_pg_stats; // how many stat updates haven't been acked yet
2055
2056   // -- boot --
2057   void start_boot();
2058   void _got_mon_epochs(epoch_t oldest, epoch_t newest);
2059   void _preboot(epoch_t oldest, epoch_t newest);
2060   void _send_boot();
2061   void _collect_metadata(map<string,string> *pmeta);
2062
2063   void start_waiting_for_healthy();
2064   bool _is_healthy();
2065
2066   void send_full_update();
2067   
2068   friend struct C_OSD_GetVersion;
2069
2070   // -- alive --
2071   epoch_t up_thru_wanted;
2072
2073   void queue_want_up_thru(epoch_t want);
2074   void send_alive();
2075
2076   // -- full map requests --
2077   epoch_t requested_full_first, requested_full_last;
2078
2079   void request_full_map(epoch_t first, epoch_t last);
2080   void rerequest_full_maps() {
2081     epoch_t first = requested_full_first;
2082     epoch_t last = requested_full_last;
2083     requested_full_first = 0;
2084     requested_full_last = 0;
2085     request_full_map(first, last);
2086   }
2087   void got_full_map(epoch_t e);
2088
2089   // -- failures --
2090   map<int,utime_t> failure_queue;
2091   map<int,pair<utime_t,entity_inst_t> > failure_pending;
2092
2093   void requeue_failures();
2094   void send_failures();
2095   void send_still_alive(epoch_t epoch, const entity_inst_t &i);
2096
2097   // -- pg stats --
2098   Mutex pg_stat_queue_lock;
2099   Cond pg_stat_queue_cond;
2100   xlist<PG*> pg_stat_queue;
2101   bool osd_stat_updated;
2102   uint64_t pg_stat_tid, pg_stat_tid_flushed;
2103
2104   void send_pg_stats(const utime_t &now);
2105   void handle_pg_stats_ack(class MPGStatsAck *ack);
2106   void flush_pg_stats();
2107
2108   ceph::coarse_mono_clock::time_point last_sent_beacon;
2109   Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
2110   epoch_t min_last_epoch_clean = 0;
2111   // which pgs were scanned for min_lec
2112   std::vector<pg_t> min_last_epoch_clean_pgs;
2113   void send_beacon(const ceph::coarse_mono_clock::time_point& now);
2114
2115   void pg_stat_queue_enqueue(PG *pg) {
2116     pg_stat_queue_lock.Lock();
2117     if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
2118       pg->get("pg_stat_queue");
2119       pg_stat_queue.push_back(&pg->stat_queue_item);
2120     }
2121     osd_stat_updated = true;
2122     pg_stat_queue_lock.Unlock();
2123   }
2124   void pg_stat_queue_dequeue(PG *pg) {
2125     pg_stat_queue_lock.Lock();
2126     if (pg->stat_queue_item.remove_myself())
2127       pg->put("pg_stat_queue");
2128     pg_stat_queue_lock.Unlock();
2129   }
2130   void clear_pg_stat_queue() {
2131     pg_stat_queue_lock.Lock();
2132     while (!pg_stat_queue.empty()) {
2133       PG *pg = pg_stat_queue.front();
2134       pg_stat_queue.pop_front();
2135       pg->put("pg_stat_queue");
2136     }
2137     pg_stat_queue_lock.Unlock();
2138   }
2139   void clear_outstanding_pg_stats(){
2140     Mutex::Locker l(pg_stat_queue_lock);
2141     outstanding_pg_stats.clear();
2142   }
2143
2144   ceph_tid_t get_tid() {
2145     return service.get_tid();
2146   }
2147
2148   // -- generic pg peering --
2149   PG::RecoveryCtx create_context();
2150   void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
2151                         ThreadPool::TPHandle *handle = NULL);
2152   void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
2153                                     ThreadPool::TPHandle *handle = NULL);
2154   void do_notifies(map<int,
2155                        vector<pair<pg_notify_t, PastIntervals> > >&
2156                        notify_list,
2157                    OSDMapRef map);
2158   void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
2159                   OSDMapRef map);
2160   void do_infos(map<int,
2161                     vector<pair<pg_notify_t, PastIntervals> > >& info_map,
2162                 OSDMapRef map);
2163
2164   bool require_mon_peer(const Message *m);
2165   bool require_mon_or_mgr_peer(const Message *m);
2166   bool require_osd_peer(const Message *m);
2167   /***
2168    * Verifies that we were alive in the given epoch, and that
2169    * still are.
2170    */
2171   bool require_self_aliveness(const Message *m, epoch_t alive_since);
2172   /**
2173    * Verifies that the OSD who sent the given op has the same
2174    * address as in the given map.
2175    * @pre op was sent by an OSD using the cluster messenger
2176    */
2177   bool require_same_peer_instance(const Message *m, OSDMapRef& map,
2178                                   bool is_fast_dispatch);
2179
2180   bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
2181                                  bool is_fast_dispatch);
2182
2183   void handle_pg_query(OpRequestRef op);
2184   void handle_pg_notify(OpRequestRef op);
2185   void handle_pg_log(OpRequestRef op);
2186   void handle_pg_info(OpRequestRef op);
2187   void handle_pg_trim(OpRequestRef op);
2188
2189   void handle_pg_backfill_reserve(OpRequestRef op);
2190   void handle_pg_recovery_reserve(OpRequestRef op);
2191
2192   void handle_force_recovery(Message *m);
2193
2194   void handle_pg_remove(OpRequestRef op);
2195   void _remove_pg(PG *pg);
2196
2197   // -- commands --
2198   struct Command {
2199     vector<string> cmd;
2200     ceph_tid_t tid;
2201     bufferlist indata;
2202     ConnectionRef con;
2203
2204     Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
2205       : cmd(c), tid(t), indata(bl), con(co) {}
2206   };
2207   list<Command*> command_queue;
2208   struct CommandWQ : public ThreadPool::WorkQueue<Command> {
2209     OSD *osd;
2210     CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
2211       : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
2212
2213     bool _empty() override {
2214       return osd->command_queue.empty();
2215     }
2216     bool _enqueue(Command *c) override {
2217       osd->command_queue.push_back(c);
2218       return true;
2219     }
2220     void _dequeue(Command *pg) override {
2221       ceph_abort();
2222     }
2223     Command *_dequeue() override {
2224       if (osd->command_queue.empty())
2225         return NULL;
2226       Command *c = osd->command_queue.front();
2227       osd->command_queue.pop_front();
2228       return c;
2229     }
2230     void _process(Command *c, ThreadPool::TPHandle &) override {
2231       osd->osd_lock.Lock();
2232       if (osd->is_stopping()) {
2233         osd->osd_lock.Unlock();
2234         delete c;
2235         return;
2236       }
2237       osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
2238       osd->osd_lock.Unlock();
2239       delete c;
2240     }
2241     void _clear() override {
2242       while (!osd->command_queue.empty()) {
2243         Command *c = osd->command_queue.front();
2244         osd->command_queue.pop_front();
2245         delete c;
2246       }
2247     }
2248   } command_wq;
2249
2250   void handle_command(class MMonCommand *m);
2251   void handle_command(class MCommand *m);
2252   void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
2253
2254   // -- pg recovery --
2255   void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
2256                    ThreadPool::TPHandle &handle);
2257
2258
2259   // -- scrubbing --
2260   void sched_scrub();
2261   bool scrub_random_backoff();
2262   bool scrub_load_below_threshold();
2263   bool scrub_time_permit(utime_t now);
2264
2265   // -- removing --
2266   struct RemoveWQ :
2267     public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
2268     CephContext* cct;
2269     ObjectStore *&store;
2270     list<pair<PGRef, DeletingStateRef> > remove_queue;
2271     RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
2272              ThreadPool *tp)
2273       : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
2274         "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}
2275
2276     bool _empty() override {
2277       return remove_queue.empty();
2278     }
2279     void _enqueue(pair<PGRef, DeletingStateRef> item) override {
2280       remove_queue.push_back(item);
2281     }
2282     void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
2283       remove_queue.push_front(item);
2284     }
2285     bool _dequeue(pair<PGRef, DeletingStateRef> item) {
2286       ceph_abort();
2287     }
2288     pair<PGRef, DeletingStateRef> _dequeue() override {
2289       assert(!remove_queue.empty());
2290       pair<PGRef, DeletingStateRef> item = remove_queue.front();
2291       remove_queue.pop_front();
2292       return item;
2293     }
2294     void _process(pair<PGRef, DeletingStateRef>,
2295                   ThreadPool::TPHandle &) override;
2296     void _clear() override {
2297       remove_queue.clear();
2298     }
2299   } remove_wq;
2300
2301 private:
2302   bool ms_can_fast_dispatch_any() const override { return true; }
2303   bool ms_can_fast_dispatch(const Message *m) const override {
2304     switch (m->get_type()) {
2305     case CEPH_MSG_OSD_OP:
2306     case CEPH_MSG_OSD_BACKOFF:
2307     case MSG_OSD_SUBOP:
2308     case MSG_OSD_REPOP:
2309     case MSG_OSD_SUBOPREPLY:
2310     case MSG_OSD_REPOPREPLY:
2311     case MSG_OSD_PG_PUSH:
2312     case MSG_OSD_PG_PULL:
2313     case MSG_OSD_PG_PUSH_REPLY:
2314     case MSG_OSD_PG_SCAN:
2315     case MSG_OSD_PG_BACKFILL:
2316     case MSG_OSD_PG_BACKFILL_REMOVE:
2317     case MSG_OSD_EC_WRITE:
2318     case MSG_OSD_EC_WRITE_REPLY:
2319     case MSG_OSD_EC_READ:
2320     case MSG_OSD_EC_READ_REPLY:
2321     case MSG_OSD_SCRUB_RESERVE:
2322     case MSG_OSD_REP_SCRUB:
2323     case MSG_OSD_REP_SCRUBMAP:
2324     case MSG_OSD_PG_UPDATE_LOG_MISSING:
2325     case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
2326     case MSG_OSD_PG_RECOVERY_DELETE:
2327     case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
2328       return true;
2329     default:
2330       return false;
2331     }
2332   }
2333   void ms_fast_dispatch(Message *m) override;
2334   void ms_fast_preprocess(Message *m) override;
2335   bool ms_dispatch(Message *m) override;
2336   bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override;
2337   bool ms_verify_authorizer(Connection *con, int peer_type,
2338                             int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
2339                             bool& isvalid, CryptoKey& session_key) override;
2340   void ms_handle_connect(Connection *con) override;
2341   void ms_handle_fast_connect(Connection *con) override;
2342   void ms_handle_fast_accept(Connection *con) override;
2343   bool ms_handle_reset(Connection *con) override;
2344   void ms_handle_remote_reset(Connection *con) override {}
2345   bool ms_handle_refused(Connection *con) override;
2346
2347   io_queue get_io_queue() const {
2348     if (cct->_conf->osd_op_queue == "debug_random") {
2349       static io_queue index_lookup[] = { io_queue::prioritized,
2350                                          io_queue::weightedpriority,
2351                                          io_queue::mclock_opclass,
2352                                          io_queue::mclock_client };
2353       srand(time(NULL));
2354       unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
2355       return index_lookup[which];
2356     } else if (cct->_conf->osd_op_queue == "prioritized") {
2357       return io_queue::prioritized;
2358     } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
2359       return io_queue::mclock_opclass;
2360     } else if (cct->_conf->osd_op_queue == "mclock_client") {
2361       return io_queue::mclock_client;
2362     } else {
2363       // default / catch-all is 'wpq'
2364       return io_queue::weightedpriority;
2365     }
2366   }
2367
2368   unsigned int get_io_prio_cut() const {
2369     if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
2370       srand(time(NULL));
2371       return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
2372     } else if (cct->_conf->osd_op_queue_cut_off == "high") {
2373       return CEPH_MSG_PRIO_HIGH;
2374     } else {
2375       // default / catch-all is 'low'
2376       return CEPH_MSG_PRIO_LOW;
2377     }
2378   }
2379
2380  public:
2381   /* internal and external can point to the same messenger, they will still
2382    * be cleaned up properly*/
2383   OSD(CephContext *cct_,
2384       ObjectStore *store_,
2385       int id,
2386       Messenger *internal,
2387       Messenger *external,
2388       Messenger *hb_front_client,
2389       Messenger *hb_back_client,
2390       Messenger *hb_front_server,
2391       Messenger *hb_back_server,
2392       Messenger *osdc_messenger,
2393       MonClient *mc, const std::string &dev, const std::string &jdev);
2394   ~OSD() override;
2395
2396   // static bits
2397   static int mkfs(CephContext *cct, ObjectStore *store,
2398                   const string& dev,
2399                   uuid_d fsid, int whoami);
2400   /* remove any non-user xattrs from a map of them */
2401   void filter_xattrs(map<string, bufferptr>& attrs) {
2402     for (map<string, bufferptr>::iterator iter = attrs.begin();
2403          iter != attrs.end();
2404          ) {
2405       if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
2406         attrs.erase(iter++);
2407       else ++iter;
2408     }
2409   }
2410
2411 private:
2412   int mon_cmd_maybe_osd_create(string &cmd);
2413   int update_crush_device_class();
2414   int update_crush_location();
2415
2416   static int write_meta(CephContext *cct,
2417                         ObjectStore *store,
2418                         uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
2419
2420   void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
2421   void handle_scrub(struct MOSDScrub *m);
2422   void handle_osd_ping(class MOSDPing *m);
2423
2424   int init_op_flags(OpRequestRef& op);
2425
2426   int get_num_op_shards();
2427   int get_num_op_threads();
2428
2429   float get_osd_recovery_sleep();
2430
2431 public:
2432   static int peek_meta(ObjectStore *store, string& magic,
2433                        uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);
2434   
2435
2436   // startup/shutdown
2437   int pre_init();
2438   int init();
2439   void final_init();
2440
2441   int enable_disable_fuse(bool stop);
2442
2443   void suicide(int exitcode);
2444   int shutdown();
2445
2446   void handle_signal(int signum);
2447
2448   /// check if we can throw out op from a disconnected client
2449   static bool op_is_discardable(const MOSDOp *m);
2450
2451 public:
2452   OSDService service;
2453   friend class OSDService;
2454 };
2455
2456
2457 std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
2458
2459
2460 //compatibility of the executable
2461 extern const CompatSet::Feature ceph_osd_feature_compat[];
2462 extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
2463 extern const CompatSet::Feature ceph_osd_feature_incompat[];
2464
2465 #endif // CEPH_OSD_H