--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_OSD_H
+#define CEPH_OSD_H
+
+#include "PG.h"
+
+#include "msg/Dispatcher.h"
+
+#include "common/Mutex.h"
+#include "common/RWLock.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "common/AsyncReserver.h"
+#include "common/ceph_context.h"
+#include "common/zipkin_trace.h"
+
+#include "mgr/MgrClient.h"
+
+#include "os/ObjectStore.h"
+#include "OSDCap.h"
+
+#include "auth/KeyRing.h"
+#include "osd/ClassHandler.h"
+
+#include "include/CompatSet.h"
+
+#include "OpRequest.h"
+#include "Session.h"
+
+#include "osd/PGQueueable.h"
+
+#include <atomic>
+#include <map>
+#include <memory>
+#include "include/memory.h"
+using namespace std;
+
+#include "include/unordered_map.h"
+
+#include "common/shared_cache.hpp"
+#include "common/simple_cache.hpp"
+#include "common/sharedptr_registry.hpp"
+#include "common/WeightedPriorityQueue.h"
+#include "common/PrioritizedQueue.h"
+#include "osd/mClockOpClassQueue.h"
+#include "osd/mClockClientQueue.h"
+#include "messages/MOSDOp.h"
+#include "include/Spinlock.h"
+#include "common/EventTrace.h"
+
+#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
+
+
+enum {
+ l_osd_first = 10000,
+ l_osd_op_wip,
+ l_osd_op,
+ l_osd_op_inb,
+ l_osd_op_outb,
+ l_osd_op_lat,
+ l_osd_op_process_lat,
+ l_osd_op_prepare_lat,
+ l_osd_op_r,
+ l_osd_op_r_outb,
+ l_osd_op_r_lat,
+ l_osd_op_r_lat_outb_hist,
+ l_osd_op_r_process_lat,
+ l_osd_op_r_prepare_lat,
+ l_osd_op_w,
+ l_osd_op_w_inb,
+ l_osd_op_w_lat,
+ l_osd_op_w_lat_inb_hist,
+ l_osd_op_w_process_lat,
+ l_osd_op_w_prepare_lat,
+ l_osd_op_rw,
+ l_osd_op_rw_inb,
+ l_osd_op_rw_outb,
+ l_osd_op_rw_lat,
+ l_osd_op_rw_lat_inb_hist,
+ l_osd_op_rw_lat_outb_hist,
+ l_osd_op_rw_process_lat,
+ l_osd_op_rw_prepare_lat,
+
+ l_osd_op_before_queue_op_lat,
+ l_osd_op_before_dequeue_op_lat,
+
+ l_osd_sop,
+ l_osd_sop_inb,
+ l_osd_sop_lat,
+ l_osd_sop_w,
+ l_osd_sop_w_inb,
+ l_osd_sop_w_lat,
+ l_osd_sop_pull,
+ l_osd_sop_pull_lat,
+ l_osd_sop_push,
+ l_osd_sop_push_inb,
+ l_osd_sop_push_lat,
+
+ l_osd_pull,
+ l_osd_push,
+ l_osd_push_outb,
+
+ l_osd_rop,
+
+ l_osd_loadavg,
+ l_osd_buf,
+ l_osd_history_alloc_bytes,
+ l_osd_history_alloc_num,
+ l_osd_cached_crc,
+ l_osd_cached_crc_adjusted,
+ l_osd_missed_crc,
+
+ l_osd_pg,
+ l_osd_pg_primary,
+ l_osd_pg_replica,
+ l_osd_pg_stray,
+ l_osd_hb_to,
+ l_osd_map,
+ l_osd_mape,
+ l_osd_mape_dup,
+
+ l_osd_waiting_for_map,
+
+ l_osd_map_cache_hit,
+ l_osd_map_cache_miss,
+ l_osd_map_cache_miss_low,
+ l_osd_map_cache_miss_low_avg,
+ l_osd_map_bl_cache_hit,
+ l_osd_map_bl_cache_miss,
+
+ l_osd_stat_bytes,
+ l_osd_stat_bytes_used,
+ l_osd_stat_bytes_avail,
+
+ l_osd_copyfrom,
+
+ l_osd_tier_promote,
+ l_osd_tier_flush,
+ l_osd_tier_flush_fail,
+ l_osd_tier_try_flush,
+ l_osd_tier_try_flush_fail,
+ l_osd_tier_evict,
+ l_osd_tier_whiteout,
+ l_osd_tier_dirty,
+ l_osd_tier_clean,
+ l_osd_tier_delay,
+ l_osd_tier_proxy_read,
+ l_osd_tier_proxy_write,
+
+ l_osd_agent_wake,
+ l_osd_agent_skip,
+ l_osd_agent_flush,
+ l_osd_agent_evict,
+
+ l_osd_object_ctx_cache_hit,
+ l_osd_object_ctx_cache_total,
+
+ l_osd_op_cache_hit,
+ l_osd_tier_flush_lat,
+ l_osd_tier_promote_lat,
+ l_osd_tier_r_lat,
+
+ l_osd_pg_info,
+ l_osd_pg_fastinfo,
+ l_osd_pg_biginfo,
+
+ l_osd_last,
+};
+
+// RecoveryState perf counters
+enum {
+ rs_first = 20000,
+ rs_initial_latency,
+ rs_started_latency,
+ rs_reset_latency,
+ rs_start_latency,
+ rs_primary_latency,
+ rs_peering_latency,
+ rs_backfilling_latency,
+ rs_waitremotebackfillreserved_latency,
+ rs_waitlocalbackfillreserved_latency,
+ rs_notbackfilling_latency,
+ rs_repnotrecovering_latency,
+ rs_repwaitrecoveryreserved_latency,
+ rs_repwaitbackfillreserved_latency,
+ rs_reprecovering_latency,
+ rs_activating_latency,
+ rs_waitlocalrecoveryreserved_latency,
+ rs_waitremoterecoveryreserved_latency,
+ rs_recovering_latency,
+ rs_recovered_latency,
+ rs_clean_latency,
+ rs_active_latency,
+ rs_replicaactive_latency,
+ rs_stray_latency,
+ rs_getinfo_latency,
+ rs_getlog_latency,
+ rs_waitactingchange_latency,
+ rs_incomplete_latency,
+ rs_down_latency,
+ rs_getmissing_latency,
+ rs_waitupthru_latency,
+ rs_notrecovering_latency,
+ rs_last,
+};
+
+class Messenger;
+class Message;
+class MonClient;
+class PerfCounters;
+class ObjectStore;
+class FuseStore;
+class OSDMap;
+class MLog;
+class Objecter;
+
+class Watch;
+class PrimaryLogPG;
+
+class AuthAuthorizeHandlerRegistry;
+
+class TestOpsSocketHook;
+struct C_CompleteSplits;
+struct C_OpenPGs;
+class LogChannel;
+class CephContext;
+typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
+class MOSDOp;
+
+class DeletingState {
+ Mutex lock;
+ Cond cond;
+ enum {
+ QUEUED,
+ CLEARING_DIR,
+ CLEARING_WAITING,
+ DELETING_DIR,
+ DELETED_DIR,
+ CANCELED,
+ } status;
+ bool stop_deleting;
+public:
+ const spg_t pgid;
+ const PGRef old_pg_state;
+ explicit DeletingState(const pair<spg_t, PGRef> &in) :
+ lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
+ pgid(in.first), old_pg_state(in.second) {
+ }
+
+ /// transition status to CLEARING_WAITING
+ bool pause_clearing() {
+ Mutex::Locker l(lock);
+ assert(status == CLEARING_DIR);
+ if (stop_deleting) {
+ status = CANCELED;
+ cond.Signal();
+ return false;
+ }
+ status = CLEARING_WAITING;
+ return true;
+ } ///< @return false if we should cancel deletion
+
+ /// start or resume the clearing - transition the status to CLEARING_DIR
+ bool start_or_resume_clearing() {
+ Mutex::Locker l(lock);
+ assert(
+ status == QUEUED ||
+ status == DELETED_DIR ||
+ status == CLEARING_WAITING);
+ if (stop_deleting) {
+ status = CANCELED;
+ cond.Signal();
+ return false;
+ }
+ status = CLEARING_DIR;
+ return true;
+ } ///< @return false if we should cancel the deletion
+
+ /// transition status to CLEARING_DIR
+ bool resume_clearing() {
+ Mutex::Locker l(lock);
+ assert(status == CLEARING_WAITING);
+ if (stop_deleting) {
+ status = CANCELED;
+ cond.Signal();
+ return false;
+ }
+ status = CLEARING_DIR;
+ return true;
+ } ///< @return false if we should cancel deletion
+
+ /// transition status to deleting
+ bool start_deleting() {
+ Mutex::Locker l(lock);
+ assert(status == CLEARING_DIR);
+ if (stop_deleting) {
+ status = CANCELED;
+ cond.Signal();
+ return false;
+ }
+ status = DELETING_DIR;
+ return true;
+ } ///< @return false if we should cancel deletion
+
+ /// signal collection removal queued
+ void finish_deleting() {
+ Mutex::Locker l(lock);
+ assert(status == DELETING_DIR);
+ status = DELETED_DIR;
+ cond.Signal();
+ }
+
+ /// try to halt the deletion
+ bool try_stop_deletion() {
+ Mutex::Locker l(lock);
+ stop_deleting = true;
+ /**
+ * If we are in DELETING_DIR or CLEARING_DIR, there are in progress
+ * operations we have to wait for before continuing on. States
+ * CLEARING_WAITING and QUEUED indicate that the remover will check
+ * stop_deleting before queueing any further operations. CANCELED
+ * indicates that the remover has already halted. DELETED_DIR
+ * indicates that the deletion has been fully queued.
+ */
+ while (status == DELETING_DIR || status == CLEARING_DIR)
+ cond.Wait(lock);
+ return status != DELETED_DIR;
+ } ///< @return true if we don't need to recreate the collection
+};
+typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
+
+class OSD;
+
+class OSDService {
+public:
+ OSD *osd;
+ CephContext *cct;
+ SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
+ ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
+ SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
+ const int whoami;
+ ObjectStore *&store;
+ LogClient &log_client;
+ LogChannelRef clog;
+ PGRecoveryStats &pg_recovery_stats;
+private:
+ Messenger *&cluster_messenger;
+ Messenger *&client_messenger;
+public:
+ PerfCounters *&logger;
+ PerfCounters *&recoverystate_perf;
+ MonClient *&monc;
+ ThreadPool::BatchWorkQueue<PG> &peering_wq;
+ GenContextWQ recovery_gen_wq;
+ ClassHandler *&class_handler;
+
+ void enqueue_back(spg_t pgid, PGQueueable qi);
+ void enqueue_front(spg_t pgid, PGQueueable qi);
+
+ void maybe_inject_dispatch_delay() {
+ if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
+ if (rand() % 10000 <
+ g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
+ utime_t t;
+ t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
+ t.sleep();
+ }
+ }
+ }
+
+private:
+ // -- map epoch lower bound --
+ Mutex pg_epoch_lock;
+ multiset<epoch_t> pg_epochs;
+ map<spg_t,epoch_t> pg_epoch;
+
+public:
+ void pg_add_epoch(spg_t pgid, epoch_t epoch) {
+ Mutex::Locker l(pg_epoch_lock);
+ map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
+ assert(t == pg_epoch.end());
+ pg_epoch[pgid] = epoch;
+ pg_epochs.insert(epoch);
+ }
+ void pg_update_epoch(spg_t pgid, epoch_t epoch) {
+ Mutex::Locker l(pg_epoch_lock);
+ map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
+ assert(t != pg_epoch.end());
+ pg_epochs.erase(pg_epochs.find(t->second));
+ t->second = epoch;
+ pg_epochs.insert(epoch);
+ }
+ void pg_remove_epoch(spg_t pgid) {
+ Mutex::Locker l(pg_epoch_lock);
+ map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
+ if (t != pg_epoch.end()) {
+ pg_epochs.erase(pg_epochs.find(t->second));
+ pg_epoch.erase(t);
+ }
+ }
+ epoch_t get_min_pg_epoch() {
+ Mutex::Locker l(pg_epoch_lock);
+ if (pg_epochs.empty())
+ return 0;
+ else
+ return *pg_epochs.begin();
+ }
+
+private:
+ // -- superblock --
+ Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
+ OSDSuperblock superblock;
+
+public:
+ OSDSuperblock get_superblock() {
+ Mutex::Locker l(publish_lock);
+ return superblock;
+ }
+ void publish_superblock(const OSDSuperblock &block) {
+ Mutex::Locker l(publish_lock);
+ superblock = block;
+ }
+
+ int get_nodeid() const { return whoami; }
+
+ std::atomic<epoch_t> max_oldest_map;
+private:
+ OSDMapRef osdmap;
+
+public:
+ OSDMapRef get_osdmap() {
+ Mutex::Locker l(publish_lock);
+ return osdmap;
+ }
+ epoch_t get_osdmap_epoch() {
+ Mutex::Locker l(publish_lock);
+ return osdmap ? osdmap->get_epoch() : 0;
+ }
+ void publish_map(OSDMapRef map) {
+ Mutex::Locker l(publish_lock);
+ osdmap = map;
+ }
+
+ /*
+ * osdmap - current published map
+ * next_osdmap - pre_published map that is about to be published.
+ *
+ * We use the next_osdmap to send messages and initiate connections,
+ * but only if the target is the same instance as the one in the map
+ * epoch the current user is working from (i.e., the result is
+ * equivalent to what is in next_osdmap).
+ *
+ * This allows the helpers to start ignoring osds that are about to
+ * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
+ * down, without worrying about reopening connections from threads
+ * working from old maps.
+ */
+private:
+ OSDMapRef next_osdmap;
+ Cond pre_publish_cond;
+
+public:
+ void pre_publish_map(OSDMapRef map) {
+ Mutex::Locker l(pre_publish_lock);
+ next_osdmap = std::move(map);
+ }
+
+ void activate_map();
+ /// map epochs reserved below
+ map<epoch_t, unsigned> map_reservations;
+
+ /// gets ref to next_osdmap and registers the epoch as reserved
+ OSDMapRef get_nextmap_reserved() {
+ Mutex::Locker l(pre_publish_lock);
+ if (!next_osdmap)
+ return OSDMapRef();
+ epoch_t e = next_osdmap->get_epoch();
+ map<epoch_t, unsigned>::iterator i =
+ map_reservations.insert(make_pair(e, 0)).first;
+ i->second++;
+ return next_osdmap;
+ }
+ /// releases reservation on map
+ void release_map(OSDMapRef osdmap) {
+ Mutex::Locker l(pre_publish_lock);
+ map<epoch_t, unsigned>::iterator i =
+ map_reservations.find(osdmap->get_epoch());
+ assert(i != map_reservations.end());
+ assert(i->second > 0);
+ if (--(i->second) == 0) {
+ map_reservations.erase(i);
+ }
+ pre_publish_cond.Signal();
+ }
+ /// blocks until there are no reserved maps prior to next_osdmap
+ void await_reserved_maps() {
+ Mutex::Locker l(pre_publish_lock);
+ assert(next_osdmap);
+ while (true) {
+ map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
+ if (i == map_reservations.cend() || i->first >= next_osdmap->get_epoch()) {
+ break;
+ } else {
+ pre_publish_cond.Wait(pre_publish_lock);
+ }
+ }
+ }
+
+private:
+ Mutex peer_map_epoch_lock;
+ map<int, epoch_t> peer_map_epoch;
+public:
+ epoch_t get_peer_epoch(int p);
+ epoch_t note_peer_epoch(int p, epoch_t e);
+ void forget_peer_epoch(int p, epoch_t e);
+
+ void send_map(class MOSDMap *m, Connection *con);
+ void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
+ MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
+ OSDSuperblock& superblock);
+ bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
+ const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
+ void share_map(entity_name_t name, Connection *con, epoch_t epoch,
+ OSDMapRef& osdmap, epoch_t *sent_epoch_p);
+ void share_map_peer(int peer, Connection *con,
+ OSDMapRef map = OSDMapRef());
+
+ ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
+ pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
+ void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
+ void send_message_osd_cluster(Message *m, Connection *con) {
+ con->send_message(m);
+ }
+ void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
+ con->send_message(m);
+ }
+ void send_message_osd_client(Message *m, Connection *con) {
+ con->send_message(m);
+ }
+ void send_message_osd_client(Message *m, const ConnectionRef& con) {
+ con->send_message(m);
+ }
+ entity_name_t get_cluster_msgr_name() {
+ return cluster_messenger->get_myname();
+ }
+
+private:
+ // -- scrub scheduling --
+ Mutex sched_scrub_lock;
+ int scrubs_pending;
+ int scrubs_active;
+
+public:
+ struct ScrubJob {
+ CephContext* cct;
+ /// pg to be scrubbed
+ spg_t pgid;
+ /// a time scheduled for scrub. but the scrub could be delayed if system
+ /// load is too high or it fails to fall in the scrub hours
+ utime_t sched_time;
+ /// the hard upper bound of scrub time
+ utime_t deadline;
+ ScrubJob() : cct(nullptr) {}
+ explicit ScrubJob(CephContext* cct, const spg_t& pg,
+ const utime_t& timestamp,
+ double pool_scrub_min_interval = 0,
+ double pool_scrub_max_interval = 0, bool must = true);
+ /// order the jobs by sched_time
+ bool operator<(const ScrubJob& rhs) const;
+ };
+ set<ScrubJob> sched_scrub_pg;
+
+ /// @returns the scrub_reg_stamp used for unregister the scrub job
+ utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
+ double pool_scrub_max_interval, bool must) {
+ ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval, pool_scrub_max_interval,
+ must);
+ Mutex::Locker l(sched_scrub_lock);
+ sched_scrub_pg.insert(scrub);
+ return scrub.sched_time;
+ }
+ void unreg_pg_scrub(spg_t pgid, utime_t t) {
+ Mutex::Locker l(sched_scrub_lock);
+ size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
+ assert(removed);
+ }
+ bool first_scrub_stamp(ScrubJob *out) {
+ Mutex::Locker l(sched_scrub_lock);
+ if (sched_scrub_pg.empty())
+ return false;
+ set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
+ *out = *iter;
+ return true;
+ }
+ bool next_scrub_stamp(const ScrubJob& next,
+ ScrubJob *out) {
+ Mutex::Locker l(sched_scrub_lock);
+ if (sched_scrub_pg.empty())
+ return false;
+ set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
+ if (iter == sched_scrub_pg.cend())
+ return false;
+ ++iter;
+ if (iter == sched_scrub_pg.cend())
+ return false;
+ *out = *iter;
+ return true;
+ }
+
+ void dumps_scrub(Formatter *f) {
+ assert(f != nullptr);
+ Mutex::Locker l(sched_scrub_lock);
+
+ f->open_array_section("scrubs");
+ for (const auto &i: sched_scrub_pg) {
+ f->open_object_section("scrub");
+ f->dump_stream("pgid") << i.pgid;
+ f->dump_stream("sched_time") << i.sched_time;
+ f->dump_stream("deadline") << i.deadline;
+ f->dump_bool("forced", i.sched_time == i.deadline);
+ f->close_section();
+ }
+ f->close_section();
+ }
+
+ bool can_inc_scrubs_pending();
+ bool inc_scrubs_pending();
+ void inc_scrubs_active(bool reserved);
+ void dec_scrubs_pending();
+ void dec_scrubs_active();
+
+ void reply_op_error(OpRequestRef op, int err);
+ void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
+ void handle_misdirected_op(PG *pg, OpRequestRef op);
+
+
+private:
+ // -- agent shared state --
+ Mutex agent_lock;
+ Cond agent_cond;
+ map<uint64_t, set<PGRef> > agent_queue;
+ set<PGRef>::iterator agent_queue_pos;
+ bool agent_valid_iterator;
+ int agent_ops;
+ int flush_mode_high_count; //once have one pg with FLUSH_MODE_HIGH then flush objects with high speed
+ set<hobject_t> agent_oids;
+ bool agent_active;
+ struct AgentThread : public Thread {
+ OSDService *osd;
+ explicit AgentThread(OSDService *o) : osd(o) {}
+ void *entry() override {
+ osd->agent_entry();
+ return NULL;
+ }
+ } agent_thread;
+ bool agent_stop_flag;
+ Mutex agent_timer_lock;
+ SafeTimer agent_timer;
+
+public:
+ void agent_entry();
+ void agent_stop();
+
+ void _enqueue(PG *pg, uint64_t priority) {
+ if (!agent_queue.empty() &&
+ agent_queue.rbegin()->first < priority)
+ agent_valid_iterator = false; // inserting higher-priority queue
+ set<PGRef>& nq = agent_queue[priority];
+ if (nq.empty())
+ agent_cond.Signal();
+ nq.insert(pg);
+ }
+
+ void _dequeue(PG *pg, uint64_t old_priority) {
+ set<PGRef>& oq = agent_queue[old_priority];
+ set<PGRef>::iterator p = oq.find(pg);
+ assert(p != oq.end());
+ if (p == agent_queue_pos)
+ ++agent_queue_pos;
+ oq.erase(p);
+ if (oq.empty()) {
+ if (agent_queue.rbegin()->first == old_priority)
+ agent_valid_iterator = false;
+ agent_queue.erase(old_priority);
+ }
+ }
+
+ /// enable agent for a pg
+ void agent_enable_pg(PG *pg, uint64_t priority) {
+ Mutex::Locker l(agent_lock);
+ _enqueue(pg, priority);
+ }
+
+ /// adjust priority for an enagled pg
+ void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
+ Mutex::Locker l(agent_lock);
+ assert(new_priority != old_priority);
+ _enqueue(pg, new_priority);
+ _dequeue(pg, old_priority);
+ }
+
+ /// disable agent for a pg
+ void agent_disable_pg(PG *pg, uint64_t old_priority) {
+ Mutex::Locker l(agent_lock);
+ _dequeue(pg, old_priority);
+ }
+
+ /// note start of an async (evict) op
+ void agent_start_evict_op() {
+ Mutex::Locker l(agent_lock);
+ ++agent_ops;
+ }
+
+ /// note finish or cancellation of an async (evict) op
+ void agent_finish_evict_op() {
+ Mutex::Locker l(agent_lock);
+ assert(agent_ops > 0);
+ --agent_ops;
+ agent_cond.Signal();
+ }
+
+ /// note start of an async (flush) op
+ void agent_start_op(const hobject_t& oid) {
+ Mutex::Locker l(agent_lock);
+ ++agent_ops;
+ assert(agent_oids.count(oid) == 0);
+ agent_oids.insert(oid);
+ }
+
+ /// note finish or cancellation of an async (flush) op
+ void agent_finish_op(const hobject_t& oid) {
+ Mutex::Locker l(agent_lock);
+ assert(agent_ops > 0);
+ --agent_ops;
+ assert(agent_oids.count(oid) == 1);
+ agent_oids.erase(oid);
+ agent_cond.Signal();
+ }
+
+ /// check if we are operating on an object
+ bool agent_is_active_oid(const hobject_t& oid) {
+ Mutex::Locker l(agent_lock);
+ return agent_oids.count(oid);
+ }
+
+ /// get count of active agent ops
+ int agent_get_num_ops() {
+ Mutex::Locker l(agent_lock);
+ return agent_ops;
+ }
+
+ void agent_inc_high_count() {
+ Mutex::Locker l(agent_lock);
+ flush_mode_high_count ++;
+ }
+
+ void agent_dec_high_count() {
+ Mutex::Locker l(agent_lock);
+ flush_mode_high_count --;
+ }
+
+private:
+ /// throttle promotion attempts
+ std::atomic_uint promote_probability_millis{1000}; ///< probability thousands. one word.
+ PromoteCounter promote_counter;
+ utime_t last_recalibrate;
+ unsigned long promote_max_objects, promote_max_bytes;
+
+public:
+ bool promote_throttle() {
+ // NOTE: lockless! we rely on the probability being a single word.
+ promote_counter.attempt();
+ if ((unsigned)rand() % 1000 > promote_probability_millis)
+ return true; // yes throttle (no promote)
+ if (promote_max_objects &&
+ promote_counter.objects > promote_max_objects)
+ return true; // yes throttle
+ if (promote_max_bytes &&
+ promote_counter.bytes > promote_max_bytes)
+ return true; // yes throttle
+ return false; // no throttle (promote)
+ }
+ void promote_finish(uint64_t bytes) {
+ promote_counter.finish(bytes);
+ }
+ void promote_throttle_recalibrate();
+
+ // -- Objecter, for tiering reads/writes from/to other OSDs --
+ Objecter *objecter;
+ Finisher objecter_finisher;
+
+ // -- Watch --
+ Mutex watch_lock;
+ SafeTimer watch_timer;
+ uint64_t next_notif_id;
+ uint64_t get_next_id(epoch_t cur_epoch) {
+ Mutex::Locker l(watch_lock);
+ return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
+ }
+
+ // -- Recovery/Backfill Request Scheduling --
+ Mutex recovery_request_lock;
+ SafeTimer recovery_request_timer;
+
+ // For async recovery sleep
+ bool recovery_needs_sleep = true;
+ utime_t recovery_schedule_time = utime_t();
+
+ Mutex recovery_sleep_lock;
+ SafeTimer recovery_sleep_timer;
+
+ // -- tids --
+ // for ops i issue
+ std::atomic_uint last_tid{0};
+ ceph_tid_t get_tid() {
+ return (ceph_tid_t)last_tid++;
+ }
+
+ // -- backfill_reservation --
+ Finisher reserver_finisher;
+ AsyncReserver<spg_t> local_reserver;
+ AsyncReserver<spg_t> remote_reserver;
+
+ // -- pg_temp --
+private:
+ Mutex pg_temp_lock;
+ map<pg_t, vector<int> > pg_temp_wanted;
+ map<pg_t, vector<int> > pg_temp_pending;
+ void _sent_pg_temp();
+public:
+ void queue_want_pg_temp(pg_t pgid, vector<int>& want);
+ void remove_want_pg_temp(pg_t pgid);
+ void requeue_pg_temp();
+ void send_pg_temp();
+
+ void send_pg_created(pg_t pgid);
+
+ void queue_for_peering(PG *pg);
+
+ Mutex snap_sleep_lock;
+ SafeTimer snap_sleep_timer;
+
+ Mutex scrub_sleep_lock;
+ SafeTimer scrub_sleep_timer;
+
+ AsyncReserver<spg_t> snap_reserver;
+ void queue_for_snap_trim(PG *pg);
+
+ void queue_for_scrub(PG *pg, bool with_high_priority) {
+ unsigned scrub_queue_priority = pg->scrubber.priority;
+ if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
+ scrub_queue_priority = cct->_conf->osd_client_op_priority;
+ }
+ enqueue_back(
+ pg->info.pgid,
+ PGQueueable(
+ PGScrub(pg->get_osdmap()->get_epoch()),
+ cct->_conf->osd_scrub_cost,
+ scrub_queue_priority,
+ ceph_clock_now(),
+ entity_inst_t(),
+ pg->get_osdmap()->get_epoch()));
+ }
+
+private:
+ // -- pg recovery and associated throttling --
+ Mutex recovery_lock;
+ list<pair<epoch_t, PGRef> > awaiting_throttle;
+
+ utime_t defer_recovery_until;
+ uint64_t recovery_ops_active;
+ uint64_t recovery_ops_reserved;
+ bool recovery_paused;
+#ifdef DEBUG_RECOVERY_OIDS
+ map<spg_t, set<hobject_t> > recovery_oids;
+#endif
+ bool _recover_now(uint64_t *available_pushes);
+ void _maybe_queue_recovery();
+ void _queue_for_recovery(
+ pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
+ assert(recovery_lock.is_locked_by_me());
+ enqueue_back(
+ p.second->info.pgid,
+ PGQueueable(
+ PGRecovery(p.first, reserved_pushes),
+ cct->_conf->osd_recovery_cost,
+ cct->_conf->osd_recovery_priority,
+ ceph_clock_now(),
+ entity_inst_t(),
+ p.first));
+ }
+public:
+ void start_recovery_op(PG *pg, const hobject_t& soid);
+ void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
+ bool is_recovery_active();
+ void release_reserved_pushes(uint64_t pushes) {
+ Mutex::Locker l(recovery_lock);
+ assert(recovery_ops_reserved >= pushes);
+ recovery_ops_reserved -= pushes;
+ _maybe_queue_recovery();
+ }
+ void defer_recovery(float defer_for) {
+ defer_recovery_until = ceph_clock_now();
+ defer_recovery_until += defer_for;
+ }
+ void pause_recovery() {
+ Mutex::Locker l(recovery_lock);
+ recovery_paused = true;
+ }
+ bool recovery_is_paused() {
+ Mutex::Locker l(recovery_lock);
+ return recovery_paused;
+ }
+ void unpause_recovery() {
+ Mutex::Locker l(recovery_lock);
+ recovery_paused = false;
+ _maybe_queue_recovery();
+ }
+ void kick_recovery_queue() {
+ Mutex::Locker l(recovery_lock);
+ _maybe_queue_recovery();
+ }
+ void clear_queued_recovery(PG *pg) {
+ Mutex::Locker l(recovery_lock);
+ for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
+ i != awaiting_throttle.end();
+ ) {
+ if (i->second.get() == pg) {
+ awaiting_throttle.erase(i);
+ return;
+ } else {
+ ++i;
+ }
+ }
+ }
+ // delayed pg activation
+ void queue_for_recovery(PG *pg) {
+ Mutex::Locker l(recovery_lock);
+
+ if (pg->get_state() & (PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL)) {
+ awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
+ } else {
+ awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
+ }
+ _maybe_queue_recovery();
+ }
+ void queue_recovery_after_sleep(PG *pg, epoch_t queued, uint64_t reserved_pushes) {
+ Mutex::Locker l(recovery_lock);
+ _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
+ }
+
+ void adjust_pg_priorities(const vector<PGRef>& pgs, int newflags);
+
+ // osd map cache (past osd maps)
+ Mutex map_cache_lock;
+ SharedLRU<epoch_t, const OSDMap> map_cache;
+ SimpleLRU<epoch_t, bufferlist> map_bl_cache;
+ SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
+
+ OSDMapRef try_get_map(epoch_t e);
+ OSDMapRef get_map(epoch_t e) {
+ OSDMapRef ret(try_get_map(e));
+ assert(ret);
+ return ret;
+ }
+ OSDMapRef add_map(OSDMap *o) {
+ Mutex::Locker l(map_cache_lock);
+ return _add_map(o);
+ }
+ OSDMapRef _add_map(OSDMap *o);
+
+ void add_map_bl(epoch_t e, bufferlist& bl) {
+ Mutex::Locker l(map_cache_lock);
+ return _add_map_bl(e, bl);
+ }
+ void pin_map_bl(epoch_t e, bufferlist &bl);
+ void _add_map_bl(epoch_t e, bufferlist& bl);
+ bool get_map_bl(epoch_t e, bufferlist& bl) {
+ Mutex::Locker l(map_cache_lock);
+ return _get_map_bl(e, bl);
+ }
+ bool _get_map_bl(epoch_t e, bufferlist& bl);
+
+ void add_map_inc_bl(epoch_t e, bufferlist& bl) {
+ Mutex::Locker l(map_cache_lock);
+ return _add_map_inc_bl(e, bl);
+ }
+ void pin_map_inc_bl(epoch_t e, bufferlist &bl);
+ void _add_map_inc_bl(epoch_t e, bufferlist& bl);
+ bool get_inc_map_bl(epoch_t e, bufferlist& bl);
+
+ void clear_map_bl_cache_pins(epoch_t e);
+
+ void need_heartbeat_peer_update();
+
+ void pg_stat_queue_enqueue(PG *pg);
+ void pg_stat_queue_dequeue(PG *pg);
+
+ void init();
+ void final_init();
+ void start_shutdown();
+ void shutdown_reserver();
+ void shutdown();
+
+private:
+ // split
+ Mutex in_progress_split_lock;
+ map<spg_t, spg_t> pending_splits; // child -> parent
+ map<spg_t, set<spg_t> > rev_pending_splits; // parent -> [children]
+ set<spg_t> in_progress_splits; // child
+
+public:
+ void _start_split(spg_t parent, const set<spg_t> &children);
+ void start_split(spg_t parent, const set<spg_t> &children) {
+ Mutex::Locker l(in_progress_split_lock);
+ return _start_split(parent, children);
+ }
+ void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs);
+ void complete_split(const set<spg_t> &pgs);
+ void cancel_pending_splits_for_parent(spg_t parent);
+ void _cancel_pending_splits_for_parent(spg_t parent);
+ bool splitting(spg_t pgid);
+ void expand_pg_num(OSDMapRef old_map,
+ OSDMapRef new_map);
+ void _maybe_split_pgid(OSDMapRef old_map,
+ OSDMapRef new_map,
+ spg_t pgid);
+ void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap);
+
+ // -- stats --
+ Mutex stat_lock;
+ osd_stat_t osd_stat;
+ uint32_t seq = 0;
+
+ void update_osd_stat(vector<int>& hb_peers);
+ osd_stat_t set_osd_stat(const struct store_statfs_t &stbuf,
+ vector<int>& hb_peers,
+ int num_pgs);
+ osd_stat_t get_osd_stat() {
+ Mutex::Locker l(stat_lock);
+ ++seq;
+ osd_stat.up_from = up_epoch;
+ osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
+ return osd_stat;
+ }
+ uint64_t get_osd_stat_seq() {
+ Mutex::Locker l(stat_lock);
+ return osd_stat.seq;
+ }
+
+ // -- OSD Full Status --
+private:
+ friend TestOpsSocketHook;
+ mutable Mutex full_status_lock;
+ enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state; // ascending
+ const char *get_full_state_name(s_names s) const {
+ switch (s) {
+ case NONE: return "none";
+ case NEARFULL: return "nearfull";
+ case BACKFILLFULL: return "backfillfull";
+ case FULL: return "full";
+ case FAILSAFE: return "failsafe";
+ default: return "???";
+ }
+ }
+ s_names get_full_state(string type) const {
+ if (type == "none")
+ return NONE;
+ else if (type == "failsafe")
+ return FAILSAFE;
+ else if (type == "full")
+ return FULL;
+ else if (type == "backfillfull")
+ return BACKFILLFULL;
+ else if (type == "nearfull")
+ return NEARFULL;
+ else
+ return INVALID;
+ }
+ double cur_ratio; ///< current utilization
+ mutable int64_t injectfull = 0;
+ s_names injectfull_state = NONE;
+ float get_failsafe_full_ratio();
+ void check_full_status(float ratio);
+ bool _check_full(s_names type, ostream &ss) const;
+public:
+ bool check_failsafe_full(ostream &ss) const;
+ bool check_full(ostream &ss) const;
+ bool check_backfill_full(ostream &ss) const;
+ bool check_nearfull(ostream &ss) const;
+ bool is_failsafe_full() const;
+ bool is_full() const;
+ bool is_backfillfull() const;
+ bool is_nearfull() const;
+ bool need_fullness_update(); ///< osdmap state needs update
+ void set_injectfull(s_names type, int64_t count);
+ bool check_osdmap_full(const set<pg_shard_t> &missing_on);
+
+
+ // -- epochs --
+private:
+ mutable Mutex epoch_lock; // protects access to boot_epoch, up_epoch, bind_epoch
+ epoch_t boot_epoch; // _first_ epoch we were marked up (after this process started)
+ epoch_t up_epoch; // _most_recent_ epoch we were marked up
+ epoch_t bind_epoch; // epoch we last did a bind to new ip:ports
+public:
+ /**
+ * Retrieve the boot_, up_, and bind_ epochs the OSD has set. The params
+ * can be NULL if you don't care about them.
+ */
+ void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
+ epoch_t *_bind_epoch) const;
+ /**
+ * Set the boot, up, and bind epochs. Any NULL params will not be set.
+ */
+ void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
+ const epoch_t *_bind_epoch);
+ epoch_t get_boot_epoch() const {
+ epoch_t ret;
+ retrieve_epochs(&ret, NULL, NULL);
+ return ret;
+ }
+ epoch_t get_up_epoch() const {
+ epoch_t ret;
+ retrieve_epochs(NULL, &ret, NULL);
+ return ret;
+ }
+ epoch_t get_bind_epoch() const {
+ epoch_t ret;
+ retrieve_epochs(NULL, NULL, &ret);
+ return ret;
+ }
+
+ void request_osdmap_update(epoch_t e);
+
+ // -- stopping --
+ Mutex is_stopping_lock;
+ Cond is_stopping_cond;
+ enum {
+ NOT_STOPPING,
+ PREPARING_TO_STOP,
+ STOPPING };
+ std::atomic_int state{NOT_STOPPING};
+ int get_state() {
+ return state;
+ }
+ void set_state(int s) {
+ state = s;
+ }
+ bool is_stopping() const {
+ return state == STOPPING;
+ }
+ bool is_preparing_to_stop() const {
+ return state == PREPARING_TO_STOP;
+ }
+ bool prepare_to_stop();
+ void got_stop_ack();
+
+
+#ifdef PG_DEBUG_REFS
+ Mutex pgid_lock;
+ map<spg_t, int> pgid_tracker;
+ map<spg_t, PG*> live_pgs;
+ void add_pgid(spg_t pgid, PG *pg);
+ void remove_pgid(spg_t pgid, PG *pg);
+ void dump_live_pgids();
+#endif
+
+ explicit OSDService(OSD *osd);
+ ~OSDService();
+};
+
+class OSD : public Dispatcher,
+ public md_config_obs_t {
+ /** OSD **/
+ Mutex osd_lock; // global lock
+ SafeTimer tick_timer; // safe timer (osd_lock)
+
+ // Tick timer for those stuff that do not need osd_lock
+ Mutex tick_timer_lock;
+ SafeTimer tick_timer_without_osd_lock;
+public:
+ // config observer bits
+ const char** get_tracked_conf_keys() const override;
+ void handle_conf_change(const struct md_config_t *conf,
+ const std::set <std::string> &changed) override;
+ void update_log_config();
+ void check_config();
+
+protected:
+
+ static const double OSD_TICK_INTERVAL; // tick interval for tick_timer and tick_timer_without_osd_lock
+
+ AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
+ AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;
+
+ Messenger *cluster_messenger;
+ Messenger *client_messenger;
+ Messenger *objecter_messenger;
+ MonClient *monc; // check the "monc helpers" list before accessing directly
+ MgrClient mgrc;
+ PerfCounters *logger;
+ PerfCounters *recoverystate_perf;
+ ObjectStore *store;
+#ifdef HAVE_LIBFUSE
+ FuseStore *fuse_store = nullptr;
+#endif
+ LogClient log_client;
+ LogChannelRef clog;
+
+ int whoami;
+ std::string dev_path, journal_path;
+
+ bool store_is_rotational = true;
+ bool journal_is_rotational = true;
+
+ ZTracer::Endpoint trace_endpoint;
+ void create_logger();
+ void create_recoverystate_perf();
+ void tick();
+ void tick_without_osd_lock();
+ void _dispatch(Message *m);
+ void dispatch_op(OpRequestRef op);
+
+ void check_osdmap_features(ObjectStore *store);
+
+ // asok
+ friend class OSDSocketHook;
+ class OSDSocketHook *asok_hook;
+ bool asok_command(string admin_command, cmdmap_t& cmdmap, string format, ostream& ss);
+
+public:
+ ClassHandler *class_handler = nullptr;
+ int get_nodeid() { return whoami; }
+
+ static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
+ char foo[20];
+ snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
+ return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
+ }
+ static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
+ char foo[22];
+ snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
+ return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
+ }
+
+ static ghobject_t make_snapmapper_oid() {
+ return ghobject_t(hobject_t(
+ sobject_t(
+ object_t("snapmapper"),
+ 0)));
+ }
+
+ static ghobject_t make_pg_log_oid(spg_t pg) {
+ stringstream ss;
+ ss << "pglog_" << pg;
+ string s;
+ getline(ss, s);
+ return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
+ }
+
+ static ghobject_t make_pg_biginfo_oid(spg_t pg) {
+ stringstream ss;
+ ss << "pginfo_" << pg;
+ string s;
+ getline(ss, s);
+ return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
+ }
+ static ghobject_t make_infos_oid() {
+ hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
+ return ghobject_t(oid);
+ }
+ static void recursive_remove_collection(CephContext* cct,
+ ObjectStore *store,
+ spg_t pgid,
+ coll_t tmp);
+
+ /**
+ * get_osd_initial_compat_set()
+ *
+ * Get the initial feature set for this OSD. Features
+ * here are automatically upgraded.
+ *
+ * Return value: Initial osd CompatSet
+ */
+ static CompatSet get_osd_initial_compat_set();
+
+ /**
+ * get_osd_compat_set()
+ *
+ * Get all features supported by this OSD
+ *
+ * Return value: CompatSet of all supported features
+ */
+ static CompatSet get_osd_compat_set();
+
+
+private:
+ class C_Tick;
+ class C_Tick_WithoutOSDLock;
+
+ // -- superblock --
+ OSDSuperblock superblock;
+
+ void write_superblock();
+ void write_superblock(ObjectStore::Transaction& t);
+ int read_superblock();
+
+ void clear_temp_objects();
+
+ CompatSet osd_compat;
+
+ // -- state --
+public:
+ typedef enum {
+ STATE_INITIALIZING = 1,
+ STATE_PREBOOT,
+ STATE_BOOTING,
+ STATE_ACTIVE,
+ STATE_STOPPING,
+ STATE_WAITING_FOR_HEALTHY
+ } osd_state_t;
+
+ static const char *get_state_name(int s) {
+ switch (s) {
+ case STATE_INITIALIZING: return "initializing";
+ case STATE_PREBOOT: return "preboot";
+ case STATE_BOOTING: return "booting";
+ case STATE_ACTIVE: return "active";
+ case STATE_STOPPING: return "stopping";
+ case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
+ default: return "???";
+ }
+ }
+
+private:
+ std::atomic_int state{STATE_INITIALIZING};
+ bool waiting_for_luminous_mons = false;
+
+public:
+ int get_state() const {
+ return state;
+ }
+ void set_state(int s) {
+ state = s;
+ }
+ bool is_initializing() const {
+ return state == STATE_INITIALIZING;
+ }
+ bool is_preboot() const {
+ return state == STATE_PREBOOT;
+ }
+ bool is_booting() const {
+ return state == STATE_BOOTING;
+ }
+ bool is_active() const {
+ return state == STATE_ACTIVE;
+ }
+ bool is_stopping() const {
+ return state == STATE_STOPPING;
+ }
+ bool is_waiting_for_healthy() const {
+ return state == STATE_WAITING_FOR_HEALTHY;
+ }
+
+private:
+
+ ThreadPool peering_tp;
+ ShardedThreadPool osd_op_tp;
+ ThreadPool disk_tp;
+ ThreadPool command_tp;
+
+ void set_disk_tp_priority();
+ void get_latest_osdmap();
+
+ // -- sessions --
+private:
+ void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
+ void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
+
+ Mutex session_waiting_lock;
+ set<Session*> session_waiting_for_map;
+
+ /// Caller assumes refs for included Sessions
+ void get_sessions_waiting_for_map(set<Session*> *out) {
+ Mutex::Locker l(session_waiting_lock);
+ out->swap(session_waiting_for_map);
+ }
+ void register_session_waiting_on_map(Session *session) {
+ Mutex::Locker l(session_waiting_lock);
+ if (session_waiting_for_map.insert(session).second) {
+ session->get();
+ }
+ }
+ void clear_session_waiting_on_map(Session *session) {
+ Mutex::Locker l(session_waiting_lock);
+ set<Session*>::iterator i = session_waiting_for_map.find(session);
+ if (i != session_waiting_for_map.end()) {
+ (*i)->put();
+ session_waiting_for_map.erase(i);
+ }
+ }
+ void dispatch_sessions_waiting_on_map() {
+ set<Session*> sessions_to_check;
+ get_sessions_waiting_for_map(&sessions_to_check);
+ for (set<Session*>::iterator i = sessions_to_check.begin();
+ i != sessions_to_check.end();
+ sessions_to_check.erase(i++)) {
+ (*i)->session_dispatch_lock.Lock();
+ dispatch_session_waiting(*i, osdmap);
+ (*i)->session_dispatch_lock.Unlock();
+ (*i)->put();
+ }
+ }
+ void session_handle_reset(Session *session) {
+ Mutex::Locker l(session->session_dispatch_lock);
+ clear_session_waiting_on_map(session);
+
+ session->clear_backoffs();
+
+ /* Messages have connection refs, we need to clear the
+ * connection->session->message->connection
+ * cycles which result.
+ * Bug #12338
+ */
+ session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
+ }
+
+private:
+ /**
+ * @defgroup monc helpers
+ * @{
+ * Right now we only have the one
+ */
+
+ /**
+ * Ask the Monitors for a sequence of OSDMaps.
+ *
+ * @param epoch The epoch to start with when replying
+ * @param force_request True if this request forces a new subscription to
+ * the monitors; false if an outstanding request that encompasses it is
+ * sufficient.
+ */
+ void osdmap_subscribe(version_t epoch, bool force_request);
+ /** @} monc helpers */
+
+ Mutex osdmap_subscribe_lock;
+ epoch_t latest_subscribed_epoch{0};
+
+ // -- heartbeat --
+ /// information about a heartbeat peer
+ struct HeartbeatInfo {
+ int peer; ///< peer
+ ConnectionRef con_front; ///< peer connection (front)
+ ConnectionRef con_back; ///< peer connection (back)
+ utime_t first_tx; ///< time we sent our first ping request
+ utime_t last_tx; ///< last time we sent a ping request
+ utime_t last_rx_front; ///< last time we got a ping reply on the front side
+ utime_t last_rx_back; ///< last time we got a ping reply on the back side
+ epoch_t epoch; ///< most recent epoch we wanted this peer
+
+ bool is_unhealthy(utime_t cutoff) const {
+ return
+ ! ((last_rx_front > cutoff ||
+ (last_rx_front == utime_t() && (last_tx == utime_t() ||
+ first_tx > cutoff))) &&
+ (last_rx_back > cutoff ||
+ (last_rx_back == utime_t() && (last_tx == utime_t() ||
+ first_tx > cutoff))));
+ }
+ bool is_healthy(utime_t cutoff) const {
+ return last_rx_front > cutoff && last_rx_back > cutoff;
+ }
+
+ };
+ /// state attached to outgoing heartbeat connections
+ struct HeartbeatSession : public RefCountedObject {
+ int peer;
+ explicit HeartbeatSession(int p) : peer(p) {}
+ };
+ Mutex heartbeat_lock;
+ map<int, int> debug_heartbeat_drops_remaining;
+ Cond heartbeat_cond;
+ bool heartbeat_stop;
+ std::atomic_bool heartbeat_need_update;
+ map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
+ utime_t last_mon_heartbeat;
+ Messenger *hb_front_client_messenger;
+ Messenger *hb_back_client_messenger;
+ Messenger *hb_front_server_messenger;
+ Messenger *hb_back_server_messenger;
+ utime_t last_heartbeat_resample; ///< last time we chose random peers in waiting-for-healthy state
+ double daily_loadavg;
+
+ void _add_heartbeat_peer(int p);
+ void _remove_heartbeat_peer(int p);
+ bool heartbeat_reset(Connection *con);
+ void maybe_update_heartbeat_peers();
+ void reset_heartbeat_peers();
+ bool heartbeat_peers_need_update() {
+ return heartbeat_need_update.load();
+ }
+ void heartbeat_set_peers_need_update() {
+ heartbeat_need_update.store(true);
+ }
+ void heartbeat_clear_peers_need_update() {
+ heartbeat_need_update.store(false);
+ }
+ void heartbeat();
+ void heartbeat_check();
+ void heartbeat_entry();
+ void need_heartbeat_peer_update();
+
+ void heartbeat_kick() {
+ Mutex::Locker l(heartbeat_lock);
+ heartbeat_cond.Signal();
+ }
+
+ struct T_Heartbeat : public Thread {
+ OSD *osd;
+ explicit T_Heartbeat(OSD *o) : osd(o) {}
+ void *entry() override {
+ osd->heartbeat_entry();
+ return 0;
+ }
+ } heartbeat_thread;
+
+public:
+ bool heartbeat_dispatch(Message *m);
+
+ struct HeartbeatDispatcher : public Dispatcher {
+ OSD *osd;
+ explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
+
+ bool ms_can_fast_dispatch_any() const override { return true; }
+ bool ms_can_fast_dispatch(const Message *m) const override {
+ switch (m->get_type()) {
+ case CEPH_MSG_PING:
+ case MSG_OSD_PING:
+ return true;
+ default:
+ return false;
+ }
+ }
+ void ms_fast_dispatch(Message *m) override {
+ osd->heartbeat_dispatch(m);
+ }
+ bool ms_dispatch(Message *m) override {
+ return osd->heartbeat_dispatch(m);
+ }
+ bool ms_handle_reset(Connection *con) override {
+ return osd->heartbeat_reset(con);
+ }
+ void ms_handle_remote_reset(Connection *con) override {}
+ bool ms_handle_refused(Connection *con) override {
+ return osd->ms_handle_refused(con);
+ }
+ bool ms_verify_authorizer(Connection *con, int peer_type,
+ int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
+ bool& isvalid, CryptoKey& session_key) override {
+ isvalid = true;
+ return true;
+ }
+ } heartbeat_dispatcher;
+
+private:
+ // -- waiters --
+ list<OpRequestRef> finished;
+
+ void take_waiters(list<OpRequestRef>& ls) {
+ assert(osd_lock.is_locked());
+ finished.splice(finished.end(), ls);
+ }
+ void do_waiters();
+
+ // -- op tracking --
+ OpTracker op_tracker;
+ void check_ops_in_flight();
+ void test_ops(std::string command, std::string args, ostream& ss);
+ friend class TestOpsSocketHook;
+ TestOpsSocketHook *test_ops_hook;
+ friend struct C_CompleteSplits;
+ friend struct C_OpenPGs;
+
+ // -- op queue --
+ enum class io_queue {
+ prioritized,
+ weightedpriority,
+ mclock_opclass,
+ mclock_client,
+ };
+ friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+
+ const io_queue op_queue;
+ const unsigned int op_prio_cutoff;
+
+ /*
+ * The ordered op delivery chain is:
+ *
+ * fast dispatch -> pqueue back
+ * pqueue front <-> to_process back
+ * to_process front -> RunVis(item)
+ * <- queue_front()
+ *
+ * The pqueue is per-shard, and to_process is per pg_slot. Items can be
+ * pushed back up into to_process and/or pqueue while order is preserved.
+ *
+ * Multiple worker threads can operate on each shard.
+ *
+ * Under normal circumstances, num_running == to_proces.size(). There are
+ * two times when that is not true: (1) when waiting_for_pg == true and
+ * to_process is accumulating requests that are waiting for the pg to be
+ * instantiated; in that case they will all get requeued together by
+ * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
+ * and already requeued the items.
+ */
+ friend class PGQueueable;
+
+ class ShardedOpWQ
+ : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
+ {
+ struct ShardData {
+ Mutex sdata_lock;
+ Cond sdata_cond;
+
+ Mutex sdata_op_ordering_lock; ///< protects all members below
+
+ OSDMapRef waiting_for_pg_osdmap;
+ struct pg_slot {
+ PGRef pg; ///< cached pg reference [optional]
+ list<PGQueueable> to_process; ///< order items for this slot
+ int num_running = 0; ///< _process threads doing pg lookup/lock
+
+ /// true if pg does/did not exist. if so all new items go directly to
+ /// to_process. cleared by prune_pg_waiters.
+ bool waiting_for_pg = false;
+
+ /// incremented by wake_pg_waiters; indicates racing _process threads
+ /// should bail out (their op has been requeued)
+ uint64_t requeue_seq = 0;
+ };
+
+ /// map of slots for each spg_t. maintains ordering of items dequeued
+ /// from pqueue while _process thread drops shard lock to acquire the
+ /// pg lock. slots are removed only by prune_pg_waiters.
+ unordered_map<spg_t,pg_slot> pg_slots;
+
+ /// priority queue
+ std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
+
+ void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
+ unsigned priority = item.second.get_priority();
+ unsigned cost = item.second.get_cost();
+ if (priority >= cutoff)
+ pqueue->enqueue_strict_front(
+ item.second.get_owner(),
+ priority, item);
+ else
+ pqueue->enqueue_front(
+ item.second.get_owner(),
+ priority, cost, item);
+ }
+
+ ShardData(
+ string lock_name, string ordering_lock,
+ uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
+ io_queue opqueue)
+ : sdata_lock(lock_name.c_str(), false, true, false, cct),
+ sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
+ false, cct) {
+ if (opqueue == io_queue::weightedpriority) {
+ pqueue = std::unique_ptr
+ <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
+ new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
+ max_tok_per_prio, min_cost));
+ } else if (opqueue == io_queue::prioritized) {
+ pqueue = std::unique_ptr
+ <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
+ new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
+ max_tok_per_prio, min_cost));
+ } else if (opqueue == io_queue::mclock_opclass) {
+ pqueue = std::unique_ptr
+ <ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct));
+ } else if (opqueue == io_queue::mclock_client) {
+ pqueue = std::unique_ptr
+ <ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct));
+ }
+ }
+ }; // struct ShardData
+
+ vector<ShardData*> shard_list;
+ OSD *osd;
+ uint32_t num_shards;
+
+ public:
+ ShardedOpWQ(uint32_t pnum_shards,
+ OSD *o,
+ time_t ti,
+ time_t si,
+ ShardedThreadPool* tp)
+ : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
+ osd(o),
+ num_shards(pnum_shards) {
+ for (uint32_t i = 0; i < num_shards; i++) {
+ char lock_name[32] = {0};
+ snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
+ char order_lock[32] = {0};
+ snprintf(order_lock, sizeof(order_lock), "%s.%d",
+ "OSD:ShardedOpWQ:order:", i);
+ ShardData* one_shard = new ShardData(
+ lock_name, order_lock,
+ osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
+ osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
+ shard_list.push_back(one_shard);
+ }
+ }
+ ~ShardedOpWQ() override {
+ while (!shard_list.empty()) {
+ delete shard_list.back();
+ shard_list.pop_back();
+ }
+ }
+
+ /// wake any pg waiters after a PG is created/instantiated
+ void wake_pg_waiters(spg_t pgid);
+
+ /// prune ops (and possiblye pg_slots) for pgs that shouldn't be here
+ void prune_pg_waiters(OSDMapRef osdmap, int whoami);
+
+ /// clear cached PGRef on pg deletion
+ void clear_pg_pointer(spg_t pgid);
+
+ /// clear pg_slots on shutdown
+ void clear_pg_slots();
+
+ /// try to do some work
+ void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
+
+ /// enqueue a new item
+ void _enqueue(pair <spg_t, PGQueueable> item) override;
+
+ /// requeue an old item (at the front of the line)
+ void _enqueue_front(pair <spg_t, PGQueueable> item) override;
+
+ void return_waiting_threads() override {
+ for(uint32_t i = 0; i < num_shards; i++) {
+ ShardData* sdata = shard_list[i];
+ assert (NULL != sdata);
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.Signal();
+ sdata->sdata_lock.Unlock();
+ }
+ }
+
+ void dump(Formatter *f) {
+ for(uint32_t i = 0; i < num_shards; i++) {
+ ShardData* sdata = shard_list[i];
+ char lock_name[32] = {0};
+ snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
+ assert (NULL != sdata);
+ sdata->sdata_op_ordering_lock.Lock();
+ f->open_object_section(lock_name);
+ sdata->pqueue->dump(f);
+ f->close_section();
+ sdata->sdata_op_ordering_lock.Unlock();
+ }
+ }
+
+ /// Must be called on ops queued back to front
+ struct Pred {
+ spg_t pgid;
+ list<OpRequestRef> *out_ops;
+ uint64_t reserved_pushes_to_free;
+ Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
+ : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
+ void accumulate(const PGQueueable &op) {
+ reserved_pushes_to_free += op.get_reserved_pushes();
+ if (out_ops) {
+ boost::optional<OpRequestRef> mop = op.maybe_get_op();
+ if (mop)
+ out_ops->push_front(*mop);
+ }
+ }
+ bool operator()(const pair<spg_t, PGQueueable> &op) {
+ if (op.first == pgid) {
+ accumulate(op.second);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ uint64_t get_reserved_pushes_to_free() const {
+ return reserved_pushes_to_free;
+ }
+ };
+
+ bool is_shard_empty(uint32_t thread_index) override {
+ uint32_t shard_index = thread_index % num_shards;
+ ShardData* sdata = shard_list[shard_index];
+ assert(NULL != sdata);
+ Mutex::Locker l(sdata->sdata_op_ordering_lock);
+ return sdata->pqueue->empty();
+ }
+ } op_shardedwq;
+
+
+ void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
+ void dequeue_op(
+ PGRef pg, OpRequestRef op,
+ ThreadPool::TPHandle &handle);
+
+ // -- peering queue --
+ struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
+ list<PG*> peering_queue;
+ OSD *osd;
+ set<PG*> in_use;
+ PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
+ : ThreadPool::BatchWorkQueue<PG>(
+ "OSD::PeeringWQ", ti, si, tp), osd(o) {}
+
+ void _dequeue(PG *pg) override {
+ for (list<PG*>::iterator i = peering_queue.begin();
+ i != peering_queue.end();
+ ) {
+ if (*i == pg) {
+ peering_queue.erase(i++);
+ pg->put("PeeringWQ");
+ } else {
+ ++i;
+ }
+ }
+ }
+ bool _enqueue(PG *pg) override {
+ pg->get("PeeringWQ");
+ peering_queue.push_back(pg);
+ return true;
+ }
+ bool _empty() override {
+ return peering_queue.empty();
+ }
+ void _dequeue(list<PG*> *out) override;
+ void _process(
+ const list<PG *> &pgs,
+ ThreadPool::TPHandle &handle) override {
+ assert(!pgs.empty());
+ osd->process_peering_events(pgs, handle);
+ for (list<PG *>::const_iterator i = pgs.begin();
+ i != pgs.end();
+ ++i) {
+ (*i)->put("PeeringWQ");
+ }
+ }
+ void _process_finish(const list<PG *> &pgs) override {
+ for (list<PG*>::const_iterator i = pgs.begin();
+ i != pgs.end();
+ ++i) {
+ in_use.erase(*i);
+ }
+ }
+ void _clear() override {
+ assert(peering_queue.empty());
+ }
+ } peering_wq;
+
+ void process_peering_events(
+ const list<PG*> &pg,
+ ThreadPool::TPHandle &handle);
+
+ friend class PG;
+ friend class PrimaryLogPG;
+
+
+ protected:
+
+ // -- osd map --
+ OSDMapRef osdmap;
+ OSDMapRef get_osdmap() {
+ return osdmap;
+ }
+ epoch_t get_osdmap_epoch() const {
+ return osdmap ? osdmap->get_epoch() : 0;
+ }
+
+ utime_t had_map_since;
+ RWLock map_lock;
+ list<OpRequestRef> waiting_for_osdmap;
+ deque<utime_t> osd_markdown_log;
+
+ friend struct send_map_on_destruct;
+
+ void wait_for_new_map(OpRequestRef op);
+ void handle_osd_map(class MOSDMap *m);
+ void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
+ void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
+ void note_down_osd(int osd);
+ void note_up_osd(int osd);
+ friend class C_OnMapCommit;
+
+ bool advance_pg(
+ epoch_t advance_to, PG *pg,
+ ThreadPool::TPHandle &handle,
+ PG::RecoveryCtx *rctx,
+ set<PGRef> *split_pgs
+ );
+ void consume_map();
+ void activate_map();
+
+ // osd map cache (past osd maps)
+ OSDMapRef get_map(epoch_t e) {
+ return service.get_map(e);
+ }
+ OSDMapRef add_map(OSDMap *o) {
+ return service.add_map(o);
+ }
+ void add_map_bl(epoch_t e, bufferlist& bl) {
+ return service.add_map_bl(e, bl);
+ }
+ void pin_map_bl(epoch_t e, bufferlist &bl) {
+ return service.pin_map_bl(e, bl);
+ }
+ bool get_map_bl(epoch_t e, bufferlist& bl) {
+ return service.get_map_bl(e, bl);
+ }
+ void add_map_inc_bl(epoch_t e, bufferlist& bl) {
+ return service.add_map_inc_bl(e, bl);
+ }
+ void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
+ return service.pin_map_inc_bl(e, bl);
+ }
+
+protected:
+ // -- placement groups --
+ RWLock pg_map_lock; // this lock orders *above* individual PG _locks
+ ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
+
+ std::mutex pending_creates_lock;
+ std::set<pg_t> pending_creates_from_osd;
+ unsigned pending_creates_from_mon = 0;
+
+ map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
+ PGRecoveryStats pg_recovery_stats;
+
+ PGPool _get_pool(int id, OSDMapRef createmap);
+
+ PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
+ PG *_lookup_lock_pg(spg_t pgid);
+
+public:
+ PG *lookup_lock_pg(spg_t pgid);
+
+ int get_num_pgs() {
+ RWLock::RLocker l(pg_map_lock);
+ return pg_map.size();
+ }
+
+protected:
+ PG *_open_lock_pg(OSDMapRef createmap,
+ spg_t pg, bool no_lockdep_check=false);
+ enum res_result {
+ RES_PARENT, // resurrected a parent
+ RES_SELF, // resurrected self
+ RES_NONE // nothing relevant deleting
+ };
+ res_result _try_resurrect_pg(
+ OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
+
+ PG *_create_lock_pg(
+ OSDMapRef createmap,
+ spg_t pgid,
+ bool hold_map_lock,
+ bool backfill,
+ int role,
+ vector<int>& up, int up_primary,
+ vector<int>& acting, int acting_primary,
+ pg_history_t history,
+ const PastIntervals& pi,
+ ObjectStore::Transaction& t);
+
+ PG* _make_pg(OSDMapRef createmap, spg_t pgid);
+ void add_newly_split_pg(PG *pg,
+ PG::RecoveryCtx *rctx);
+
+ int handle_pg_peering_evt(
+ spg_t pgid,
+ const pg_history_t& orig_history,
+ const PastIntervals& pi,
+ epoch_t epoch,
+ PG::CephPeeringEvtRef evt);
+ bool maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create);
+ void resume_creating_pg();
+
+ void load_pgs();
+ void build_past_intervals_parallel();
+
+ /// build initial pg history and intervals on create
+ void build_initial_pg_history(
+ spg_t pgid,
+ epoch_t created,
+ utime_t created_stamp,
+ pg_history_t *h,
+ PastIntervals *pi);
+
+ /// project pg history from from to now
+ bool project_pg_history(
+ spg_t pgid, pg_history_t& h, epoch_t from,
+ const vector<int>& lastup,
+ int lastupprimary,
+ const vector<int>& lastacting,
+ int lastactingprimary
+ ); ///< @return false if there was a map gap between from and now
+
+ // this must be called with pg->lock held on any pg addition to pg_map
+ void wake_pg_waiters(PGRef pg) {
+ assert(pg->is_locked());
+ op_shardedwq.wake_pg_waiters(pg->info.pgid);
+ }
+ epoch_t last_pg_create_epoch;
+
+ void handle_pg_create(OpRequestRef op);
+
+ void split_pgs(
+ PG *parent,
+ const set<spg_t> &childpgids, set<PGRef> *out_pgs,
+ OSDMapRef curmap,
+ OSDMapRef nextmap,
+ PG::RecoveryCtx *rctx);
+
+ // == monitor interaction ==
+ Mutex mon_report_lock;
+ utime_t last_mon_report;
+ utime_t last_pg_stats_sent;
+
+ /* if our monitor dies, we want to notice it and reconnect.
+ * So we keep track of when it last acked our stat updates,
+ * and if too much time passes (and we've been sending
+ * more updates) then we can call it dead and reconnect
+ * elsewhere.
+ */
+ utime_t last_pg_stats_ack;
+ float stats_ack_timeout;
+ set<uint64_t> outstanding_pg_stats; // how many stat updates haven't been acked yet
+
+ // -- boot --
+ void start_boot();
+ void _got_mon_epochs(epoch_t oldest, epoch_t newest);
+ void _preboot(epoch_t oldest, epoch_t newest);
+ void _send_boot();
+ void _collect_metadata(map<string,string> *pmeta);
+
+ void start_waiting_for_healthy();
+ bool _is_healthy();
+
+ void send_full_update();
+
+ friend struct C_OSD_GetVersion;
+
+ // -- alive --
+ epoch_t up_thru_wanted;
+
+ void queue_want_up_thru(epoch_t want);
+ void send_alive();
+
+ // -- full map requests --
+ epoch_t requested_full_first, requested_full_last;
+
+ void request_full_map(epoch_t first, epoch_t last);
+ void rerequest_full_maps() {
+ epoch_t first = requested_full_first;
+ epoch_t last = requested_full_last;
+ requested_full_first = 0;
+ requested_full_last = 0;
+ request_full_map(first, last);
+ }
+ void got_full_map(epoch_t e);
+
+ // -- failures --
+ map<int,utime_t> failure_queue;
+ map<int,pair<utime_t,entity_inst_t> > failure_pending;
+
+ void requeue_failures();
+ void send_failures();
+ void send_still_alive(epoch_t epoch, const entity_inst_t &i);
+
+ // -- pg stats --
+ Mutex pg_stat_queue_lock;
+ Cond pg_stat_queue_cond;
+ xlist<PG*> pg_stat_queue;
+ bool osd_stat_updated;
+ uint64_t pg_stat_tid, pg_stat_tid_flushed;
+
+ void send_pg_stats(const utime_t &now);
+ void handle_pg_stats_ack(class MPGStatsAck *ack);
+ void flush_pg_stats();
+
+ ceph::coarse_mono_clock::time_point last_sent_beacon;
+ Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
+ epoch_t min_last_epoch_clean = 0;
+ // which pgs were scanned for min_lec
+ std::vector<pg_t> min_last_epoch_clean_pgs;
+ void send_beacon(const ceph::coarse_mono_clock::time_point& now);
+
+ void pg_stat_queue_enqueue(PG *pg) {
+ pg_stat_queue_lock.Lock();
+ if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
+ pg->get("pg_stat_queue");
+ pg_stat_queue.push_back(&pg->stat_queue_item);
+ }
+ osd_stat_updated = true;
+ pg_stat_queue_lock.Unlock();
+ }
+ void pg_stat_queue_dequeue(PG *pg) {
+ pg_stat_queue_lock.Lock();
+ if (pg->stat_queue_item.remove_myself())
+ pg->put("pg_stat_queue");
+ pg_stat_queue_lock.Unlock();
+ }
+ void clear_pg_stat_queue() {
+ pg_stat_queue_lock.Lock();
+ while (!pg_stat_queue.empty()) {
+ PG *pg = pg_stat_queue.front();
+ pg_stat_queue.pop_front();
+ pg->put("pg_stat_queue");
+ }
+ pg_stat_queue_lock.Unlock();
+ }
+ void clear_outstanding_pg_stats(){
+ Mutex::Locker l(pg_stat_queue_lock);
+ outstanding_pg_stats.clear();
+ }
+
+ ceph_tid_t get_tid() {
+ return service.get_tid();
+ }
+
+ // -- generic pg peering --
+ PG::RecoveryCtx create_context();
+ void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+ ThreadPool::TPHandle *handle = NULL);
+ void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
+ ThreadPool::TPHandle *handle = NULL);
+ void do_notifies(map<int,
+ vector<pair<pg_notify_t, PastIntervals> > >&
+ notify_list,
+ OSDMapRef map);
+ void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
+ OSDMapRef map);
+ void do_infos(map<int,
+ vector<pair<pg_notify_t, PastIntervals> > >& info_map,
+ OSDMapRef map);
+
+ bool require_mon_peer(const Message *m);
+ bool require_mon_or_mgr_peer(const Message *m);
+ bool require_osd_peer(const Message *m);
+ /***
+ * Verifies that we were alive in the given epoch, and that
+ * still are.
+ */
+ bool require_self_aliveness(const Message *m, epoch_t alive_since);
+ /**
+ * Verifies that the OSD who sent the given op has the same
+ * address as in the given map.
+ * @pre op was sent by an OSD using the cluster messenger
+ */
+ bool require_same_peer_instance(const Message *m, OSDMapRef& map,
+ bool is_fast_dispatch);
+
+ bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
+ bool is_fast_dispatch);
+
+ void handle_pg_query(OpRequestRef op);
+ void handle_pg_notify(OpRequestRef op);
+ void handle_pg_log(OpRequestRef op);
+ void handle_pg_info(OpRequestRef op);
+ void handle_pg_trim(OpRequestRef op);
+
+ void handle_pg_backfill_reserve(OpRequestRef op);
+ void handle_pg_recovery_reserve(OpRequestRef op);
+
+ void handle_force_recovery(Message *m);
+
+ void handle_pg_remove(OpRequestRef op);
+ void _remove_pg(PG *pg);
+
+ // -- commands --
+ struct Command {
+ vector<string> cmd;
+ ceph_tid_t tid;
+ bufferlist indata;
+ ConnectionRef con;
+
+ Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
+ : cmd(c), tid(t), indata(bl), con(co) {}
+ };
+ list<Command*> command_queue;
+ struct CommandWQ : public ThreadPool::WorkQueue<Command> {
+ OSD *osd;
+ CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
+ : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}
+
+ bool _empty() override {
+ return osd->command_queue.empty();
+ }
+ bool _enqueue(Command *c) override {
+ osd->command_queue.push_back(c);
+ return true;
+ }
+ void _dequeue(Command *pg) override {
+ ceph_abort();
+ }
+ Command *_dequeue() override {
+ if (osd->command_queue.empty())
+ return NULL;
+ Command *c = osd->command_queue.front();
+ osd->command_queue.pop_front();
+ return c;
+ }
+ void _process(Command *c, ThreadPool::TPHandle &) override {
+ osd->osd_lock.Lock();
+ if (osd->is_stopping()) {
+ osd->osd_lock.Unlock();
+ delete c;
+ return;
+ }
+ osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
+ osd->osd_lock.Unlock();
+ delete c;
+ }
+ void _clear() override {
+ while (!osd->command_queue.empty()) {
+ Command *c = osd->command_queue.front();
+ osd->command_queue.pop_front();
+ delete c;
+ }
+ }
+ } command_wq;
+
+ void handle_command(class MMonCommand *m);
+ void handle_command(class MCommand *m);
+ void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
+
+ // -- pg recovery --
+ void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
+ ThreadPool::TPHandle &handle);
+
+
+ // -- scrubbing --
+ void sched_scrub();
+ bool scrub_random_backoff();
+ bool scrub_load_below_threshold();
+ bool scrub_time_permit(utime_t now);
+
+ // -- removing --
+ struct RemoveWQ :
+ public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
+ CephContext* cct;
+ ObjectStore *&store;
+ list<pair<PGRef, DeletingStateRef> > remove_queue;
+ RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
+ ThreadPool *tp)
+ : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
+ "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}
+
+ bool _empty() override {
+ return remove_queue.empty();
+ }
+ void _enqueue(pair<PGRef, DeletingStateRef> item) override {
+ remove_queue.push_back(item);
+ }
+ void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
+ remove_queue.push_front(item);
+ }
+ bool _dequeue(pair<PGRef, DeletingStateRef> item) {
+ ceph_abort();
+ }
+ pair<PGRef, DeletingStateRef> _dequeue() override {
+ assert(!remove_queue.empty());
+ pair<PGRef, DeletingStateRef> item = remove_queue.front();
+ remove_queue.pop_front();
+ return item;
+ }
+ void _process(pair<PGRef, DeletingStateRef>,
+ ThreadPool::TPHandle &) override;
+ void _clear() override {
+ remove_queue.clear();
+ }
+ } remove_wq;
+
+private:
+ bool ms_can_fast_dispatch_any() const override { return true; }
+ bool ms_can_fast_dispatch(const Message *m) const override {
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ case CEPH_MSG_OSD_BACKOFF:
+ case MSG_OSD_SUBOP:
+ case MSG_OSD_REPOP:
+ case MSG_OSD_SUBOPREPLY:
+ case MSG_OSD_REPOPREPLY:
+ case MSG_OSD_PG_PUSH:
+ case MSG_OSD_PG_PULL:
+ case MSG_OSD_PG_PUSH_REPLY:
+ case MSG_OSD_PG_SCAN:
+ case MSG_OSD_PG_BACKFILL:
+ case MSG_OSD_PG_BACKFILL_REMOVE:
+ case MSG_OSD_EC_WRITE:
+ case MSG_OSD_EC_WRITE_REPLY:
+ case MSG_OSD_EC_READ:
+ case MSG_OSD_EC_READ_REPLY:
+ case MSG_OSD_SCRUB_RESERVE:
+ case MSG_OSD_REP_SCRUB:
+ case MSG_OSD_REP_SCRUBMAP:
+ case MSG_OSD_PG_UPDATE_LOG_MISSING:
+ case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+ case MSG_OSD_PG_RECOVERY_DELETE:
+ case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+ return true;
+ default:
+ return false;
+ }
+ }
+ void ms_fast_dispatch(Message *m) override;
+ void ms_fast_preprocess(Message *m) override;
+ bool ms_dispatch(Message *m) override;
+ bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) override;
+ bool ms_verify_authorizer(Connection *con, int peer_type,
+ int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
+ bool& isvalid, CryptoKey& session_key) override;
+ void ms_handle_connect(Connection *con) override;
+ void ms_handle_fast_connect(Connection *con) override;
+ void ms_handle_fast_accept(Connection *con) override;
+ bool ms_handle_reset(Connection *con) override;
+ void ms_handle_remote_reset(Connection *con) override {}
+ bool ms_handle_refused(Connection *con) override;
+
+ io_queue get_io_queue() const {
+ if (cct->_conf->osd_op_queue == "debug_random") {
+ static io_queue index_lookup[] = { io_queue::prioritized,
+ io_queue::weightedpriority,
+ io_queue::mclock_opclass,
+ io_queue::mclock_client };
+ srand(time(NULL));
+ unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
+ return index_lookup[which];
+ } else if (cct->_conf->osd_op_queue == "prioritized") {
+ return io_queue::prioritized;
+ } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
+ return io_queue::mclock_opclass;
+ } else if (cct->_conf->osd_op_queue == "mclock_client") {
+ return io_queue::mclock_client;
+ } else {
+ // default / catch-all is 'wpq'
+ return io_queue::weightedpriority;
+ }
+ }
+
+ unsigned int get_io_prio_cut() const {
+ if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
+ srand(time(NULL));
+ return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+ } else if (cct->_conf->osd_op_queue_cut_off == "high") {
+ return CEPH_MSG_PRIO_HIGH;
+ } else {
+ // default / catch-all is 'low'
+ return CEPH_MSG_PRIO_LOW;
+ }
+ }
+
+ public:
+ /* internal and external can point to the same messenger, they will still
+ * be cleaned up properly*/
+ OSD(CephContext *cct_,
+ ObjectStore *store_,
+ int id,
+ Messenger *internal,
+ Messenger *external,
+ Messenger *hb_front_client,
+ Messenger *hb_back_client,
+ Messenger *hb_front_server,
+ Messenger *hb_back_server,
+ Messenger *osdc_messenger,
+ MonClient *mc, const std::string &dev, const std::string &jdev);
+ ~OSD() override;
+
+ // static bits
+ static int mkfs(CephContext *cct, ObjectStore *store,
+ const string& dev,
+ uuid_d fsid, int whoami);
+ /* remove any non-user xattrs from a map of them */
+ void filter_xattrs(map<string, bufferptr>& attrs) {
+ for (map<string, bufferptr>::iterator iter = attrs.begin();
+ iter != attrs.end();
+ ) {
+ if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
+ attrs.erase(iter++);
+ else ++iter;
+ }
+ }
+
+private:
+ int mon_cmd_maybe_osd_create(string &cmd);
+ int update_crush_device_class();
+ int update_crush_location();
+
+ static int write_meta(CephContext *cct,
+ ObjectStore *store,
+ uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
+
+ void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
+ void handle_scrub(struct MOSDScrub *m);
+ void handle_osd_ping(class MOSDPing *m);
+
+ int init_op_flags(OpRequestRef& op);
+
+ int get_num_op_shards();
+ int get_num_op_threads();
+
+ float get_osd_recovery_sleep();
+
+public:
+ static int peek_meta(ObjectStore *store, string& magic,
+ uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);
+
+
+ // startup/shutdown
+ int pre_init();
+ int init();
+ void final_init();
+
+ int enable_disable_fuse(bool stop);
+
+ void suicide(int exitcode);
+ int shutdown();
+
+ void handle_signal(int signum);
+
+ /// check if we can throw out op from a disconnected client
+ static bool op_is_discardable(const MOSDOp *m);
+
+public:
+ OSDService service;
+ friend class OSDService;
+};
+
+
+std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+
+
+//compatibility of the executable
+extern const CompatSet::Feature ceph_osd_feature_compat[];
+extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
+extern const CompatSet::Feature ceph_osd_feature_incompat[];
+
+#endif // CEPH_OSD_H