// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_OSD_H
#define CEPH_OSD_H

#include "PG.h"

#include "msg/Dispatcher.h"

#include "common/Mutex.h"
#include "common/RWLock.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/AsyncReserver.h"
#include "common/ceph_context.h"
#include "common/zipkin_trace.h"

#include "mgr/MgrClient.h"

#include "os/ObjectStore.h"
#include "OSDCap.h"

#include "auth/KeyRing.h"
#include "osd/ClassHandler.h"

#include "include/CompatSet.h"

#include "OpRequest.h"
#include "Session.h"

#include "osd/PGQueueable.h"

#include <atomic>
#include <map>
#include <memory>
#include "include/memory.h"
using namespace std;

#include "include/unordered_map.h"

#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
#include "common/sharedptr_registry.hpp"
#include "common/WeightedPriorityQueue.h"
#include "common/PrioritizedQueue.h"
#include "osd/mClockOpClassQueue.h"
#include "osd/mClockClientQueue.h"
#include "messages/MOSDOp.h"
#include "include/Spinlock.h"
#include "common/EventTrace.h"

#define CEPH_OSD_PROTOCOL    10 /* cluster internal */

enum {
  l_osd_first = 10000,
  l_osd_op_wip,
  l_osd_op, l_osd_op_inb, l_osd_op_outb, l_osd_op_lat,
  l_osd_op_process_lat, l_osd_op_prepare_lat,
  l_osd_op_r, l_osd_op_r_outb, l_osd_op_r_lat, l_osd_op_r_lat_outb_hist,
  l_osd_op_r_process_lat, l_osd_op_r_prepare_lat,
  l_osd_op_w, l_osd_op_w_inb, l_osd_op_w_lat, l_osd_op_w_lat_inb_hist,
  l_osd_op_w_process_lat, l_osd_op_w_prepare_lat,
  l_osd_op_rw, l_osd_op_rw_inb, l_osd_op_rw_outb, l_osd_op_rw_lat,
  l_osd_op_rw_lat_inb_hist, l_osd_op_rw_lat_outb_hist,
  l_osd_op_rw_process_lat, l_osd_op_rw_prepare_lat,
  l_osd_op_before_queue_op_lat, l_osd_op_before_dequeue_op_lat,

  l_osd_sop, l_osd_sop_inb, l_osd_sop_lat,
  l_osd_sop_w, l_osd_sop_w_inb, l_osd_sop_w_lat,
  l_osd_sop_pull, l_osd_sop_pull_lat,
  l_osd_sop_push, l_osd_sop_push_inb, l_osd_sop_push_lat,

  l_osd_pull, l_osd_push, l_osd_push_outb,

  l_osd_rop,

  l_osd_loadavg, l_osd_buf,
  l_osd_history_alloc_bytes, l_osd_history_alloc_num,
  l_osd_cached_crc, l_osd_cached_crc_adjusted, l_osd_missed_crc,

  l_osd_pg, l_osd_pg_primary, l_osd_pg_replica, l_osd_pg_stray,
  l_osd_hb_to,
  l_osd_map, l_osd_mape, l_osd_mape_dup,
  l_osd_waiting_for_map,
  l_osd_map_cache_hit, l_osd_map_cache_miss,
  l_osd_map_cache_miss_low, l_osd_map_cache_miss_low_avg,
  l_osd_map_bl_cache_hit, l_osd_map_bl_cache_miss,

  l_osd_stat_bytes, l_osd_stat_bytes_used, l_osd_stat_bytes_avail,

  l_osd_copyfrom,

  l_osd_tier_promote, l_osd_tier_flush, l_osd_tier_flush_fail,
  l_osd_tier_try_flush, l_osd_tier_try_flush_fail,
  l_osd_tier_evict, l_osd_tier_whiteout,
  l_osd_tier_dirty, l_osd_tier_clean, l_osd_tier_delay,
  l_osd_tier_proxy_read, l_osd_tier_proxy_write,

  l_osd_agent_wake, l_osd_agent_skip, l_osd_agent_flush, l_osd_agent_evict,

  l_osd_object_ctx_cache_hit, l_osd_object_ctx_cache_total,

  l_osd_op_cache_hit,
  l_osd_tier_flush_lat, l_osd_tier_promote_lat, l_osd_tier_r_lat,

  l_osd_pg_info, l_osd_pg_fastinfo, l_osd_pg_biginfo,

  l_osd_last,
};

// RecoveryState perf counters
enum {
  rs_first = 20000,
  rs_initial_latency,
  rs_started_latency,
  rs_reset_latency,
  rs_start_latency,
  rs_primary_latency,
  rs_peering_latency,
  rs_backfilling_latency,
  rs_waitremotebackfillreserved_latency,
  rs_waitlocalbackfillreserved_latency,
  rs_notbackfilling_latency,
  rs_repnotrecovering_latency,
  rs_repwaitrecoveryreserved_latency,
  rs_repwaitbackfillreserved_latency,
  rs_reprecovering_latency,
  rs_activating_latency,
  rs_waitlocalrecoveryreserved_latency,
  rs_waitremoterecoveryreserved_latency,
  rs_recovering_latency,
  rs_recovered_latency,
  rs_clean_latency,
  rs_active_latency,
  rs_replicaactive_latency,
  rs_stray_latency,
  rs_getinfo_latency,
  rs_getlog_latency,
  rs_waitactingchange_latency,
  rs_incomplete_latency,
  rs_down_latency,
  rs_getmissing_latency,
  rs_waitupthru_latency,
  rs_notrecovering_latency,
  rs_last,
};

class Messenger;
class Message;
class MonClient;
class PerfCounters;
class ObjectStore;
class FuseStore;
class OSDMap;
class MLog;
class Objecter;

class Watch;
class PrimaryLogPG;

class AuthAuthorizeHandlerRegistry;

class TestOpsSocketHook;
struct C_CompleteSplits;
struct C_OpenPGs;
class LogChannel;
class CephContext;
typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
class MOSDOp;

class DeletingState {
  Mutex lock;
  Cond cond;
  enum {
    QUEUED,
    CLEARING_DIR,
    CLEARING_WAITING,
    DELETING_DIR,
    DELETED_DIR,
    CANCELED,
  } status;
  bool stop_deleting;
public:
  const spg_t pgid;
  const PGRef old_pg_state;
  explicit DeletingState(const pair<spg_t, PGRef> &in) :
    lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
    pgid(in.first), old_pg_state(in.second) {
  }

  /// transition status to CLEARING_WAITING
  bool pause_clearing() {
    Mutex::Locker l(lock);
    assert(status == CLEARING_DIR);
    if (stop_deleting) {
      status = CANCELED;
      cond.Signal();
      return false;
    }
    status = CLEARING_WAITING;
    return true;
  } ///< @return false if we should cancel deletion

  /// start or resume the clearing - transition the status to CLEARING_DIR
  bool start_or_resume_clearing() {
    Mutex::Locker l(lock);
    assert(
      status == QUEUED ||
      status == DELETED_DIR ||
      status == CLEARING_WAITING);
    if (stop_deleting) {
      status = CANCELED;
      cond.Signal();
      return false;
    }
    status = CLEARING_DIR;
    return true;
  } ///< @return false if we should cancel the deletion

  /// transition status to CLEARING_DIR
  bool resume_clearing() {
    Mutex::Locker l(lock);
    assert(status == CLEARING_WAITING);
    if (stop_deleting) {
      status = CANCELED;
      cond.Signal();
      return false;
    }
    status = CLEARING_DIR;
    return true;
  } ///< @return false if we should cancel deletion

  /// transition status to deleting
  bool start_deleting() {
    Mutex::Locker l(lock);
    assert(status == CLEARING_DIR);
    if (stop_deleting) {
      status = CANCELED;
      cond.Signal();
      return false;
    }
    status = DELETING_DIR;
    return true;
  } ///< @return false if we should cancel deletion

  /// signal collection removal queued
  void finish_deleting() {
    Mutex::Locker l(lock);
    assert(status == DELETING_DIR);
    status = DELETED_DIR;
    cond.Signal();
  }

  /// try to halt the deletion
  bool try_stop_deletion() {
    Mutex::Locker l(lock);
    stop_deleting = true;
    /**
     * If we are in DELETING_DIR or CLEARING_DIR, there are in-progress
     * operations we have to wait for before continuing on.  States
     * CLEARING_WAITING and QUEUED indicate that the remover will check
     * stop_deleting before queueing any further operations.  CANCELED
     * indicates that the remover has already halted.  DELETED_DIR
     * indicates that the deletion has been fully queued.
     */
    while (status == DELETING_DIR || status == CLEARING_DIR)
      cond.Wait(lock);
    return status != DELETED_DIR;
  } ///< @return true if we don't need to recreate the collection
};
typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
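// Illustrative sketch (not part of the interface): a removal thread is
// expected to drive DeletingState roughly like this, bailing out whenever a
// transition helper returns false because try_stop_deletion() raced with it.
// `ds` here is a hypothetical DeletingStateRef from OSDService::deleting_pgs;
// the helpers called between transitions are placeholders.
//
//   if (!ds->start_or_resume_clearing())
//     return;                      // canceled; PG may be getting resurrected
//   while (more_objects_to_remove) {
//     remove_a_batch();            // placeholder for the actual object wipe
//     if (!ds->pause_clearing())   // yield between batches
//       return;                    // canceled while paused
//     if (!ds->resume_clearing())
//       return;
//   }
//   if (!ds->start_deleting())
//     return;
//   queue_collection_removal();    // placeholder
//   ds->finish_deleting();         // state becomes DELETED_DIR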
class OSD;

class OSDService {
public:
  OSD *osd;
  CephContext *cct;
  SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
  ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
  SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
  const int whoami;
  ObjectStore *&store;
  LogClient &log_client;
  LogChannelRef clog;
  PGRecoveryStats &pg_recovery_stats;
private:
  Messenger *&cluster_messenger;
  Messenger *&client_messenger;
public:
  PerfCounters *&logger;
  PerfCounters *&recoverystate_perf;
  MonClient *&monc;
  ThreadPool::BatchWorkQueue<PG> &peering_wq;
  GenContextWQ recovery_gen_wq;
  ClassHandler *&class_handler;

  void enqueue_back(spg_t pgid, PGQueueable qi);
  void enqueue_front(spg_t pgid, PGQueueable qi);

  void maybe_inject_dispatch_delay() {
    if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
      if (rand() % 10000 <
          g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
        utime_t t;
        t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
        t.sleep();
      }
    }
  }

private:
  // -- map epoch lower bound --
  Mutex pg_epoch_lock;
  multiset<epoch_t> pg_epochs;
  map<spg_t,epoch_t> pg_epoch;

public:
  void pg_add_epoch(spg_t pgid, epoch_t epoch) {
    Mutex::Locker l(pg_epoch_lock);
    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
    assert(t == pg_epoch.end());
    pg_epoch[pgid] = epoch;
    pg_epochs.insert(epoch);
  }
  void pg_update_epoch(spg_t pgid, epoch_t epoch) {
    Mutex::Locker l(pg_epoch_lock);
    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
    assert(t != pg_epoch.end());
    pg_epochs.erase(pg_epochs.find(t->second));
    t->second = epoch;
    pg_epochs.insert(epoch);
  }
  void pg_remove_epoch(spg_t pgid) {
    Mutex::Locker l(pg_epoch_lock);
    map<spg_t,epoch_t>::iterator t = pg_epoch.find(pgid);
    if (t != pg_epoch.end()) {
      pg_epochs.erase(pg_epochs.find(t->second));
      pg_epoch.erase(t);
    }
  }
  epoch_t get_min_pg_epoch() {
    Mutex::Locker l(pg_epoch_lock);
    if (pg_epochs.empty())
      return 0;
    else
      return *pg_epochs.begin();
  }

private:
  // -- superblock --
  Mutex publish_lock, pre_publish_lock; // pre-publish orders before publish
  OSDSuperblock superblock;

public:
  OSDSuperblock get_superblock() {
    Mutex::Locker l(publish_lock);
    return superblock;
  }
  void publish_superblock(const OSDSuperblock &block) {
    Mutex::Locker l(publish_lock);
    superblock = block;
  }

  int get_nodeid() const { return whoami; }

  std::atomic<epoch_t> max_oldest_map;
private:
  OSDMapRef osdmap;

public:
  OSDMapRef get_osdmap() {
    Mutex::Locker l(publish_lock);
    return osdmap;
  }
  epoch_t get_osdmap_epoch() {
    Mutex::Locker l(publish_lock);
    return osdmap ? osdmap->get_epoch() : 0;
  }
  void publish_map(OSDMapRef map) {
    Mutex::Locker l(publish_lock);
    osdmap = map;
  }

  /*
   * osdmap - current published map
   * next_osdmap - pre_published map that is about to be published.
   *
   * We use the next_osdmap to send messages and initiate connections,
   * but only if the target is the same instance as the one in the map
   * epoch the current user is working from (i.e., the result is
   * equivalent to what is in next_osdmap).
   *
   * This allows the helpers to start ignoring osds that are about to
   * go down, and let OSD::handle_osd_map()/note_down_osd() mark them
   * down, without worrying about reopening connections from threads
   * working from old maps.
   */
private:
  OSDMapRef next_osdmap;
  Cond pre_publish_cond;

public:
  void pre_publish_map(OSDMapRef map) {
    Mutex::Locker l(pre_publish_lock);
    next_osdmap = std::move(map);
  }

  void activate_map();
  /// map epochs reserved below
  map<epoch_t, unsigned> map_reservations;

  /// gets ref to next_osdmap and registers the epoch as reserved
  OSDMapRef get_nextmap_reserved() {
    Mutex::Locker l(pre_publish_lock);
    if (!next_osdmap)
      return OSDMapRef();
    epoch_t e = next_osdmap->get_epoch();
    map<epoch_t, unsigned>::iterator i =
      map_reservations.insert(make_pair(e, 0)).first;
    i->second++;
    return next_osdmap;
  }
  /// releases reservation on map
  void release_map(OSDMapRef osdmap) {
    Mutex::Locker l(pre_publish_lock);
    map<epoch_t, unsigned>::iterator i =
      map_reservations.find(osdmap->get_epoch());
    assert(i != map_reservations.end());
    assert(i->second > 0);
    if (--(i->second) == 0) {
      map_reservations.erase(i);
    }
    pre_publish_cond.Signal();
  }
  /// blocks until there are no reserved maps prior to next_osdmap
  void await_reserved_maps() {
    Mutex::Locker l(pre_publish_lock);
    assert(next_osdmap);
    while (true) {
      map<epoch_t, unsigned>::const_iterator i = map_reservations.cbegin();
      if (i == map_reservations.cend() ||
          i->first >= next_osdmap->get_epoch()) {
        break;
      } else {
        pre_publish_cond.Wait(pre_publish_lock);
      }
    }
  }
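  // Illustrative sketch (not part of the interface): callers that want to
  // message peers from the about-to-be-published map follow a
  // reserve/use/release pattern so await_reserved_maps() can quiesce them:
  //
  //   OSDMapRef nextmap = service->get_nextmap_reserved();
  //   if (nextmap) {
  //     ConnectionRef con =
  //       service->get_con_osd_cluster(peer, nextmap->get_epoch());
  //     // ... use con while the reservation pins the epoch ...
  //     service->release_map(nextmap);  // wakes await_reserved_maps()
  //   }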
private:
  Mutex peer_map_epoch_lock;
  map<int, epoch_t> peer_map_epoch;
public:
  epoch_t get_peer_epoch(int p);
  epoch_t note_peer_epoch(int p, epoch_t e);
  void forget_peer_epoch(int p, epoch_t e);

  void send_map(class MOSDMap *m, Connection *con);
  void send_incremental_map(epoch_t since, Connection *con,
                            OSDMapRef& osdmap);
  MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
                                     OSDSuperblock& superblock);
  bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
                        const OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
  void share_map(entity_name_t name, Connection *con, epoch_t epoch,
                 OSDMapRef& osdmap, epoch_t *sent_epoch_p);
  void share_map_peer(int peer, Connection *con,
                      OSDMapRef map = OSDMapRef());

  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
  pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
  void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
  void send_message_osd_cluster(Message *m, Connection *con) {
    con->send_message(m);
  }
  void send_message_osd_cluster(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  void send_message_osd_client(Message *m, Connection *con) {
    con->send_message(m);
  }
  void send_message_osd_client(Message *m, const ConnectionRef& con) {
    con->send_message(m);
  }
  entity_name_t get_cluster_msgr_name() {
    return cluster_messenger->get_myname();
  }

private:
  // -- scrub scheduling --
  Mutex sched_scrub_lock;
  int scrubs_pending;
  int scrubs_active;

public:
  struct ScrubJob {
    CephContext* cct;
    /// pg to be scrubbed
    spg_t pgid;
    /// a time scheduled for scrub, but the scrub could be delayed if system
    /// load is too high or it falls outside the scrub hours
    utime_t sched_time;
    /// the hard upper bound of scrub time
    utime_t deadline;
    ScrubJob() : cct(nullptr) {}
    explicit ScrubJob(CephContext* cct, const spg_t& pg,
                      const utime_t& timestamp,
                      double pool_scrub_min_interval = 0,
                      double pool_scrub_max_interval = 0, bool must = true);
    /// order the jobs by sched_time
    bool operator<(const ScrubJob& rhs) const;
  };
  set<ScrubJob> sched_scrub_pg;

  /// @returns the scrub_reg_stamp used for unregistering the scrub job
  utime_t reg_pg_scrub(spg_t pgid, utime_t t, double pool_scrub_min_interval,
                       double pool_scrub_max_interval, bool must) {
    ScrubJob scrub(cct, pgid, t, pool_scrub_min_interval,
                   pool_scrub_max_interval, must);
    Mutex::Locker l(sched_scrub_lock);
    sched_scrub_pg.insert(scrub);
    return scrub.sched_time;
  }
  void unreg_pg_scrub(spg_t pgid, utime_t t) {
    Mutex::Locker l(sched_scrub_lock);
    size_t removed = sched_scrub_pg.erase(ScrubJob(cct, pgid, t));
    assert(removed);
  }
  bool first_scrub_stamp(ScrubJob *out) {
    Mutex::Locker l(sched_scrub_lock);
    if (sched_scrub_pg.empty())
      return false;
    set<ScrubJob>::iterator iter = sched_scrub_pg.begin();
    *out = *iter;
    return true;
  }
  bool next_scrub_stamp(const ScrubJob& next,
                        ScrubJob *out) {
    Mutex::Locker l(sched_scrub_lock);
    if (sched_scrub_pg.empty())
      return false;
    set<ScrubJob>::const_iterator iter = sched_scrub_pg.lower_bound(next);
    if (iter == sched_scrub_pg.cend())
      return false;
    ++iter;
    if (iter == sched_scrub_pg.cend())
      return false;
    *out = *iter;
    return true;
  }

  void dumps_scrub(Formatter *f) {
    assert(f != nullptr);
    Mutex::Locker l(sched_scrub_lock);

    f->open_array_section("scrubs");
    for (const auto &i: sched_scrub_pg) {
      f->open_object_section("scrub");
      f->dump_stream("pgid") << i.pgid;
      f->dump_stream("sched_time") << i.sched_time;
      f->dump_stream("deadline") << i.deadline;
      f->dump_bool("forced", i.sched_time == i.deadline);
      f->close_section();
    }
    f->close_section();
  }

  bool can_inc_scrubs_pending();
  bool inc_scrubs_pending();
  void inc_scrubs_active(bool reserved);
  void dec_scrubs_pending();
  void dec_scrubs_active();

  void reply_op_error(OpRequestRef op, int err);
  void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
  void handle_misdirected_op(PG *pg, OpRequestRef op);
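  // Illustrative sketch (not part of the interface): the scheduler walks the
  // sched_time-ordered set by handing each job back to next_scrub_stamp(),
  // and registration keeps the returned stamp around because that stamp is
  // what identifies the job for removal later:
  //
  //   utime_t stamp = service->reg_pg_scrub(pgid, now, min_int, max_int, must);
  //   // ... later:
  //   service->unreg_pg_scrub(pgid, stamp);
  //
  //   ScrubJob job;
  //   for (bool ok = service->first_scrub_stamp(&job); ok;
  //        ok = service->next_scrub_stamp(job, &job)) {
  //     // consider scrubbing job.pgid at job.sched_time
  //   }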
private:
  // -- agent shared state --
  Mutex agent_lock;
  Cond agent_cond;
  map<uint64_t, set<PGRef> > agent_queue;
  set<PGRef>::iterator agent_queue_pos;
  bool agent_valid_iterator;
  int agent_ops;
  int flush_mode_high_count; // once one pg is in FLUSH_MODE_HIGH, flush objects at high speed
  set<hobject_t> agent_oids;
  bool agent_active;
  struct AgentThread : public Thread {
    OSDService *osd;
    explicit AgentThread(OSDService *o) : osd(o) {}
    void *entry() override {
      osd->agent_entry();
      return NULL;
    }
  } agent_thread;
  bool agent_stop_flag;
  Mutex agent_timer_lock;
  SafeTimer agent_timer;

public:
  void agent_entry();
  void agent_stop();

  void _enqueue(PG *pg, uint64_t priority) {
    if (!agent_queue.empty() &&
        agent_queue.rbegin()->first < priority)
      agent_valid_iterator = false;  // inserting higher-priority queue
    set<PGRef>& nq = agent_queue[priority];
    if (nq.empty())
      agent_cond.Signal();
    nq.insert(pg);
  }

  void _dequeue(PG *pg, uint64_t old_priority) {
    set<PGRef>& oq = agent_queue[old_priority];
    set<PGRef>::iterator p = oq.find(pg);
    assert(p != oq.end());
    if (p == agent_queue_pos)
      ++agent_queue_pos;
    oq.erase(p);
    if (oq.empty()) {
      if (agent_queue.rbegin()->first == old_priority)
        agent_valid_iterator = false;
      agent_queue.erase(old_priority);
    }
  }

  /// enable agent for a pg
  void agent_enable_pg(PG *pg, uint64_t priority) {
    Mutex::Locker l(agent_lock);
    _enqueue(pg, priority);
  }

  /// adjust priority for an enabled pg
  void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
    Mutex::Locker l(agent_lock);
    assert(new_priority != old_priority);
    _enqueue(pg, new_priority);
    _dequeue(pg, old_priority);
  }

  /// disable agent for a pg
  void agent_disable_pg(PG *pg, uint64_t old_priority) {
    Mutex::Locker l(agent_lock);
    _dequeue(pg, old_priority);
  }

  /// note start of an async (evict) op
  void agent_start_evict_op() {
    Mutex::Locker l(agent_lock);
    ++agent_ops;
  }

  /// note finish or cancellation of an async (evict) op
  void agent_finish_evict_op() {
    Mutex::Locker l(agent_lock);
    assert(agent_ops > 0);
    --agent_ops;
    agent_cond.Signal();
  }

  /// note start of an async (flush) op
  void agent_start_op(const hobject_t& oid) {
    Mutex::Locker l(agent_lock);
    ++agent_ops;
    assert(agent_oids.count(oid) == 0);
    agent_oids.insert(oid);
  }

  /// note finish or cancellation of an async (flush) op
  void agent_finish_op(const hobject_t& oid) {
    Mutex::Locker l(agent_lock);
    assert(agent_ops > 0);
    --agent_ops;
    assert(agent_oids.count(oid) == 1);
    agent_oids.erase(oid);
    agent_cond.Signal();
  }

  /// check if we are operating on an object
  bool agent_is_active_oid(const hobject_t& oid) {
    Mutex::Locker l(agent_lock);
    return agent_oids.count(oid);
  }

  /// get count of active agent ops
  int agent_get_num_ops() {
    Mutex::Locker l(agent_lock);
    return agent_ops;
  }

  void agent_inc_high_count() {
    Mutex::Locker l(agent_lock);
    flush_mode_high_count ++;
  }

  void agent_dec_high_count() {
    Mutex::Locker l(agent_lock);
    flush_mode_high_count --;
  }
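  // Illustrative sketch (not part of the interface): agent_queue buckets PGs
  // by priority and the agent thread drains the highest bucket;
  // agent_valid_iterator is cleared whenever an insert or erase could leave
  // agent_queue_pos pointing into a stale "highest" bucket, e.g.:
  //
  //   service->agent_enable_pg(pg, 10);       // pg joins bucket 10
  //   service->agent_adjust_pg(pg, 10, 50);   // bucket 50 now outranks 10,
  //                                           // so the agent restarts its scan
  //   service->agent_disable_pg(pg, 50);      // empty buckets are erased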
private:
  /// throttle promotion attempts
  std::atomic_uint promote_probability_millis{1000}; ///< probability in thousandths. one word.
  PromoteCounter promote_counter;
  utime_t last_recalibrate;
  unsigned long promote_max_objects, promote_max_bytes;

public:
  bool promote_throttle() {
    // NOTE: lockless!  we rely on the probability being a single word.
    promote_counter.attempt();
    if ((unsigned)rand() % 1000 > promote_probability_millis)
      return true;  // yes throttle (no promote)
    if (promote_max_objects &&
        promote_counter.objects > promote_max_objects)
      return true;  // yes throttle
    if (promote_max_bytes &&
        promote_counter.bytes > promote_max_bytes)
      return true;  // yes throttle
    return false;   //  no throttle (promote)
  }
  void promote_finish(uint64_t bytes) {
    promote_counter.finish(bytes);
  }
  void promote_throttle_recalibrate();

  // -- Objecter, for tiering reads/writes from/to other OSDs --
  Objecter *objecter;
  Finisher objecter_finisher;

  // -- Watch --
  Mutex watch_lock;
  SafeTimer watch_timer;
  uint64_t next_notif_id;
  uint64_t get_next_id(epoch_t cur_epoch) {
    Mutex::Locker l(watch_lock);
    return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
  }

  // -- Recovery/Backfill Request Scheduling --
  Mutex recovery_request_lock;
  SafeTimer recovery_request_timer;

  // For async recovery sleep
  bool recovery_needs_sleep = true;
  utime_t recovery_schedule_time = utime_t();

  Mutex recovery_sleep_lock;
  SafeTimer recovery_sleep_timer;

  // -- tids --
  // for ops I issue
  std::atomic_uint last_tid{0};
  ceph_tid_t get_tid() {
    return (ceph_tid_t)last_tid++;
  }

  // -- backfill_reservation --
  Finisher reserver_finisher;
  AsyncReserver<spg_t> local_reserver;
  AsyncReserver<spg_t> remote_reserver;

  // -- pg_temp --
private:
  Mutex pg_temp_lock;
  map<pg_t, vector<int> > pg_temp_wanted;
  map<pg_t, vector<int> > pg_temp_pending;
  void _sent_pg_temp();
public:
  void queue_want_pg_temp(pg_t pgid, vector<int>& want);
  void remove_want_pg_temp(pg_t pgid);
  void requeue_pg_temp();
  void send_pg_temp();

  void send_pg_created(pg_t pgid);

  void queue_for_peering(PG *pg);

  Mutex snap_sleep_lock;
  SafeTimer snap_sleep_timer;

  Mutex scrub_sleep_lock;
  SafeTimer scrub_sleep_timer;

  AsyncReserver<spg_t> snap_reserver;
  void queue_for_snap_trim(PG *pg);

  void queue_for_scrub(PG *pg, bool with_high_priority) {
    unsigned scrub_queue_priority = pg->scrubber.priority;
    if (with_high_priority &&
        scrub_queue_priority < cct->_conf->osd_client_op_priority) {
      scrub_queue_priority = cct->_conf->osd_client_op_priority;
    }
    enqueue_back(
      pg->info.pgid,
      PGQueueable(
        PGScrub(pg->get_osdmap()->get_epoch()),
        cct->_conf->osd_scrub_cost,
        scrub_queue_priority,
        ceph_clock_now(),
        entity_inst_t(),
        pg->get_osdmap()->get_epoch()));
  }

private:
  // -- pg recovery and associated throttling --
  Mutex recovery_lock;
  list<pair<epoch_t, PGRef> > awaiting_throttle;

  utime_t defer_recovery_until;
  uint64_t recovery_ops_active;
  uint64_t recovery_ops_reserved;
  bool recovery_paused;
#ifdef DEBUG_RECOVERY_OIDS
  map<spg_t, set<hobject_t> > recovery_oids;
#endif
  bool _recover_now(uint64_t *available_pushes);
  void _maybe_queue_recovery();
  void _queue_for_recovery(
    pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
    assert(recovery_lock.is_locked_by_me());
    enqueue_back(
      p.second->info.pgid,
      PGQueueable(
        PGRecovery(p.first, reserved_pushes),
        cct->_conf->osd_recovery_cost,
        cct->_conf->osd_recovery_priority,
        ceph_clock_now(),
        entity_inst_t(),
        p.first));
  }
public:
  void start_recovery_op(PG *pg, const hobject_t& soid);
  void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
  bool is_recovery_active();
  void release_reserved_pushes(uint64_t pushes) {
    Mutex::Locker l(recovery_lock);
    assert(recovery_ops_reserved >= pushes);
    recovery_ops_reserved -= pushes;
    _maybe_queue_recovery();
  }
  void defer_recovery(float defer_for) {
    defer_recovery_until = ceph_clock_now();
    defer_recovery_until += defer_for;
  }
  void pause_recovery() {
    Mutex::Locker l(recovery_lock);
    recovery_paused = true;
  }
  bool recovery_is_paused() {
    Mutex::Locker l(recovery_lock);
    return recovery_paused;
  }
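  // Illustrative sketch (not part of the interface): recovery work is
  // admitted against a reserved-push budget; a PG queued via
  // queue_for_recovery() only reaches _queue_for_recovery() once
  // _maybe_queue_recovery() can reserve pushes, and the worker hands them
  // back afterwards:
  //
  //   // worker side, after finishing a queued recovery item:
  //   service->release_reserved_pushes(reserved_pushes);  // may re-arm queue
  //
  //   // an operator pause simply gates _maybe_queue_recovery():
  //   service->pause_recovery();
  //   assert(service->recovery_is_paused());
  //   service->unpause_recovery();  // re-checks the throttle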
  void unpause_recovery() {
    Mutex::Locker l(recovery_lock);
    recovery_paused = false;
    _maybe_queue_recovery();
  }
  void kick_recovery_queue() {
    Mutex::Locker l(recovery_lock);
    _maybe_queue_recovery();
  }
  void clear_queued_recovery(PG *pg) {
    Mutex::Locker l(recovery_lock);
    for (list<pair<epoch_t, PGRef> >::iterator i = awaiting_throttle.begin();
         i != awaiting_throttle.end();
      ) {
      if (i->second.get() == pg) {
        awaiting_throttle.erase(i);
        return;
      } else {
        ++i;
      }
    }
  }
  // delayed pg activation
  void queue_for_recovery(PG *pg) {
    Mutex::Locker l(recovery_lock);
    if (pg->get_state() &
        (PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL)) {
      awaiting_throttle.push_front(make_pair(pg->get_osdmap()->get_epoch(), pg));
    } else {
      awaiting_throttle.push_back(make_pair(pg->get_osdmap()->get_epoch(), pg));
    }
    _maybe_queue_recovery();
  }
  void queue_recovery_after_sleep(PG *pg, epoch_t queued,
                                  uint64_t reserved_pushes) {
    Mutex::Locker l(recovery_lock);
    _queue_for_recovery(make_pair(queued, pg), reserved_pushes);
  }

  void adjust_pg_priorities(const vector<PGRef>& pgs, int newflags);

  // osd map cache (past osd maps)
  Mutex map_cache_lock;
  SharedLRU<epoch_t, const OSDMap> map_cache;
  SimpleLRU<epoch_t, bufferlist> map_bl_cache;
  SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;

  OSDMapRef try_get_map(epoch_t e);
  OSDMapRef get_map(epoch_t e) {
    OSDMapRef ret(try_get_map(e));
    assert(ret);
    return ret;
  }
  OSDMapRef add_map(OSDMap *o) {
    Mutex::Locker l(map_cache_lock);
    return _add_map(o);
  }
  OSDMapRef _add_map(OSDMap *o);

  void add_map_bl(epoch_t e, bufferlist& bl) {
    Mutex::Locker l(map_cache_lock);
    return _add_map_bl(e, bl);
  }
  void pin_map_bl(epoch_t e, bufferlist &bl);
  void _add_map_bl(epoch_t e, bufferlist& bl);
  bool get_map_bl(epoch_t e, bufferlist& bl) {
    Mutex::Locker l(map_cache_lock);
    return _get_map_bl(e, bl);
  }
  bool _get_map_bl(epoch_t e, bufferlist& bl);

  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
    Mutex::Locker l(map_cache_lock);
    return _add_map_inc_bl(e, bl);
  }
  void pin_map_inc_bl(epoch_t e, bufferlist &bl);
  void _add_map_inc_bl(epoch_t e, bufferlist& bl);
  bool get_inc_map_bl(epoch_t e, bufferlist& bl);

  void clear_map_bl_cache_pins(epoch_t e);

  void need_heartbeat_peer_update();

  void pg_stat_queue_enqueue(PG *pg);
  void pg_stat_queue_dequeue(PG *pg);

  void init();
  void final_init();
  void start_shutdown();
  void shutdown_reserver();
  void shutdown();

private:
  // split
  Mutex in_progress_split_lock;
  map<spg_t, spg_t> pending_splits;            // child -> parent
  map<spg_t, set<spg_t> > rev_pending_splits;  // parent -> [children]
  set<spg_t> in_progress_splits;               // child

public:
  void _start_split(spg_t parent, const set<spg_t> &children);
  void start_split(spg_t parent, const set<spg_t> &children) {
    Mutex::Locker l(in_progress_split_lock);
    return _start_split(parent, children);
  }
  void mark_split_in_progress(spg_t parent, const set<spg_t> &pgs);
  void complete_split(const set<spg_t> &pgs);
  void cancel_pending_splits_for_parent(spg_t parent);
  void _cancel_pending_splits_for_parent(spg_t parent);
  bool splitting(spg_t pgid);
  void expand_pg_num(OSDMapRef old_map,
                     OSDMapRef new_map);
  void _maybe_split_pgid(OSDMapRef old_map,
                         OSDMapRef new_map,
                         spg_t pgid);
  void init_splits_between(spg_t pgid, OSDMapRef frommap, OSDMapRef tomap);

  // -- stats --
  Mutex stat_lock;
  osd_stat_t osd_stat;
  uint32_t seq = 0;

  void update_osd_stat(vector<int>& hb_peers);
  osd_stat_t set_osd_stat(const struct store_statfs_t &stbuf,
                          vector<int>& hb_peers,
                          int num_pgs);
  osd_stat_t get_osd_stat() {
    Mutex::Locker l(stat_lock);
    ++seq;
    osd_stat.up_from = up_epoch;
    osd_stat.seq = ((uint64_t)osd_stat.up_from << 32) + seq;
    return osd_stat;
  }
  uint64_t get_osd_stat_seq() {
    Mutex::Locker l(stat_lock);
    return osd_stat.seq;
  }
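  // Illustrative sketch (not part of the interface): osd_stat.seq packs the
  // epoch the OSD came up in into the high 32 bits and a per-boot counter
  // into the low 32 bits, so sequence numbers stay monotonic across restarts:
  //
  //   // up_from = 42, seq = 7:
  //   // osd_stat.seq == (42ull << 32) + 7 == 0x0000002a00000007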
  // -- OSD Full Status --
private:
  friend TestOpsSocketHook;
  mutable Mutex full_status_lock;
  enum s_names { INVALID = -1, NONE, NEARFULL, BACKFILLFULL, FULL, FAILSAFE } cur_state;  // ascending
  const char *get_full_state_name(s_names s) const {
    switch (s) {
    case NONE: return "none";
    case NEARFULL: return "nearfull";
    case BACKFILLFULL: return "backfillfull";
    case FULL: return "full";
    case FAILSAFE: return "failsafe";
    default: return "???";
    }
  }
  s_names get_full_state(string type) const {
    if (type == "none")
      return NONE;
    else if (type == "failsafe")
      return FAILSAFE;
    else if (type == "full")
      return FULL;
    else if (type == "backfillfull")
      return BACKFILLFULL;
    else if (type == "nearfull")
      return NEARFULL;
    else
      return INVALID;
  }
  double cur_ratio;  ///< current utilization
  mutable int64_t injectfull = 0;
  s_names injectfull_state = NONE;
  float get_failsafe_full_ratio();
  void check_full_status(float ratio);
  bool _check_full(s_names type, ostream &ss) const;
public:
  bool check_failsafe_full(ostream &ss) const;
  bool check_full(ostream &ss) const;
  bool check_backfill_full(ostream &ss) const;
  bool check_nearfull(ostream &ss) const;
  bool is_failsafe_full() const;
  bool is_full() const;
  bool is_backfillfull() const;
  bool is_nearfull() const;
  bool need_fullness_update();  ///< osdmap state needs update
  void set_injectfull(s_names type, int64_t count);
  bool check_osdmap_full(const set<pg_shard_t> &missing_on);

  // -- epochs --
private:
  mutable Mutex epoch_lock;  // protects access to boot_epoch, up_epoch, bind_epoch
  epoch_t boot_epoch;  // _first_ epoch we were marked up (after this process started)
  epoch_t up_epoch;    // _most_recent_ epoch we were marked up
  epoch_t bind_epoch;  // epoch we last did a bind to new ip:ports
public:
  /**
   * Retrieve the boot_, up_, and bind_ epochs the OSD has set.  The params
   * can be NULL if you don't care about them.
   */
  void retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
                       epoch_t *_bind_epoch) const;
  /**
   * Set the boot, up, and bind epochs.  Any NULL params will not be set.
   */
  void set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
                  const epoch_t *_bind_epoch);

  epoch_t get_boot_epoch() const {
    epoch_t ret;
    retrieve_epochs(&ret, NULL, NULL);
    return ret;
  }
  epoch_t get_up_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, &ret, NULL);
    return ret;
  }
  epoch_t get_bind_epoch() const {
    epoch_t ret;
    retrieve_epochs(NULL, NULL, &ret);
    return ret;
  }

  void request_osdmap_update(epoch_t e);

  // -- stopping --
  Mutex is_stopping_lock;
  Cond is_stopping_cond;
  enum {
    NOT_STOPPING,
    PREPARING_TO_STOP,
    STOPPING
  };
  std::atomic_int state{NOT_STOPPING};
  int get_state() {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_stopping() const {
    return state == STOPPING;
  }
  bool is_preparing_to_stop() const {
    return state == PREPARING_TO_STOP;
  }
  bool prepare_to_stop();
  void got_stop_ack();

#ifdef PG_DEBUG_REFS
  Mutex pgid_lock;
  map<spg_t, int> pgid_tracker;
  map<spg_t, PG*> live_pgs;
  void add_pgid(spg_t pgid, PG *pg);
  void remove_pgid(spg_t pgid, PG *pg);
  void dump_live_pgids();
#endif

  explicit OSDService(OSD *osd);
  ~OSDService();
};

class OSD : public Dispatcher,
            public md_config_obs_t {
  /** OSD **/
  Mutex osd_lock;          // global lock
  SafeTimer tick_timer;    // safe timer (osd_lock)

  // Tick timer for stuff that does not need osd_lock
  Mutex tick_timer_lock;
  SafeTimer tick_timer_without_osd_lock;
public:
  // config observer bits
  const char** get_tracked_conf_keys() const override;
  void handle_conf_change(const struct md_config_t *conf,
                          const std::set<std::string> &changed) override;
  void update_log_config();
  void check_config();

protected:
  static const double OSD_TICK_INTERVAL;  // tick interval for tick_timer and tick_timer_without_osd_lock

  AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
  AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;

  Messenger   *cluster_messenger;
  Messenger   *client_messenger;
  Messenger   *objecter_messenger;
  MonClient   *monc;  // check the "monc helpers" list before accessing directly
  MgrClient   mgrc;
  PerfCounters      *logger;
  PerfCounters      *recoverystate_perf;
  ObjectStore *store;
#ifdef HAVE_LIBFUSE
  FuseStore *fuse_store = nullptr;
#endif
  LogClient log_client;
  LogChannelRef clog;

  int whoami;
  std::string dev_path, journal_path;

  bool store_is_rotational = true;
  bool journal_is_rotational = true;

  ZTracer::Endpoint trace_endpoint;
  void create_logger();
  void create_recoverystate_perf();
  void tick();
  void tick_without_osd_lock();
  void _dispatch(Message *m);
  void dispatch_op(OpRequestRef op);

  void check_osdmap_features(ObjectStore *store);

  // asok
  friend class OSDSocketHook;
  class OSDSocketHook *asok_hook;
  bool asok_command(string admin_command, cmdmap_t& cmdmap, string format,
                    ostream& ss);

public:
  ClassHandler  *class_handler = nullptr;
  int get_nodeid() { return whoami; }

  static ghobject_t get_osdmap_pobject_name(epoch_t epoch) {
    char foo[20];
    snprintf(foo, sizeof(foo), "osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }
  static ghobject_t get_inc_osdmap_pobject_name(epoch_t epoch) {
    char foo[22];
    snprintf(foo, sizeof(foo), "inc_osdmap.%d", epoch);
    return ghobject_t(hobject_t(sobject_t(object_t(foo), 0)));
  }

  static ghobject_t make_snapmapper_oid() {
    return ghobject_t(hobject_t(
      sobject_t(
        object_t("snapmapper"),
        0)));
  }

  static ghobject_t make_pg_log_oid(spg_t pg) {
    stringstream ss;
    ss << "pglog_" << pg;
    string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }

  static ghobject_t make_pg_biginfo_oid(spg_t pg) {
    stringstream ss;
    ss << "pginfo_" << pg;
    string s;
    getline(ss, s);
    return ghobject_t(hobject_t(sobject_t(object_t(s.c_str()), 0)));
  }
  static ghobject_t make_infos_oid() {
    hobject_t oid(sobject_t("infos", CEPH_NOSNAP));
    return ghobject_t(oid);
  }
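  // Illustrative sketch (not part of the interface): these helpers derive
  // deterministic meta-collection object names, e.g. epoch 42 yields
  // "osdmap.42" / "inc_osdmap.42", and a pg that prints as "1.7" gets
  // "pglog_1.7" and "pginfo_1.7" objects, so the names are reproducible:
  //
  //   ghobject_t full = OSD::get_osdmap_pobject_name(42);      // osdmap.42
  //   ghobject_t inc  = OSD::get_inc_osdmap_pobject_name(42);  // inc_osdmap.42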
  static void recursive_remove_collection(CephContext* cct,
                                          ObjectStore *store,
                                          spg_t pgid,
                                          coll_t tmp);

  /**
   * get_osd_initial_compat_set()
   *
   * Get the initial feature set for this OSD.  Features
   * here are automatically upgraded.
   *
   * Return value: Initial osd CompatSet
   */
  static CompatSet get_osd_initial_compat_set();

  /**
   * get_osd_compat_set()
   *
   * Get all features supported by this OSD
   *
   * Return value: CompatSet of all supported features
   */
  static CompatSet get_osd_compat_set();

private:
  class C_Tick;
  class C_Tick_WithoutOSDLock;

  // -- superblock --
  OSDSuperblock superblock;

  void write_superblock();
  void write_superblock(ObjectStore::Transaction& t);
  int read_superblock();

  void clear_temp_objects();

  CompatSet osd_compat;

  // -- state --
public:
  typedef enum {
    STATE_INITIALIZING = 1,
    STATE_PREBOOT,
    STATE_BOOTING,
    STATE_ACTIVE,
    STATE_STOPPING,
    STATE_WAITING_FOR_HEALTHY
  } osd_state_t;

  static const char *get_state_name(int s) {
    switch (s) {
    case STATE_INITIALIZING: return "initializing";
    case STATE_PREBOOT: return "preboot";
    case STATE_BOOTING: return "booting";
    case STATE_ACTIVE: return "active";
    case STATE_STOPPING: return "stopping";
    case STATE_WAITING_FOR_HEALTHY: return "waiting_for_healthy";
    default: return "???";
    }
  }

private:
  std::atomic_int state{STATE_INITIALIZING};
  bool waiting_for_luminous_mons = false;

public:
  int get_state() const {
    return state;
  }
  void set_state(int s) {
    state = s;
  }
  bool is_initializing() const {
    return state == STATE_INITIALIZING;
  }
  bool is_preboot() const {
    return state == STATE_PREBOOT;
  }
  bool is_booting() const {
    return state == STATE_BOOTING;
  }
  bool is_active() const {
    return state == STATE_ACTIVE;
  }
  bool is_stopping() const {
    return state == STATE_STOPPING;
  }
  bool is_waiting_for_healthy() const {
    return state == STATE_WAITING_FOR_HEALTHY;
  }

private:
  ThreadPool peering_tp;
  ShardedThreadPool osd_op_tp;
  ThreadPool disk_tp;
  ThreadPool command_tp;

  void set_disk_tp_priority();
  void get_latest_osdmap();

  // -- sessions --
private:
  void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
  void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);

  Mutex session_waiting_lock;
  set<Session*> session_waiting_for_map;

  /// Caller assumes refs for included Sessions
  void get_sessions_waiting_for_map(set<Session*> *out) {
    Mutex::Locker l(session_waiting_lock);
    out->swap(session_waiting_for_map);
  }
  void register_session_waiting_on_map(Session *session) {
    Mutex::Locker l(session_waiting_lock);
    if (session_waiting_for_map.insert(session).second) {
      session->get();
    }
  }
  void clear_session_waiting_on_map(Session *session) {
    Mutex::Locker l(session_waiting_lock);
    set<Session*>::iterator i = session_waiting_for_map.find(session);
    if (i != session_waiting_for_map.end()) {
      (*i)->put();
      session_waiting_for_map.erase(i);
    }
  }
  void dispatch_sessions_waiting_on_map() {
    set<Session*> sessions_to_check;
    get_sessions_waiting_for_map(&sessions_to_check);
    for (set<Session*>::iterator i = sessions_to_check.begin();
         i != sessions_to_check.end();
         sessions_to_check.erase(i++)) {
      (*i)->session_dispatch_lock.Lock();
      dispatch_session_waiting(*i, osdmap);
      (*i)->session_dispatch_lock.Unlock();
      (*i)->put();
    }
  }
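  // Illustrative sketch (not part of the interface): sessions whose ops need
  // a newer map park themselves and hold one reference; a later map event
  // drains the whole set in one pass:
  //
  //   register_session_waiting_on_map(s);   // takes a Session ref
  //   ...
  //   dispatch_sessions_waiting_on_map();   // swap the set out, redispatch
  //                                         // each session, then drop its ref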
  void session_handle_reset(Session *session) {
    Mutex::Locker l(session->session_dispatch_lock);
    clear_session_waiting_on_map(session);

    session->clear_backoffs();

    /* Messages have connection refs, we need to clear the
     * connection->session->message->connection
     * cycles which result.
     * Bug #12338
     */
    session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
  }

private:
  /**
   * @defgroup monc helpers
   * @{
   * Right now we only have the one
   */

  /**
   * Ask the Monitors for a sequence of OSDMaps.
   *
   * @param epoch The epoch to start with when replying
   * @param force_request True if this request forces a new subscription to
   * the monitors; false if an outstanding request that encompasses it is
   * sufficient.
   */
  void osdmap_subscribe(version_t epoch, bool force_request);
  /** @} monc helpers */

  Mutex osdmap_subscribe_lock;
  epoch_t latest_subscribed_epoch{0};

  // -- heartbeat --
  /// information about a heartbeat peer
  struct HeartbeatInfo {
    int peer;           ///< peer
    ConnectionRef con_front;   ///< peer connection (front)
    ConnectionRef con_back;    ///< peer connection (back)
    utime_t first_tx;   ///< time we sent our first ping request
    utime_t last_tx;    ///< last time we sent a ping request
    utime_t last_rx_front;  ///< last time we got a ping reply on the front side
    utime_t last_rx_back;   ///< last time we got a ping reply on the back side
    epoch_t epoch;      ///< most recent epoch we wanted this peer

    bool is_unhealthy(utime_t cutoff) const {
      return
        ! ((last_rx_front > cutoff ||
            (last_rx_front == utime_t() && (last_tx == utime_t() ||
                                            first_tx > cutoff))) &&
           (last_rx_back > cutoff ||
            (last_rx_back == utime_t() && (last_tx == utime_t() ||
                                           first_tx > cutoff))));
    }
    bool is_healthy(utime_t cutoff) const {
      return last_rx_front > cutoff && last_rx_back > cutoff;
    }
  };

  /// state attached to outgoing heartbeat connections
  struct HeartbeatSession : public RefCountedObject {
    int peer;
    explicit HeartbeatSession(int p) : peer(p) {}
  };

  Mutex heartbeat_lock;
  map<int, int> debug_heartbeat_drops_remaining;
  Cond heartbeat_cond;
  bool heartbeat_stop;
  std::atomic_bool heartbeat_need_update;
  map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
  utime_t last_mon_heartbeat;
  Messenger *hb_front_client_messenger;
  Messenger *hb_back_client_messenger;
  Messenger *hb_front_server_messenger;
  Messenger *hb_back_server_messenger;
  utime_t last_heartbeat_resample;  ///< last time we chose random peers in waiting-for-healthy state
  double daily_loadavg;

  void _add_heartbeat_peer(int p);
  void _remove_heartbeat_peer(int p);
  bool heartbeat_reset(Connection *con);
  void maybe_update_heartbeat_peers();
  void reset_heartbeat_peers();
  bool heartbeat_peers_need_update() {
    return heartbeat_need_update.load();
  }
  void heartbeat_set_peers_need_update() {
    heartbeat_need_update.store(true);
  }
  void heartbeat_clear_peers_need_update() {
    heartbeat_need_update.store(false);
  }
  void heartbeat();
  void heartbeat_check();
  void heartbeat_entry();
  void need_heartbeat_peer_update();

  void heartbeat_kick() {
    Mutex::Locker l(heartbeat_lock);
    heartbeat_cond.Signal();
  }

  struct T_Heartbeat : public Thread {
    OSD *osd;
    explicit T_Heartbeat(OSD *o) : osd(o) {}
    void *entry() override {
      osd->heartbeat_entry();
      return 0;
    }
  } heartbeat_thread;

public:
  bool heartbeat_dispatch(Message *m);
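  // Illustrative sketch (not part of the interface): is_unhealthy() treats a
  // peer as unhealthy when either the front or the back channel has gone
  // quiet past the cutoff, while still giving brand-new peers (no reply yet,
  // first ping sent after the cutoff) the benefit of the doubt; is_healthy()
  // is the stricter check requiring fresh replies on both channels:
  //
  //   utime_t cutoff = now - grace;
  //   if (info.is_unhealthy(cutoff)) {
  //     // candidate for a failure report
  //   }
  //   if (info.is_healthy(cutoff)) {
  //     // both channels replied since cutoff
  //   }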
  struct HeartbeatDispatcher : public Dispatcher {
    OSD *osd;
    explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}

    bool ms_can_fast_dispatch_any() const override { return true; }
    bool ms_can_fast_dispatch(const Message *m) const override {
      switch (m->get_type()) {
      case CEPH_MSG_PING:
      case MSG_OSD_PING:
        return true;
      default:
        return false;
      }
    }
    void ms_fast_dispatch(Message *m) override {
      osd->heartbeat_dispatch(m);
    }
    bool ms_dispatch(Message *m) override {
      return osd->heartbeat_dispatch(m);
    }
    bool ms_handle_reset(Connection *con) override {
      return osd->heartbeat_reset(con);
    }
    void ms_handle_remote_reset(Connection *con) override {}
    bool ms_handle_refused(Connection *con) override {
      return osd->ms_handle_refused(con);
    }
    bool ms_verify_authorizer(Connection *con, int peer_type,
                              int protocol, bufferlist& authorizer_data,
                              bufferlist& authorizer_reply,
                              bool& isvalid, CryptoKey& session_key) override {
      isvalid = true;
      return true;
    }
  } heartbeat_dispatcher;

private:
  // -- waiters --
  list<OpRequestRef> finished;

  void take_waiters(list<OpRequestRef>& ls) {
    assert(osd_lock.is_locked());
    finished.splice(finished.end(), ls);
  }
  void do_waiters();

  // -- op tracking --
  OpTracker op_tracker;
  void check_ops_in_flight();
  void test_ops(std::string command, std::string args, ostream& ss);
  friend class TestOpsSocketHook;
  TestOpsSocketHook *test_ops_hook;
  friend struct C_CompleteSplits;
  friend struct C_OpenPGs;

  // -- op queue --
  enum class io_queue {
    prioritized,
    weightedpriority,
    mclock_opclass,
    mclock_client,
  };
  friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);

  const io_queue op_queue;
  const unsigned int op_prio_cutoff;

  /*
   * The ordered op delivery chain is:
   *
   *   fast dispatch -> pqueue back
   *                    pqueue front <-> to_process back
   *                                     to_process front -> RunVis(item)
   *                                                      <- queue_front()
   *
   * The pqueue is per-shard, and to_process is per pg_slot.  Items can be
   * pushed back up into to_process and/or pqueue while order is preserved.
   *
   * Multiple worker threads can operate on each shard.
   *
   * Under normal circumstances, num_running == to_process.size().  There are
   * two times when that is not true: (1) when waiting_for_pg == true and
   * to_process is accumulating requests that are waiting for the pg to be
   * instantiated; in that case they will all get requeued together by
   * wake_pg_waiters, and (2) when wake_pg_waiters just ran: waiting_for_pg
   * has been cleared and the items have already been requeued.
   */
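  // Illustrative sketch (not part of the interface): the two-level queue
  // keeps per-PG ordering even though many worker threads serve one shard.
  // A worker dequeues from the shard pqueue, then funnels the item through
  // its pg_slot so anything requeued for that PG stays FIFO:
  //
  //   // worker thread, per iteration (greatly simplified):
  //   item = pqueue->dequeue();              // shard-wide priority order
  //   slot = &pg_slots[item.first];
  //   slot->to_process.push_back(item.second);
  //   // drop the shard lock, take the pg lock, run slot->to_process.front();
  //   // if the PG doesn't exist yet, the item simply waits in to_process
  //   // until wake_pg_waiters() requeues the whole slot.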
  friend class PGQueueable;

  class ShardedOpWQ
    : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
  {
    struct ShardData {
      Mutex sdata_lock;
      Cond sdata_cond;

      Mutex sdata_op_ordering_lock;   ///< protects all members below

      OSDMapRef waiting_for_pg_osdmap;
      struct pg_slot {
        PGRef pg;                     ///< cached pg reference [optional]
        list<PGQueueable> to_process; ///< order items for this slot
        int num_running = 0;          ///< _process threads doing pg lookup/lock

        /// true if pg does/did not exist. if so all new items go directly to
        /// to_process.  cleared by prune_pg_waiters.
        bool waiting_for_pg = false;

        /// incremented by wake_pg_waiters; indicates racing _process threads
        /// should bail out (their op has been requeued)
        uint64_t requeue_seq = 0;
      };

      /// map of slots for each spg_t.  maintains ordering of items dequeued
      /// from pqueue while _process thread drops shard lock to acquire the
      /// pg lock.  slots are removed only by prune_pg_waiters.
      unordered_map<spg_t,pg_slot> pg_slots;

      /// priority queue
      std::unique_ptr<OpQueue<pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;

      void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
        unsigned priority = item.second.get_priority();
        unsigned cost = item.second.get_cost();
        if (priority >= cutoff)
          pqueue->enqueue_strict_front(
            item.second.get_owner(),
            priority, item);
        else
          pqueue->enqueue_front(
            item.second.get_owner(),
            priority, cost, item);
      }

      ShardData(
        string lock_name, string ordering_lock,
        uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
        io_queue opqueue)
        : sdata_lock(lock_name.c_str(), false, true, false, cct),
          sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
                                 false, cct) {
        if (opqueue == io_queue::weightedpriority) {
          pqueue = std::unique_ptr
            <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
              new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
                max_tok_per_prio, min_cost));
        } else if (opqueue == io_queue::prioritized) {
          pqueue = std::unique_ptr
            <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
              new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
                max_tok_per_prio, min_cost));
        } else if (opqueue == io_queue::mclock_opclass) {
          pqueue = std::unique_ptr
            <ceph::mClockOpClassQueue>(new ceph::mClockOpClassQueue(cct));
        } else if (opqueue == io_queue::mclock_client) {
          pqueue = std::unique_ptr
            <ceph::mClockClientQueue>(new ceph::mClockClientQueue(cct));
        }
      }
    };  // struct ShardData

    vector<ShardData*> shard_list;
    OSD *osd;
    uint32_t num_shards;

  public:
    ShardedOpWQ(uint32_t pnum_shards,
                OSD *o,
                time_t ti,
                time_t si,
                ShardedThreadPool* tp)
      : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
        osd(o),
        num_shards(pnum_shards) {
      for (uint32_t i = 0; i < num_shards; i++) {
        char lock_name[32] = {0};
        snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
        char order_lock[32] = {0};
        snprintf(order_lock, sizeof(order_lock), "%s.%d",
                 "OSD:ShardedOpWQ:order:", i);
        ShardData* one_shard = new ShardData(
          lock_name, order_lock,
          osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
          osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
        shard_list.push_back(one_shard);
      }
    }
    ~ShardedOpWQ() override {
      while (!shard_list.empty()) {
        delete shard_list.back();
        shard_list.pop_back();
      }
    }

    /// wake any pg waiters after a PG is created/instantiated
    void wake_pg_waiters(spg_t pgid);

    /// prune ops (and possibly pg_slots) for pgs that shouldn't be here
    void prune_pg_waiters(OSDMapRef osdmap, int whoami);

    /// clear cached PGRef on pg deletion
    void clear_pg_pointer(spg_t pgid);

    /// clear pg_slots on shutdown
    void clear_pg_slots();

    /// try to do some work
    void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;

    /// enqueue a new item
    void _enqueue(pair<spg_t, PGQueueable> item) override;

    /// requeue an old item (at the front of the line)
    void _enqueue_front(pair<spg_t, PGQueueable> item) override;

    void return_waiting_threads() override {
      for (uint32_t i = 0; i < num_shards; i++) {
        ShardData* sdata = shard_list[i];
        assert(NULL != sdata);
        sdata->sdata_lock.Lock();
        sdata->sdata_cond.Signal();
        sdata->sdata_lock.Unlock();
      }
    }

    void dump(Formatter *f) {
      for (uint32_t i = 0; i < num_shards; i++) {
        ShardData* sdata = shard_list[i];
        char lock_name[32] = {0};
        snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
        assert(NULL != sdata);
        sdata->sdata_op_ordering_lock.Lock();
        f->open_object_section(lock_name);
        sdata->pqueue->dump(f);
        f->close_section();
        sdata->sdata_op_ordering_lock.Unlock();
      }
    }

    /// Must be called on ops queued back to front
    struct Pred {
      spg_t pgid;
      list<OpRequestRef> *out_ops;
      uint64_t reserved_pushes_to_free;
      Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
        : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
      void accumulate(const PGQueueable &op) {
        reserved_pushes_to_free += op.get_reserved_pushes();
        if (out_ops) {
          boost::optional<OpRequestRef> mop = op.maybe_get_op();
          if (mop)
            out_ops->push_front(*mop);
        }
      }
      bool operator()(const pair<spg_t, PGQueueable> &op) {
        if (op.first == pgid) {
          accumulate(op.second);
          return true;
        } else {
          return false;
        }
      }
      uint64_t get_reserved_pushes_to_free() const {
        return reserved_pushes_to_free;
      }
    };

    bool is_shard_empty(uint32_t thread_index) override {
      uint32_t shard_index = thread_index % num_shards;
      ShardData* sdata = shard_list[shard_index];
      assert(NULL != sdata);
      Mutex::Locker l(sdata->sdata_op_ordering_lock);
      return sdata->pqueue->empty();
    }
  } op_shardedwq;

  void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
  void dequeue_op(
    PGRef pg, OpRequestRef op,
    ThreadPool::TPHandle &handle);

  // -- peering queue --
  struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
    list<PG*> peering_queue;
    OSD *osd;
    set<PG*> in_use;
    PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
      : ThreadPool::BatchWorkQueue<PG>(
        "OSD::PeeringWQ", ti, si, tp), osd(o) {}

    void _dequeue(PG *pg) override {
      for (list<PG*>::iterator i = peering_queue.begin();
           i != peering_queue.end();
        ) {
        if (*i == pg) {
          peering_queue.erase(i++);
          pg->put("PeeringWQ");
        } else {
          ++i;
        }
      }
    }
    bool _enqueue(PG *pg) override {
      pg->get("PeeringWQ");
      peering_queue.push_back(pg);
      return true;
    }
    bool _empty() override {
      return peering_queue.empty();
    }
    void _dequeue(list<PG*> *out) override;
    void _process(
      const list<PG*> &pgs,
      ThreadPool::TPHandle &handle) override {
      assert(!pgs.empty());
      osd->process_peering_events(pgs, handle);
      for (list<PG*>::const_iterator i = pgs.begin();
           i != pgs.end();
           ++i) {
        (*i)->put("PeeringWQ");
      }
    }
    void _process_finish(const list<PG*> &pgs) override {
      for (list<PG*>::const_iterator i = pgs.begin();
           i != pgs.end();
           ++i) {
        in_use.erase(*i);
      }
    }
    void _clear() override {
      assert(peering_queue.empty());
    }
  } peering_wq;

  void process_peering_events(
    const list<PG*> &pg,
    ThreadPool::TPHandle &handle);

  friend class PG;
  friend class PrimaryLogPG;

protected:
  // -- osd map --
  OSDMapRef       osdmap;
  OSDMapRef get_osdmap() {
    return osdmap;
  }
  epoch_t get_osdmap_epoch() const {
    return osdmap ? osdmap->get_epoch() : 0;
  }
  utime_t         had_map_since;
  RWLock          map_lock;
  list<OpRequestRef>  waiting_for_osdmap;
  deque<utime_t> osd_markdown_log;

  friend struct send_map_on_destruct;

  void wait_for_new_map(OpRequestRef op);
  void handle_osd_map(class MOSDMap *m);
  void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
  void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
  void note_down_osd(int osd);
  void note_up_osd(int osd);
  friend class C_OnMapCommit;

  bool advance_pg(
    epoch_t advance_to, PG *pg,
    ThreadPool::TPHandle &handle,
    PG::RecoveryCtx *rctx,
    set<boost::intrusive_ptr<PG> > *split_pgs
    );
  void consume_map();
  void activate_map();

  // osd map cache (past osd maps)
  OSDMapRef get_map(epoch_t e) {
    return service.get_map(e);
  }
  OSDMapRef add_map(OSDMap *o) {
    return service.add_map(o);
  }
  void add_map_bl(epoch_t e, bufferlist& bl) {
    return service.add_map_bl(e, bl);
  }
  void pin_map_bl(epoch_t e, bufferlist &bl) {
    return service.pin_map_bl(e, bl);
  }
  bool get_map_bl(epoch_t e, bufferlist& bl) {
    return service.get_map_bl(e, bl);
  }
  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
    return service.add_map_inc_bl(e, bl);
  }
  void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
    return service.pin_map_inc_bl(e, bl);
  }

protected:
  // -- placement groups --
  RWLock pg_map_lock;  // this lock orders *above* individual PG _locks
  ceph::unordered_map<spg_t, PG*> pg_map;  // protected by pg_map lock

  std::mutex pending_creates_lock;
  std::set<pg_t> pending_creates_from_osd;
  unsigned pending_creates_from_mon = 0;

  map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
  PGRecoveryStats pg_recovery_stats;

  PGPool _get_pool(int id, OSDMapRef createmap);

  PG   *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
  PG   *_lookup_lock_pg(spg_t pgid);

public:
  PG   *lookup_lock_pg(spg_t pgid);

  int get_num_pgs() {
    RWLock::RLocker l(pg_map_lock);
    return pg_map.size();
  }

protected:
  PG   *_open_lock_pg(OSDMapRef createmap,
                      spg_t pg, bool no_lockdep_check=false);
  enum res_result {
    RES_PARENT,    // resurrected a parent
    RES_SELF,      // resurrected self
    RES_NONE       // nothing relevant deleting
  };
  res_result _try_resurrect_pg(
    OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);

  PG   *_create_lock_pg(
    OSDMapRef createmap,
    spg_t pgid,
    bool hold_map_lock,
    bool backfill,
    int role,
    vector<int>& up, int up_primary,
    vector<int>& acting, int acting_primary,
    pg_history_t history,
    const PastIntervals& pi,
    ObjectStore::Transaction& t);

  PG* _make_pg(OSDMapRef createmap, spg_t pgid);
  void add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx);

  int handle_pg_peering_evt(
    spg_t pgid,
    const pg_history_t& orig_history,
    const PastIntervals& pi,
    epoch_t epoch,
    PG::CephPeeringEvtRef evt);
  bool maybe_wait_for_max_pg(spg_t pgid, bool is_mon_create);
  void resume_creating_pg();

  void load_pgs();
  void build_past_intervals_parallel();

  /// build initial pg history and intervals on create
  void build_initial_pg_history(
    spg_t pgid,
    epoch_t created,
    utime_t created_stamp,
    pg_history_t *h,
    PastIntervals *pi);

  /// project pg history from from to now
  bool project_pg_history(
    spg_t pgid, pg_history_t& h, epoch_t from,
    const vector<int>& lastup,
    int lastupprimary,
    const vector<int>& lastacting,
    int lastactingprimary
    ); ///< @return false if there was a map gap between from and now

  // this must be called with pg->lock held on any pg addition to pg_map
  void wake_pg_waiters(PGRef pg) {
    assert(pg->is_locked());
    op_shardedwq.wake_pg_waiters(pg->info.pgid);
  }
  epoch_t last_pg_create_epoch;

  void handle_pg_create(OpRequestRef op);

  void split_pgs(
    PG *parent,
    const set<spg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
    OSDMapRef curmap,
    OSDMapRef nextmap,
    PG::RecoveryCtx *rctx);

  // == monitor interaction ==
  Mutex mon_report_lock;
  utime_t last_mon_report;
  utime_t last_pg_stats_sent;

  /* if our monitor dies, we want to notice it and reconnect.
   * So we keep track of when it last acked our stat updates,
   * and if too much time passes (and we've been sending
   * more updates) then we can call it dead and reconnect
   * elsewhere.
   */
  utime_t last_pg_stats_ack;
  float stats_ack_timeout;
  set<uint64_t> outstanding_pg_stats; // how many stat updates haven't been acked yet

  // -- boot --
  void start_boot();
  void _got_mon_epochs(epoch_t oldest, epoch_t newest);
  void _preboot(epoch_t oldest, epoch_t newest);
  void _send_boot();
  void _collect_metadata(map<string,string> *pmeta);

  void start_waiting_for_healthy();
  bool _is_healthy();

  void send_full_update();

  friend struct C_OSD_GetVersion;

  // -- alive --
  epoch_t up_thru_wanted;

  void queue_want_up_thru(epoch_t want);
  void send_alive();

  // -- full map requests --
  epoch_t requested_full_first, requested_full_last;

  void request_full_map(epoch_t first, epoch_t last);
  void rerequest_full_maps() {
    epoch_t first = requested_full_first;
    epoch_t last = requested_full_last;
    requested_full_first = 0;
    requested_full_last = 0;
    request_full_map(first, last);
  }
  void got_full_map(epoch_t e);

  // -- failures --
  map<int,utime_t> failure_queue;
  map<int,pair<utime_t,entity_inst_t> > failure_pending;

  void requeue_failures();
  void send_failures();
  void send_still_alive(epoch_t epoch, const entity_inst_t &i);

  // -- pg stats --
  Mutex pg_stat_queue_lock;
  Cond pg_stat_queue_cond;
  xlist<PG*> pg_stat_queue;
  bool osd_stat_updated;
  uint64_t pg_stat_tid, pg_stat_tid_flushed;

  void send_pg_stats(const utime_t &now);
  void handle_pg_stats_ack(class MPGStatsAck *ack);
  void flush_pg_stats();

  ceph::coarse_mono_clock::time_point last_sent_beacon;
  Mutex min_last_epoch_clean_lock{"OSD::min_last_epoch_clean_lock"};
  epoch_t min_last_epoch_clean = 0;
  // which pgs were scanned for min_lec
  std::vector<pg_t> min_last_epoch_clean_pgs;
  void send_beacon(const ceph::coarse_mono_clock::time_point& now);

  void pg_stat_queue_enqueue(PG *pg) {
    pg_stat_queue_lock.Lock();
    if (pg->is_primary() && !pg->stat_queue_item.is_on_list()) {
      pg->get("pg_stat_queue");
      pg_stat_queue.push_back(&pg->stat_queue_item);
    }
    osd_stat_updated = true;
    pg_stat_queue_lock.Unlock();
  }
  void pg_stat_queue_dequeue(PG *pg) {
    pg_stat_queue_lock.Lock();
    if (pg->stat_queue_item.remove_myself())
      pg->put("pg_stat_queue");
    pg_stat_queue_lock.Unlock();
  }
  void clear_pg_stat_queue() {
    pg_stat_queue_lock.Lock();
    while (!pg_stat_queue.empty()) {
      PG *pg = pg_stat_queue.front();
      pg_stat_queue.pop_front();
      pg->put("pg_stat_queue");
    }
    pg_stat_queue_lock.Unlock();
  }
  void clear_outstanding_pg_stats() {
    Mutex::Locker l(pg_stat_queue_lock);
    outstanding_pg_stats.clear();
  }

  ceph_tid_t get_tid() {
    return service.get_tid();
  }

  // -- generic pg peering --
  PG::RecoveryCtx create_context();
  void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
                        ThreadPool::TPHandle *handle = NULL);
  void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
                                    ThreadPool::TPHandle *handle = NULL);
  void do_notifies(map<int,
                       vector<pair<pg_notify_t, PastIntervals> > >&
                       notify_list,
                   OSDMapRef map);
  void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
                  OSDMapRef map);
  void do_infos(map<int,
                    vector<pair<pg_notify_t, PastIntervals> > >& info_map,
                OSDMapRef map);

  bool require_mon_peer(const Message *m);
  bool require_mon_or_mgr_peer(const Message *m);
  bool require_osd_peer(const Message *m);
  /***
   * Verifies that we were alive in the given epoch, and that we
   * still are.
   */
  bool require_self_aliveness(const Message *m, epoch_t alive_since);
  /**
   * Verifies that the OSD who sent the given op has the same
   * address as in the given map.
   * @pre op was sent by an OSD using the cluster messenger
   */
  bool require_same_peer_instance(const Message *m, OSDMapRef& map,
                                  bool is_fast_dispatch);

  bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
                                 bool is_fast_dispatch);

  void handle_pg_query(OpRequestRef op);
  void handle_pg_notify(OpRequestRef op);
  void handle_pg_log(OpRequestRef op);
  void handle_pg_info(OpRequestRef op);
  void handle_pg_trim(OpRequestRef op);

  void handle_pg_backfill_reserve(OpRequestRef op);
  void handle_pg_recovery_reserve(OpRequestRef op);

  void handle_force_recovery(Message *m);

  void handle_pg_remove(OpRequestRef op);
  void _remove_pg(PG *pg);

  // -- commands --
  struct Command {
    vector<string> cmd;
    ceph_tid_t tid;
    bufferlist indata;
    ConnectionRef con;

    Command(vector<string>& c, ceph_tid_t t, bufferlist& bl, Connection *co)
      : cmd(c), tid(t), indata(bl), con(co) {}
  };
  list<Command*> command_queue;
  struct CommandWQ : public ThreadPool::WorkQueue<Command> {
    OSD *osd;
    CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
      : ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}

    bool _empty() override {
      return osd->command_queue.empty();
    }
    bool _enqueue(Command *c) override {
      osd->command_queue.push_back(c);
      return true;
    }
    void _dequeue(Command *pg) override {
      ceph_abort();
    }
    Command *_dequeue() override {
      if (osd->command_queue.empty())
        return NULL;
      Command *c = osd->command_queue.front();
      osd->command_queue.pop_front();
      return c;
    }
    void _process(Command *c, ThreadPool::TPHandle &) override {
      osd->osd_lock.Lock();
      if (osd->is_stopping()) {
        osd->osd_lock.Unlock();
        delete c;
        return;
      }
      osd->do_command(c->con.get(), c->tid, c->cmd, c->indata);
      osd->osd_lock.Unlock();
      delete c;
    }
    void _clear() override {
      while (!osd->command_queue.empty()) {
        Command *c = osd->command_queue.front();
        osd->command_queue.pop_front();
        delete c;
      }
    }
  } command_wq;

  void handle_command(class MMonCommand *m);
  void handle_command(class MCommand *m);
  void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd,
                  bufferlist& data);

  // -- pg recovery --
  void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
                   ThreadPool::TPHandle &handle);

  // -- scrubbing --
  void sched_scrub();
  bool scrub_random_backoff();
  bool scrub_load_below_threshold();
  bool scrub_time_permit(utime_t now);

  // -- removing --
  struct RemoveWQ :
    public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
    CephContext* cct;
    ObjectStore *&store;
    list<pair<PGRef, DeletingStateRef> > remove_queue;
    RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
             ThreadPool *tp)
      : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
        "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}

    bool _empty() override {
      return remove_queue.empty();
    }
    void _enqueue(pair<PGRef, DeletingStateRef> item) override {
      remove_queue.push_back(item);
    }
    void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
      remove_queue.push_front(item);
    }
    bool _dequeue(pair<PGRef, DeletingStateRef> item) {
      ceph_abort();
    }
    pair<PGRef, DeletingStateRef> _dequeue() override {
      assert(!remove_queue.empty());
      pair<PGRef, DeletingStateRef> item = remove_queue.front();
      remove_queue.pop_front();
      return item;
    }
    void _process(pair<PGRef, DeletingStateRef>,
                  ThreadPool::TPHandle &) override;
    void _clear() override {
      remove_queue.clear();
    }
  } remove_wq;

private:
  bool ms_can_fast_dispatch_any() const override { return true; }
  bool ms_can_fast_dispatch(const Message *m) const override {
    switch (m->get_type()) {
    case CEPH_MSG_OSD_OP:
    case CEPH_MSG_OSD_BACKOFF:
    case MSG_OSD_SUBOP:
    case MSG_OSD_REPOP:
    case MSG_OSD_SUBOPREPLY:
    case MSG_OSD_REPOPREPLY:
    case MSG_OSD_PG_PUSH:
    case MSG_OSD_PG_PULL:
    case MSG_OSD_PG_PUSH_REPLY:
    case MSG_OSD_PG_SCAN:
    case MSG_OSD_PG_BACKFILL:
    case MSG_OSD_PG_BACKFILL_REMOVE:
    case MSG_OSD_EC_WRITE:
    case MSG_OSD_EC_WRITE_REPLY:
    case MSG_OSD_EC_READ:
    case MSG_OSD_EC_READ_REPLY:
    case MSG_OSD_SCRUB_RESERVE:
    case MSG_OSD_REP_SCRUB:
    case MSG_OSD_REP_SCRUBMAP:
    case MSG_OSD_PG_UPDATE_LOG_MISSING:
    case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
    case MSG_OSD_PG_RECOVERY_DELETE:
    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
      return true;
    default:
      return false;
    }
  }
  void ms_fast_dispatch(Message *m) override;
  void ms_fast_preprocess(Message *m) override;
  bool ms_dispatch(Message *m) override;
  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
                         bool force_new) override;
  bool ms_verify_authorizer(Connection *con, int peer_type,
                            int protocol, bufferlist& authorizer,
                            bufferlist& authorizer_reply,
                            bool& isvalid, CryptoKey& session_key) override;
  void ms_handle_connect(Connection *con) override;
  void ms_handle_fast_connect(Connection *con) override;
  void ms_handle_fast_accept(Connection *con) override;
  bool ms_handle_reset(Connection *con) override;
  void ms_handle_remote_reset(Connection *con) override {}
  bool ms_handle_refused(Connection *con) override;

  io_queue get_io_queue() const {
    if (cct->_conf->osd_op_queue == "debug_random") {
      static io_queue index_lookup[] = { io_queue::prioritized,
                                         io_queue::weightedpriority,
                                         io_queue::mclock_opclass,
                                         io_queue::mclock_client };
      srand(time(NULL));
      unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
      return index_lookup[which];
    } else if (cct->_conf->osd_op_queue == "prioritized") {
      return io_queue::prioritized;
    } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
      return io_queue::mclock_opclass;
    } else if (cct->_conf->osd_op_queue == "mclock_client") {
      return io_queue::mclock_client;
    } else {
      // default / catch-all is 'wpq'
      return io_queue::weightedpriority;
    }
  }
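  // Illustrative sketch (not part of the interface): the mapping from the
  // osd_op_queue / osd_op_queue_cut_off config strings to behavior, as
  // implemented above and in get_io_prio_cut() below:
  //
  //   osd_op_queue = "prioritized"    -> io_queue::prioritized
  //   osd_op_queue = "mclock_opclass" -> io_queue::mclock_opclass
  //   osd_op_queue = "mclock_client"  -> io_queue::mclock_client
  //   osd_op_queue = "debug_random"   -> one of the above, chosen at startup
  //   osd_op_queue = anything else    -> io_queue::weightedpriority ("wpq")
  //
  //   osd_op_queue_cut_off = "high"   -> ops at/above CEPH_MSG_PRIO_HIGH
  //                                      use the strict queue
  //   otherwise                       -> cutoff is CEPH_MSG_PRIO_LOW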
  unsigned int get_io_prio_cut() const {
    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
      srand(time(NULL));
      return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
    } else if (cct->_conf->osd_op_queue_cut_off == "high") {
      return CEPH_MSG_PRIO_HIGH;
    } else {
      // default / catch-all is 'low'
      return CEPH_MSG_PRIO_LOW;
    }
  }

public:
  /* internal and external can point to the same messenger, they will still
   * be cleaned up properly */
  OSD(CephContext *cct_,
      ObjectStore *store_,
      int id,
      Messenger *internal,
      Messenger *external,
      Messenger *hb_front_client,
      Messenger *hb_back_client,
      Messenger *hb_front_server,
      Messenger *hb_back_server,
      Messenger *osdc_messenger,
      MonClient *mc, const std::string &dev, const std::string &jdev);
  ~OSD() override;

  // static bits
  static int mkfs(CephContext *cct, ObjectStore *store,
                  const string& dev,
                  uuid_d fsid, int whoami);

  /* remove any non-user xattrs from a map of them */
  void filter_xattrs(map<string, bufferptr>& attrs) {
    for (map<string, bufferptr>::iterator iter = attrs.begin();
         iter != attrs.end();
      ) {
      if (('_' != iter->first.at(0)) || (iter->first.size() == 1))
        attrs.erase(iter++);
      else ++iter;
    }
  }

private:
  int mon_cmd_maybe_osd_create(string &cmd);
  int update_crush_device_class();
  int update_crush_location();

  static int write_meta(CephContext *cct, ObjectStore *store,
                        uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);

  void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
  void handle_scrub(struct MOSDScrub *m);
  void handle_osd_ping(class MOSDPing *m);

  int init_op_flags(OpRequestRef& op);

  int get_num_op_shards();
  int get_num_op_threads();

  float get_osd_recovery_sleep();

public:
  static int peek_meta(ObjectStore *store, string& magic,
                       uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami);


  // startup/shutdown
  int pre_init();
  int init();
  void final_init();

  int enable_disable_fuse(bool stop);

  void suicide(int exitcode);
  int shutdown();

  void handle_signal(int signum);

  /// check if we can throw out op from a disconnected client
  static bool op_is_discardable(const MOSDOp *m);

public:
  OSDService service;
  friend class OSDService;
};


std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);


//compatibility of the executable
extern const CompatSet::Feature ceph_osd_feature_compat[];
extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
extern const CompatSet::Feature ceph_osd_feature_incompat[];

#endif // CEPH_OSD_H