1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "json_spirit/json_spirit.h"
17 #include "common/debug.h" // undo damage
18 #include "PGMonitor.h"
20 #include "OSDMonitor.h"
21 #include "MonitorDBStore.h"
22 #include "PGStatService.h"
24 #include "messages/MPGStats.h"
25 #include "messages/MPGStatsAck.h"
27 #include "messages/MOSDPGCreate.h"
28 #include "messages/MMonCommand.h"
29 #include "messages/MOSDScrub.h"
31 #include "common/Formatter.h"
32 #include "common/config.h"
34 #include "include/stringify.h"
36 #include "osd/osd_types.h"
38 #include "common/config.h"
39 #include "common/errno.h"
40 #include "common/strtol.h"
41 #include "include/str_list.h"
44 #define dout_subsys ceph_subsys_mon
46 #define dout_prefix _prefix(_dout, mon, pg_map)
// Prefix written before every dout() line in this file (wired in through the
// dout_prefix macro): "mon.<name>@<rank>(<state name>).pg v<pg_map.version> ".
47 static ostream& _prefix(std::ostream *_dout, const Monitor *mon, const PGMap& pg_map) {
48 return *_dout << "mon." << mon->name << "@" << mon->rank
49 << "(" << mon->get_state_name()
50 << ").pg v" << pg_map.version << " ";
54 Tick function to update the map based on performance every N seconds
// Restart hook: drop the in-memory per-OSD last-report timestamps.
// prepare_pg_stats() repopulates last_osd_report as stats arrive again.
57 void PGMonitor::on_restart()
60 last_osd_report.clear();
// Called when this service becomes active.
// On the leader, pg_map is caught up to the current OSDMap epoch. While the
// cluster is still pre-luminous, a pgmap summary is also written to the
// cluster log.
// NOTE(review): some lines between these statements are not visible in this
// listing.
63 void PGMonitor::on_active()
65 if (mon->is_leader()) {
// Process any OSDMap epochs pg_map has not seen yet.
67 check_osd_map(mon->osdmon()->osdmap.get_epoch());
// Only report the legacy pgmap summary for pre-luminous clusters.
72 if (mon->is_leader() &&
73 mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
74 mon->clog->info() << "pgmap " << pg_map;
// Publish cluster-wide capacity, PG-state and object counters from pg_map to
// mon->cluster_logger. The body of the require_osd_release >= luminous branch
// is not visible here; presumably it returns early — TODO confirm.
78 void PGMonitor::update_logger()
80 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
83 dout(10) << "update_logger" << dendl;
// osd_sum sizes are kept in KiB (kb, kb_used, kb_avail); the counters are
// in bytes, hence the * 1024.
85 mon->cluster_logger->set(l_cluster_osd_bytes, pg_map.osd_sum.kb * 1024ull);
86 mon->cluster_logger->set(l_cluster_osd_bytes_used,
87 pg_map.osd_sum.kb_used * 1024ull);
88 mon->cluster_logger->set(l_cluster_osd_bytes_avail,
89 pg_map.osd_sum.kb_avail * 1024ull);
91 mon->cluster_logger->set(l_cluster_num_pool, pg_map.pg_pool_sum.size());
92 mon->cluster_logger->set(l_cluster_num_pg, pg_map.pg_stat.size());
// num_pg_by_state maps a PG state bitmask to the number of PGs in that
// state. Tally the PGs that are active, active+clean, and peering.
94 unsigned active = 0, active_clean = 0, peering = 0;
95 for (ceph::unordered_map<int,int>::iterator p = pg_map.num_pg_by_state.begin();
96 p != pg_map.num_pg_by_state.end();
// NOTE(review): no increment of `active` is visible in this listing, so
// l_cluster_num_pg_active would stay 0. Confirm the full source adds
// p->second here.
98 if (p->first & PG_STATE_ACTIVE) {
100 if (p->first & PG_STATE_CLEAN)
101 active_clean += p->second;
103 if (p->first & PG_STATE_PEERING)
104 peering += p->second;
106 mon->cluster_logger->set(l_cluster_num_pg_active_clean, active_clean);
107 mon->cluster_logger->set(l_cluster_num_pg_active, active);
108 mon->cluster_logger->set(l_cluster_num_pg_peering, peering);
// Object and byte totals come from the aggregated pg_sum.
110 mon->cluster_logger->set(l_cluster_num_object, pg_map.pg_sum.stats.sum.num_objects);
111 mon->cluster_logger->set(l_cluster_num_object_degraded, pg_map.pg_sum.stats.sum.num_objects_degraded);
112 mon->cluster_logger->set(l_cluster_num_object_misplaced, pg_map.pg_sum.stats.sum.num_objects_misplaced);
113 mon->cluster_logger->set(l_cluster_num_object_unfound, pg_map.pg_sum.stats.sum.num_objects_unfound);
114 mon->cluster_logger->set(l_cluster_num_bytes, pg_map.pg_sum.stats.sum.num_bytes);
// Periodic maintenance. Does nothing unless this service is active. The body
// of the luminous branch is not visible here (presumably an early return —
// TODO confirm). Otherwise it:
//  1. handles OSD report timeouts;
//  2. clears the global pg_map delta once it is older than
//     2 * mon_delta_reset_interval;
//  3. clears each per-pool delta older than the same threshold.
117 void PGMonitor::tick()
119 if (!is_active()) return;
120 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
124 handle_osd_timeouts();
126 if (!pg_map.pg_sum_deltas.empty()) {
127 utime_t age = ceph_clock_now() - pg_map.stamp;
// NOTE(review): the threshold is 2x the interval, but the log line below
// prints 1x.
128 if (age > 2 * g_conf->mon_delta_reset_interval) {
129 dout(10) << " clearing pg_map delta (" << age << " > " << g_conf->mon_delta_reset_interval << " seconds old)" << dendl;
130 pg_map.clear_delta();
134 /* If we have deltas for pools, run through pgmap's 'per_pool_sum_delta' and
135 * clear any deltas that are old enough.
137 * Note that 'per_pool_sum_delta' keeps a pool id as key, and a pair containing
138 * the calc'ed stats delta and an absolute timestamp from when those stats were
139 * obtained -- the timestamp IS NOT a delta itself.
// NOTE(review): the guard tests per_pool_sum_deltas, but the loop walks
// per_pool_sum_delta. Confirm that both containers are always populated
// together.
141 if (!pg_map.per_pool_sum_deltas.empty()) {
142 ceph::unordered_map<uint64_t,pair<pool_stat_t,utime_t> >::iterator it;
143 for (it = pg_map.per_pool_sum_delta.begin();
144 it != pg_map.per_pool_sum_delta.end(); ) {
145 utime_t age = ceph_clock_now() - it->second.second;
146 if (age > 2*g_conf->mon_delta_reset_interval) {
147 dout(10) << " clearing pg_map delta for pool " << it->first
148 << " (" << age << " > " << g_conf->mon_delta_reset_interval
149 << " seconds old)" << dendl;
// Drop every per-pool record for this pool id.
150 pg_map.per_pool_sum_deltas.erase(it->first);
151 pg_map.per_pool_sum_deltas_stamps.erase(it->first);
// Post-increment advances `it` before the element is erased by key, so the
// loop iterator never refers to the erased node.
152 pg_map.per_pool_sum_delta.erase((it++)->first);
159 dout(10) << pg_map << dendl;
// Hook for creating the initial map. Only the log statement is visible in
// this listing; any further initialization is not shown.
162 void PGMonitor::create_initial()
164 dout(10) << "create_initial -- creating initial map" << dendl;
// Brings in-memory pg_map up to the last committed paxos version.
// - If the "deleted" marker has been committed (see encode_pending), the
//   in-memory state is reset instead.
// - Otherwise it loops one version at a time. At v0 it reads the full map;
//   after that it reads and applies stored incrementals, falling back to a
//   full read when an incremental cannot be read.
// need_bootstrap is not referenced in the visible lines.
168 void PGMonitor::update_from_paxos(bool *need_bootstrap)
173 if (get_value("deleted")) {
175 dout(10) << __func__ << " deleted, clearing in-memory PGMap" << dendl;
177 pending_inc = PGMap::Incremental();
179 last_osd_report.clear();
183 version_t version = get_last_committed();
// Already current: nothing to apply (the return itself is not visible here).
184 if (version == pg_map.version)
187 assert(version >= pg_map.version);
// NOTE(review): the message has no space after __func__, so the two run
// together in the log.
188 if (format_version < 1) {
189 derr << __func__ << "unsupported monitor protocol: "
190 << get_service_name() << ".format_version = "
191 << format_version << dendl;
193 assert(format_version >= 1);
195 // pg/osd keys in leveldb
197 while (version > pg_map.version) {
199 if (pg_map.version == 0) {
200 dout(10) << __func__ << " v0, read_full" << dendl;
205 // incremental state?
206 dout(10) << __func__ << " read_incremental" << dendl;
// The incremental for the next version, encoded by encode_pending().
208 int r = get_version(pg_map.version + 1, bl);
210 dout(10) << __func__ << " failed to read_incremental, read_full" << dendl;
217 apply_pgmap_delta(bl);
223 assert(version == pg_map.version);
// Upgrade hook. It logs that the in-core PGMap is being discarded; the
// statements that actually discard it are not visible in this listing.
228 void PGMonitor::on_upgrade()
230 dout(1) << __func__ << " discarding in-core PGMap" << dendl;
// On-disk format upgrade hook. Version 1 is the only format, so this merely
// asserts that the stored format matches it; no conversion is performed.
234 void PGMonitor::upgrade_format()
236 unsigned current = 1;
237 assert(format_version == current);
// Runs after each paxos update. Once the monmap requires the luminous
// feature, pg-create subscriptions are left to OSDMonitor. Otherwise, if the
// OSDMap exists and has up OSDs, every up OSD is asserted to support stateful
// mon subscriptions.
// NOTE(review): the statements following these checks are not visible here.
240 void PGMonitor::post_paxos_update()
244 dout(10) << __func__ << dendl;
245 OSDMap& osdmap = mon->osdmon()->osdmap;
246 if (mon->monmap->get_required_features().contains_all(
247 ceph::features::mon::FEATURE_LUMINOUS)) {
248 // let OSDMonitor take care of the pg-creates subscriptions.
251 if (osdmap.get_epoch()) {
252 if (osdmap.get_num_up_osds() > 0) {
253 assert(osdmap.get_up_osd_features() & CEPH_FEATURE_MON_STATEFUL_SUB);
// Leader-only: let OSDMonitor act on OSDs whose last pg-stats report (in
// last_osd_report) is too old. Nothing happens until this mon has been leader
// for at least mon_osd_report_timeout seconds, and nothing happens while
// OSDMonitor is not writeable.
259 void PGMonitor::handle_osd_timeouts()
261 if (!mon->is_leader())
266 utime_t now(ceph_clock_now());
267 utime_t timeo(g_conf->mon_osd_report_timeout, 0);
268 if (now - mon->get_leader_since() < timeo) {
269 // We haven't been the leader for long enough to consider OSD timeouts
273 if (mon->osdmon()->is_writeable())
274 mon->osdmon()->handle_osd_timeouts(now, last_osd_report);
// Starts a fresh pending incremental for version pg_map.version + 1.
// - For a brand-new map (v0), full and nearfull ratios come from config.
//   Config values above 1.0 are treated as percentages and scaled into
//   [0, 1].
// - Otherwise the current map's ratios are carried forward.
277 void PGMonitor::create_pending()
282 pending_inc = PGMap::Incremental();
283 pending_inc.version = pg_map.version + 1;
284 if (pg_map.version == 0) {
285 // pull initial values from first leader mon's config
286 pending_inc.full_ratio = g_conf->mon_osd_full_ratio;
287 if (pending_inc.full_ratio > 1.0)
288 pending_inc.full_ratio /= 100.0;
289 pending_inc.nearfull_ratio = g_conf->mon_osd_nearfull_ratio;
290 if (pending_inc.nearfull_ratio > 1.0)
291 pending_inc.nearfull_ratio /= 100.0;
293 pending_inc.full_ratio = pg_map.full_ratio;
294 pending_inc.nearfull_ratio = pg_map.nearfull_ratio;
296 dout(10) << "create_pending v " << pending_inc.version << dendl;
// Loads pg_map metadata from the key/value store under pgmap_meta_prefix:
// version, last_osdmap_epoch, last_pg_scan, full and nearfull ratios, and
// the stamp. These are the same keys encode_pending() writes.
299 void PGMonitor::read_pgmap_meta()
301 dout(10) << __func__ << dendl;
303 string prefix = pgmap_meta_prefix;
305 version_t version = mon->store->get(prefix, "version");
306 epoch_t last_osdmap_epoch = mon->store->get(prefix, "last_osdmap_epoch");
307 epoch_t last_pg_scan = mon->store->get(prefix, "last_pg_scan");
308 pg_map.set_version(version);
309 pg_map.set_last_osdmap_epoch(last_osdmap_epoch);
// Only touch last_pg_scan when it actually changed.
311 if (last_pg_scan != pg_map.get_last_pg_scan()) {
312 pg_map.set_last_pg_scan(last_pg_scan);
// NOTE(review): encode_pending() writes these ratio keys only when they are
// > 0, but they are decoded here unconditionally. Decoding a missing or
// empty value would fail; confirm the keys always exist.
315 float full_ratio, nearfull_ratio;
318 mon->store->get(prefix, "full_ratio", bl);
319 bufferlist::iterator p = bl.begin();
320 ::decode(full_ratio, p);
324 mon->store->get(prefix, "nearfull_ratio", bl);
325 bufferlist::iterator p = bl.begin();
326 ::decode(nearfull_ratio, p);
328 pg_map.set_full_ratios(full_ratio, nearfull_ratio);
331 mon->store->get(prefix, "stamp", bl);
332 bufferlist::iterator p = bl.begin();
335 pg_map.set_stamp(stamp);
// Rebuilds pg_map from every stored per-PG record (pgmap_pg_prefix, keyed by
// the stringified pg_t) and every per-OSD record (pgmap_osd_prefix, keyed by
// the stringified OSD id).
339 void PGMonitor::read_pgmap_full()
343 string prefix = pgmap_pg_prefix;
344 for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
345 string key = i->key();
// Keys that do not parse as a pg_t are logged (and presumably skipped).
347 if (!pgid.parse(key.c_str())) {
348 dout(0) << "unable to parse key " << key << dendl;
351 bufferlist bl = i->value();
352 pg_map.update_pg(pgid, bl);
353 dout(20) << " got " << pgid << dendl;
356 prefix = pgmap_osd_prefix;
357 for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
358 string key = i->key();
// NOTE(review): atoi() returns 0 for a malformed key, so such a record would
// be loaded as osd.0 with no error.
359 int osd = atoi(key.c_str());
360 bufferlist bl = i->value();
361 pg_map.update_osd(osd, bl);
362 dout(20) << " got osd." << osd << dendl;
// Applies one stored incremental (version pg_map.version + 1) to pg_map.
// The incremental holds, in order: the stamp, an encoded list of dirty pgids,
// and an encoded list of dirty OSD ids. This matches the order
// encode_pending() writes.
// Each dirty PG or OSD is re-read from the store. Records that are present
// are updated; absent records are removed. Global and per-pool deltas are
// then recomputed against the pre-change sums.
366 void PGMonitor::apply_pgmap_delta(bufferlist& bl)
368 version_t v = pg_map.version + 1;
371 bufferlist dirty_pgs, dirty_osds;
373 bufferlist::iterator p = bl.begin();
374 ::decode(inc_stamp, p);
375 ::decode(dirty_pgs, p);
376 ::decode(dirty_osds, p);
// Snapshot the sums before any change so deltas can be computed afterwards.
379 pool_stat_t pg_sum_old = pg_map.pg_sum;
380 mempool::pgmap::unordered_map<uint64_t, pool_stat_t> pg_pool_sum_old;
// Pools whose PGs were found deleted while walking the dirty list.
383 set<int64_t> deleted_pools;
384 bufferlist::iterator p = dirty_pgs.begin();
391 if (deleted_pools.count(pgid.pool())) {
394 r = mon->store->get(pgmap_pg_prefix, stringify(pgid), pgbl);
// Record a pool's old sum only once: the value before the first PG of that
// pool is touched in this incremental.
395 if (pg_pool_sum_old.count(pgid.pool()) == 0)
396 pg_pool_sum_old[pgid.pool()] = pg_map.pg_pool_sum[pgid.pool()];
400 pg_map.update_pg(pgid, pgbl);
401 dout(20) << " refreshing pg " << pgid
402 << " " << pg_map.pg_stat[pgid].reported_epoch
403 << ":" << pg_map.pg_stat[pgid].reported_seq
404 << " " << pg_state_string(pg_map.pg_stat[pgid].state)
407 dout(20) << " removing pg " << pgid << dendl;
408 pg_map.remove_pg(pgid);
410 deleted_pools.insert(pgid.pool());
415 p = dirty_osds.begin();
419 dout(20) << " refreshing osd." << osd << dendl;
// NOTE(review): `bl` here is presumably a loop-local bufferlist that shadows
// the parameter; its declaration is not visible in this listing.
421 int r = mon->store->get(pgmap_osd_prefix, stringify(osd), bl);
423 pg_map.update_osd(osd, bl);
425 pg_map.remove_osd(osd);
429 pg_map.update_global_delta(g_ceph_context, inc_stamp, pg_sum_old);
430 pg_map.update_pool_deltas(g_ceph_context, inc_stamp, pg_pool_sum_old);
432 // clean up deleted pools after updating the deltas
433 for (set<int64_t>::iterator p = deleted_pools.begin();
434 p != deleted_pools.end();
436 dout(20) << " deleted pool " << *p << dendl;
437 pg_map.deleted_pool(*p);
440 // ok, we're now on the new version
// Writes the pending incremental into transaction t. Two paths are visible
// here (their guarding conditions are not):
//  (a) Clear path: erase the pgmap keys, bump last_committed, and set the
//      "deleted" marker that update_from_paxos() checks.
//  (b) Normal path: write the meta keys, each dirty PG and OSD record, and an
//      incremental blob (stamp + dirty pg list + dirty osd list) that
//      apply_pgmap_delta() decodes in the same order.
445 void PGMonitor::encode_pending(MonitorDBStore::TransactionRef t)
450 string prefix = pgmap_meta_prefix;
452 dout(1) << __func__ << " clearing pgmap data at v" << pending_inc.version
455 for (auto key : { "version", "stamp", "last_osdmap_epoch",
456 "last_pg_scan", "full_ratio", "nearfull_ratio" }) {
457 t->erase(prefix, key);
// NOTE(review): the pg_stat and osd_stat keys below are erased under `prefix`
// (pgmap_meta_prefix), but the normal path writes them under pgmap_pg_prefix
// and pgmap_osd_prefix. Those records may therefore survive the clear;
// confirm the intended prefix.
459 for (auto& p : pg_map.pg_stat) {
460 t->erase(prefix, stringify(p.first));
462 for (auto& p : pg_map.osd_stat) {
463 t->erase(prefix, stringify(p.first));
465 put_last_committed(t, pending_inc.version);
466 put_value(t, "deleted", 1);
// Normal path: only legal before luminous, except at version 1.
470 assert(mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS ||
471 pending_inc.version == 1 /* rebuild-mondb.yaml case */);
473 version_t version = pending_inc.version;
474 dout(10) << __func__ << " v " << version << dendl;
475 assert(get_last_committed() + 1 == version);
476 pending_inc.stamp = ceph_clock_now();
// Encode stats with the feature set the whole quorum supports.
478 uint64_t features = mon->get_quorum_con_features();
// --- meta keys ---
480 t->put(prefix, "version", pending_inc.version);
483 ::encode(pending_inc.stamp, bl);
484 t->put(prefix, "stamp", bl);
// Optional meta keys are written only when set or positive.
487 if (pending_inc.osdmap_epoch)
488 t->put(prefix, "last_osdmap_epoch", pending_inc.osdmap_epoch);
489 if (pending_inc.pg_scan)
490 t->put(prefix, "last_pg_scan", pending_inc.pg_scan);
491 if (pending_inc.full_ratio > 0) {
493 ::encode(pending_inc.full_ratio, bl);
494 t->put(prefix, "full_ratio", bl);
496 if (pending_inc.nearfull_ratio > 0) {
498 ::encode(pending_inc.nearfull_ratio, bl);
499 t->put(prefix, "nearfull_ratio", bl);
// --- incremental blob: stamp first ---
503 ::encode(pending_inc.stamp, incbl);
// --- per-PG records; dirty pgids are appended to the incremental ---
506 string prefix = pgmap_pg_prefix;
507 for (map<pg_t,pg_stat_t>::const_iterator p = pending_inc.pg_stat_updates.begin();
508 p != pending_inc.pg_stat_updates.end();
510 ::encode(p->first, dirty);
512 ::encode(p->second, bl, features);
513 t->put(prefix, stringify(p->first), bl);
515 for (set<pg_t>::const_iterator p = pending_inc.pg_remove.begin(); p != pending_inc.pg_remove.end(); ++p) {
517 t->erase(prefix, stringify(*p));
519 ::encode(dirty, incbl);
// --- per-OSD records: osd_stat followed by the OSD's epoch ---
523 string prefix = pgmap_osd_prefix;
524 for (map<int32_t,osd_stat_t>::const_iterator p =
525 pending_inc.get_osd_stat_updates().begin();
526 p != pending_inc.get_osd_stat_updates().end();
528 ::encode(p->first, dirty);
530 ::encode(p->second, bl, features);
// NOTE(review): find() is dereferenced without an end() check. This relies
// on every OSD in get_osd_stat_updates() also having an entry in
// get_osd_epochs().
531 ::encode(pending_inc.get_osd_epochs().find(p->first)->second, bl);
532 t->put(prefix, stringify(p->first), bl);
534 for (set<int32_t>::const_iterator p =
535 pending_inc.get_osd_stat_rm().begin();
536 p != pending_inc.get_osd_stat_rm().end();
539 t->erase(prefix, stringify(*p));
541 ::encode(dirty, incbl);
544 put_version(t, version, incbl);
546 put_last_committed(t, version);
// Returns the version up to which old pgmap versions may be trimmed.
// Trimming happens only on the leader, and only when more than
// mon_max_pgmap_epochs versions exist; it keeps the newest max versions.
// The fall-through return value is not visible in this listing.
549 version_t PGMonitor::get_trim_to()
551 unsigned max = g_conf->mon_max_pgmap_epochs;
552 version_t version = get_last_committed();
553 if (mon->is_leader() && (version > max))
554 return version - max;
// Read-only fast path for incoming messages, dispatched by message type.
// Once require_osd_release >= luminous:
// - MON_COMMANDs are answered with -EOPNOTSUPP ("this command is obsolete");
// - the pg-stats branch body is not visible here.
// Before luminous, both types go to their preprocess_* handler.
559 bool PGMonitor::preprocess_query(MonOpRequestRef op)
561 op->mark_pgmon_event(__func__);
562 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
563 dout(10) << "preprocess_query " << *m
564 << " from " << m->get_orig_source_inst() << dendl;
565 switch (m->get_type()) {
567 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
570 return preprocess_pg_stats(op);
572 case MSG_MON_COMMAND:
573 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
575 mon->reply_command(op, -EOPNOTSUPP, "this command is obsolete", rdata,
576 get_last_committed());
579 return preprocess_command(op);
// Leader-side update path, dispatched by message type to prepare_pg_stats()
// or prepare_command(). The body of the luminous branch at the top is not
// visible in this listing.
587 bool PGMonitor::prepare_update(MonOpRequestRef op)
589 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
593 op->mark_pgmon_event(__func__);
594 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
595 dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
596 switch (m->get_type()) {
598 return prepare_pg_stats(op);
600 case MSG_MON_COMMAND:
601 return prepare_command(op);
// Pre-checks an MPGStats message before it is forwarded for update:
// - a monitor session must exist;
// - the session needs the "pg" read capability;
// - the fsid must match.
// If the sender has held its OSDMap for more than 30s and the map is behind,
// it is sent the newer maps. The return values are not visible in this
// listing.
609 bool PGMonitor::preprocess_pg_stats(MonOpRequestRef op)
611 op->mark_pgmon_event(__func__);
612 MPGStats *stats = static_cast<MPGStats*>(op->get_req());
614 MonSession *session = stats->get_session();
616 dout(10) << "PGMonitor::preprocess_pg_stats: no monitor session!" << dendl;
619 if (!session->is_capable("pg", MON_CAP_R)) {
620 derr << "PGMonitor::preprocess_pg_stats: MPGStats received from entity "
621 << "with insufficient privileges " << session->caps << dendl;
625 if (stats->fsid != mon->monmap->fsid) {
626 dout(0) << __func__ << " drop message on fsid " << stats->fsid << " != "
627 << mon->monmap->fsid << " for " << *stats << dendl;
631 // First, just see if they need a new osdmap. But
632 // only if they've had the map for a while.
633 if (stats->had_map_for > 30.0 &&
634 mon->osdmon()->is_readable() &&
635 stats->epoch < mon->osdmon()->osdmap.get_epoch() &&
637 mon->osdmon()->send_latest_now_nodelete(op, stats->epoch+1);
639 // Always forward the PGStats to the leader, even if they are the same as
640 // the old PGStats. The leader will mark as down osds that haven't sent
641 // PGStats for a few minutes.
// Decides whether an OSD's stats message carries anything new. A message is
// new if any of the following holds:
// - the OSD is unknown to pg_map;
// - its osd_stat differs from the stored one;
// - any reported PG is unknown;
// - any reported PG's (reported_epoch, reported_seq) differs from pg_map.
// The return statements are not visible; prepare_pg_stats() treats false as
// "nothing new".
645 bool PGMonitor::pg_stats_have_changed(int from, const MPGStats *stats) const
648 ceph::unordered_map<int,osd_stat_t>::const_iterator s = pg_map.osd_stat.find(from);
649 if (s == pg_map.osd_stat.end())
652 if (s->second != stats->osd_stat)
656 for (map<pg_t,pg_stat_t>::const_iterator p = stats->pg_stat.begin();
657 p != stats->pg_stat.end(); ++p) {
658 ceph::unordered_map<pg_t,pg_stat_t>::const_iterator t = pg_map.pg_stat.find(p->first);
659 if (t == pg_map.pg_stat.end())
662 if (t->second.reported_epoch != p->second.reported_epoch ||
663 t->second.reported_seq != p->second.reported_seq)
// Completion attached to a pg-stats proposal. It carries the ack request
// (stats_op_ack) and, on success, hands it to PGMonitor::_updated_stats().
// The -ECANCELED and -EAGAIN branches have bodies not visible here. Any
// other result is a programming error.
// NOTE(review): several member and parameter lines (e.g. the pgmon member)
// are missing from this listing.
670 struct PGMonitor::C_Stats : public C_MonOp {
672 MonOpRequestRef stats_op_ack;
674 C_Stats(PGMonitor *p,
676 MonOpRequestRef op_ack)
677 : C_MonOp(op), pgmon(p), stats_op_ack(op_ack) {}
678 void _finish(int r) override {
680 pgmon->_updated_stats(op, stats_op_ack);
681 } else if (r == -ECANCELED) {
683 } else if (r == -EAGAIN) {
686 assert(0 == "bad C_Stats return value");
// Leader-side handling of an OSD's MPGStats:
// 1. Reject a wrong fsid or a sender that is not the current up instance of
//    that OSD.
// 2. Record the report time for timeout tracking.
// 3. If nothing changed, ack immediately with each PG's
//    (reported_seq, reported_epoch).
// 4. Otherwise stage the OSD stat (an empty one if the OSD is not "in") and
//    any PG stat newer than both pg_map and the pending incremental. The ack
//    is deferred until the proposal commits (C_Stats -> _updated_stats).
691 bool PGMonitor::prepare_pg_stats(MonOpRequestRef op)
693 op->mark_pgmon_event(__func__);
694 MPGStats *stats = static_cast<MPGStats*>(op->get_req());
695 dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_orig_source() << dendl;
696 int from = stats->get_orig_source().num();
698 if (stats->fsid != mon->monmap->fsid) {
699 dout(0) << "prepare_pg_stats on fsid " << stats->fsid << " != " << mon->monmap->fsid << dendl;
// Only accept stats from the exact instance the OSDMap lists as up.
703 if (!stats->get_orig_source().is_osd() ||
704 !mon->osdmon()->osdmap.is_up(from) ||
705 stats->get_orig_source_inst() != mon->osdmon()->osdmap.get_inst(from)) {
706 dout(1) << " ignoring stats from non-active osd." << dendl;
// Used by handle_osd_timeouts() to detect silent OSDs.
710 last_osd_report[from] = ceph_clock_now();
712 if (!pg_stats_have_changed(from, stats)) {
713 dout(10) << " message contains no new osd|pg stats" << dendl;
714 MPGStatsAck *ack = new MPGStatsAck;
715 ack->set_tid(stats->get_tid());
716 for (map<pg_t,pg_stat_t>::const_iterator p = stats->pg_stat.begin();
717 p != stats->pg_stat.end();
719 ack->pg_stat[p->first] = make_pair(p->second.reported_seq, p->second.reported_epoch);
721 mon->send_reply(op, ack);
// NOTE(review): stats->osd_stat is moved from here, yet both dout lines below
// still print stats->osd_stat. They may log a moved-from value.
726 if (mon->osdmon()->osdmap.is_in(from)) {
727 pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat));
729 pending_inc.update_stat(from, stats->epoch, osd_stat_t());
732 if (pg_map.osd_stat.count(from))
733 dout(10) << " got osd." << from << " " << stats->osd_stat << " (was " << pg_map.osd_stat[from] << ")" << dendl;
735 dout(10) << " got osd." << from << " " << stats->osd_stat << " (first report)" << dendl;
// The ack is wrapped in its own op so it can be tracked until the proposal
// finishes.
738 MPGStatsAck *ack = new MPGStatsAck;
739 MonOpRequestRef ack_op = mon->op_tracker.create_request<MonOpRequest>(ack);
740 ack->set_tid(stats->get_tid());
741 for (map<pg_t,pg_stat_t>::iterator p = stats->pg_stat.begin();
742 p != stats->pg_stat.end();
744 pg_t pgid = p->first;
745 ack->pg_stat[pgid] = make_pair(p->second.reported_seq, p->second.reported_epoch);
// Skip reports older than what pg_map already has...
747 if (pg_map.pg_stat.count(pgid) &&
748 pg_map.pg_stat[pgid].get_version_pair() > p->second.get_version_pair()) {
749 dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported_epoch << ":"
750 << pg_map.pg_stat[pgid].reported_seq << dendl;
// ...or older than what is already staged in the pending incremental.
753 if (pending_inc.pg_stat_updates.count(pgid) &&
754 pending_inc.pg_stat_updates[pgid].get_version_pair() > p->second.get_version_pair()) {
755 dout(15) << " had " << pgid << " from " << pending_inc.pg_stat_updates[pgid].reported_epoch << ":"
756 << pending_inc.pg_stat_updates[pgid].reported_seq << " (pending)" << dendl;
760 if (pg_map.pg_stat.count(pgid) == 0) {
761 dout(15) << " got " << pgid << " reported at " << p->second.reported_epoch << ":"
762 << p->second.reported_seq
763 << " state " << pg_state_string(p->second.state)
764 << " but DNE in pg_map; pool was probably deleted."
769 dout(15) << " got " << pgid
770 << " reported at " << p->second.reported_epoch << ":" << p->second.reported_seq
771 << " state " << pg_state_string(pg_map.pg_stat[pgid].state)
772 << " -> " << pg_state_string(p->second.state)
774 pending_inc.pg_stat_updates[pgid] = p->second;
777 wait_for_finished_proposal(op, new C_Stats(this, op, ack_op));
781 void PGMonitor::_updated_stats(MonOpRequestRef op, MonOpRequestRef ack_op)
783 op->mark_pgmon_event(__func__);
784 ack_op->mark_pgmon_event(__func__);
785 MPGStats *ack = static_cast<MPGStats*>(ack_op->get_req());
786 ack->get(); // MonOpRequestRef owns one ref; give the other to send_reply.
787 dout(7) << "_updated_stats for "
788 << op->get_req()->get_orig_source_inst() << dendl;
789 mon->send_reply(op, ack);
793 // ------------------------
// Deferred retry of PGMonitor::check_osd_map(epoch). check_osd_map() queues
// it while the OSDMap is not readable or the pgmap is not writeable. The
// conditions inside finish() before the retry are not visible here.
795 struct RetryCheckOSDMap : public Context {
798 RetryCheckOSDMap(PGMonitor *p, epoch_t e) : pgmon(p), epoch(e) {
800 void finish(int r) override {
804 pgmon->check_osd_map(epoch);
// Brings pg_map in line with OSDMap epochs up to max(epoch, current epoch).
// - Returns early if those epochs were already seen.
// - Defers via RetryCheckOSDMap while the OSDMap is unreadable or the pgmap
//   is unwriteable.
// - Once luminous is required, schedules clearing of the pg_map data (the
//   rest of that branch is not visible).
// - Otherwise applies each OSDMap incremental, then updates creating, new
//   and down PGs in pending_inc.
808 void PGMonitor::check_osd_map(epoch_t epoch)
816 if (pg_map.last_osdmap_epoch >= epoch) {
817 dout(10) << __func__ << " already seen " << pg_map.last_osdmap_epoch
818 << " >= " << epoch << dendl;
822 if (!mon->osdmon()->is_readable()) {
823 dout(10) << __func__ << " -- osdmap not readable, waiting" << dendl;
824 mon->osdmon()->wait_for_readable_ctx(new RetryCheckOSDMap(this, epoch));
828 if (!is_writeable()) {
829 dout(10) << __func__ << " -- pgmap not writeable, waiting" << dendl;
830 wait_for_writeable_ctx(new RetryCheckOSDMap(this, epoch));
834 const OSDMap& osdmap = mon->osdmon()->osdmap;
835 if (!did_delete && osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
836 // delete all my data
837 dout(1) << __func__ << " will clear pg_map data" << dendl;
843 // osds that went up or down
844 set<int> need_check_down_pg_osds;
846 // apply latest map(s)
847 epoch = std::max(epoch, osdmap.get_epoch());
848 for (epoch_t e = pg_map.last_osdmap_epoch+1;
851 dout(10) << __func__ << " applying osdmap e" << e << " to pg_map" << dendl;
853 int err = mon->osdmon()->get_version(e, bl);
857 OSDMap::Incremental inc(bl);
859 PGMapUpdater::check_osd_map(inc, &need_check_down_pg_osds,
860 &last_osd_report, &pg_map, &pending_inc);
863 assert(pg_map.last_osdmap_epoch < epoch);
864 pending_inc.osdmap_epoch = epoch;
865 PGMapUpdater::update_creating_pgs(osdmap, pg_map, &pending_inc);
866 PGMapUpdater::register_new_pgs(osdmap, pg_map, &pending_inc);
// Re-check PGs on OSDs that changed state (or all PGs when check_all_pgs is
// set); the full scan is then disarmed.
868 PGMapUpdater::check_down_pgs(osdmap, pg_map, check_all_pgs,
869 need_check_down_pg_osds, &pending_inc);
870 check_all_pgs = false;
// Sends osd the PG creations it is responsible for, from epoch `next` onward,
// batched into a single MOSDPGCreate tagged with pg_map.last_osdmap_epoch.
// Nothing is sent if there are none. The returned epoch (per the trailing
// comment, "current through last + 1") is computed in lines not visible here.
875 epoch_t PGMonitor::send_pg_creates(int osd, Connection *con, epoch_t next)
877 dout(30) << __func__ << " " << pg_map.creating_pgs_by_osd_epoch << dendl;
878 map<int, map<epoch_t, set<pg_t> > >::iterator p =
879 pg_map.creating_pgs_by_osd_epoch.find(osd);
880 if (p == pg_map.creating_pgs_by_osd_epoch.end())
883 assert(p->second.size() > 0);
// Allocated lazily, on the first PG to send.
885 MOSDPGCreate *m = NULL;
887 for (map<epoch_t, set<pg_t> >::iterator q = p->second.lower_bound(next);
888 q != p->second.end();
890 dout(20) << __func__ << " osd." << osd << " from " << next
891 << " : epoch " << q->first << " " << q->second.size() << " pgs"
894 for (set<pg_t>::iterator r = q->second.begin(); r != q->second.end(); ++r) {
// NOTE(review): operator[] default-inserts a pg_stat entry if *r is absent.
// The same lookup is repeated for ctimes below, where `st` could be reused.
895 pg_stat_t &st = pg_map.pg_stat[*r];
897 m = new MOSDPGCreate(pg_map.last_osdmap_epoch);
898 m->mkpg[*r] = pg_create_t(st.created,
900 st.parent_split_bits);
901 // Need the create time from the monitor using its clock to set
902 // last_scrub_stamp upon pg creation.
903 m->ctimes[*r] = pg_map.pg_stat[*r].last_scrub_stamp;
907 dout(20) << "send_pg_creates osd." << osd << " from " << next
908 << " has nothing to send" << dendl;
912 con->send_message(m);
914 // sub is current through last + 1
// Read-only handling of pg commands. It validates the fsid, command JSON and
// session, then:
// - "pg scrub", "pg repair" and "pg deep-scrub" send an MOSDScrub to the PG's
//   acting primary, provided the PG exists, has a primary, and that primary
//   is up;
// - all other prefixes go to process_pg_map_command(). An -EOPNOTSUPP result
//   there is handled specially in lines not visible here.
918 bool PGMonitor::preprocess_command(MonOpRequestRef op)
920 op->mark_pgmon_event(__func__);
921 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
926 if (m->fsid != mon->monmap->fsid) {
927 dout(0) << __func__ << " drop message on fsid " << m->fsid << " != "
928 << mon->monmap->fsid << " for " << *m << dendl;
932 map<string, cmd_vartype> cmdmap;
933 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
934 // ss has reason for failure
935 string rs = ss.str();
936 mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
941 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
943 MonSession *session = m->get_session();
945 mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
950 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
951 boost::scoped_ptr<Formatter> f(Formatter::create(format));
953 if (prefix == "pg scrub" ||
954 prefix == "pg repair" ||
955 prefix == "pg deep-scrub") {
// Strip the leading "pg " to get "scrub", "repair" or "deep-scrub".
956 string scrubop = prefix.substr(3, string::npos);
959 cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
960 if (!pgid.parse(pgidstr.c_str())) {
961 ss << "invalid pgid '" << pgidstr << "'";
965 if (!pg_map.pg_stat.count(pgid)) {
966 ss << "pg " << pgid << " dne";
970 int osd = pg_map.pg_stat[pgid].acting_primary;
972 ss << "pg " << pgid << " has no primary osd";
976 if (!mon->osdmon()->osdmap.is_up(osd)) {
977 ss << "pg " << pgid << " primary osd." << osd << " not up";
983 mon->try_send_message(new MOSDScrub(mon->monmap->fsid, pgs,
985 scrubop == "deep-scrub"),
986 mon->osdmon()->osdmap.get_inst(osd));
987 ss << "instructing pg " << pgid << " on osd." << osd << " to " << scrubop;
990 r = process_pg_map_command(prefix, cmdmap, pg_map, mon->osdmon()->osdmap,
991 f.get(), &ss, &rdata);
994 if (r == -EOPNOTSUPP)
1001 mon->reply_command(op, r, rs, rdata, get_last_committed());
// Leader-side (mutating) pg commands: "pg force_create_pg", the forced
// recovery/backfill family, and "pg set_full_ratio" / "pg set_nearfull_ratio".
// Errors are replied to immediately (turning r into a message when rs is
// empty). Successful updates are replied to after the proposal commits
// (C_Command with last_committed + 1).
1005 bool PGMonitor::prepare_command(MonOpRequestRef op)
1007 op->mark_pgmon_event(__func__);
1008 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
1009 if (m->fsid != mon->monmap->fsid) {
1010 dout(0) << __func__ << " drop message on fsid " << m->fsid << " != "
1011 << mon->monmap->fsid << " for " << *m << dendl;
1016 epoch_t epoch = mon->osdmon()->osdmap.get_epoch();
1020 map<string, cmd_vartype> cmdmap;
1021 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
1022 // ss has reason for failure
1023 string rs = ss.str();
1024 mon->reply_command(op, -EINVAL, rs, get_last_committed());
1029 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
1031 MonSession *session = m->get_session();
1033 mon->reply_command(op, -EACCES, "access denied", get_last_committed());
// Re-register an existing PG for creation unless it is already creating.
1037 if (prefix == "pg force_create_pg") {
1039 cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
1040 if (!pgid.parse(pgidstr.c_str())) {
1041 ss << "pg " << pgidstr << " invalid";
1045 if (!pg_map.pg_stat.count(pgid)) {
1046 ss << "pg " << pgid << " dne";
1050 if (pg_map.creating_pgs.count(pgid)) {
1051 ss << "pg " << pgid << " already creating";
1056 PGMapUpdater::register_pg(
1057 mon->osdmon()->osdmap,
1064 ss << "pg " << pgidstr << " now creating, ok";
1066 } else if (prefix == "pg force-recovery" ||
1067 prefix == "pg force-backfill" ||
1068 prefix == "pg cancel-force-recovery" ||
1069 prefix == "pg cancel-force-backfill") {
// NOTE(review): this rejects the command when require_osd_release is ALREADY
// >= luminous, but the message asks the user to complete the luminous
// upgrade. The comparison looks inverted (expected `<`); contrast the
// set_full_ratio branch below, whose >= check matches its message.
1070 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1071 ss << "you must complete the upgrade and 'ceph osd require-osd-release "
1072 << "luminous' before using forced recovery";
1076 } else if (prefix == "pg set_full_ratio" ||
1077 prefix == "pg set_nearfull_ratio") {
1078 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1079 ss << "please use the new luminous interfaces"
1080 << " ('osd set-full-ratio' and 'osd set-nearfull-ratio')";
// NOTE(review): the error message stringifies cmdmap["who"], but the value
// being parsed is "ratio". operator[] also inserts an empty "who" entry.
1085 if (!cmd_getval(g_ceph_context, cmdmap, "ratio", n)) {
1086 ss << "unable to parse 'ratio' value '"
1087 << cmd_vartype_stringify(cmdmap["who"]) << "'";
// NOTE(review): this local `op` (a string) shadows the MonOpRequestRef
// parameter `op` for the rest of this scope.
1091 string op = prefix.substr(3, string::npos);
1092 if (op == "set_full_ratio")
1093 pending_inc.full_ratio = n;
1094 else if (op == "set_nearfull_ratio")
1095 pending_inc.nearfull_ratio = n;
1104 if (r < 0 && rs.length() == 0)
1105 rs = cpp_strerror(r);
1106 mon->reply_command(op, r, rs, get_last_committed());
1111 wait_for_finished_proposal(op, new Monitor::C_Command(
1112 mon, op, r, rs, get_last_committed() + 1));
// Appends health checks to summary and, when detail is given, to detail.
// Before luminous it reports full and near-full OSDs from pg_map. It then
// delegates the PG-level checks to PGMap::get_health().
1116 void PGMonitor::get_health(list<pair<health_status_t,string> >& summary,
1117 list<pair<health_status_t,string> > *detail,
1118 CephContext *cct) const
1120 // legacy pre-luminous full/nearfull
1121 if (mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
1122 check_full_osd_health(summary, detail, pg_map.full_osds, "full",
1124 check_full_osd_health(summary, detail, pg_map.nearfull_osds, "near full",
1126 pg_map.get_health(cct, mon->osdmon()->osdmap, summary, detail);
// Adds one summary entry, "<N> <desc> osd(s)", with severity sev. When
// detail is requested, it also adds one line per OSD in s giving its
// kb_used / kb ratio as a percentage. The guarding conditions (presumably on
// s being non-empty and detail being non-null) are not visible here.
1130 void PGMonitor::check_full_osd_health(list<pair<health_status_t,string> >& summary,
1131 list<pair<health_status_t,string> > *detail,
1132 const mempool::pgmap::set<int>& s, const char *desc,
1133 health_status_t sev) const
1137 ss << s.size() << " " << desc << " osd(s)";
1138 summary.push_back(make_pair(sev, ss.str()));
// NOTE(review): the iterator is spelled as std::set<int>'s, while s is a
// mempool::pgmap::set<int>. `auto` or a range-for would avoid depending on
// the two iterator types being the same.
1140 for (set<int>::const_iterator p = s.begin(); p != s.end(); ++p) {
// NOTE(review): find() is dereferenced without an end() check, and os.kb == 0
// would make the float division non-finite before the (int) cast.
1142 const osd_stat_t& os = pg_map.osd_stat.find(*p)->second;
1143 int ratio = (int)(((float)os.kb_used) / (float) os.kb * 100.0);
1144 ss << "osd." << *p << " is " << desc << " at " << ratio << "%";
1145 detail->push_back(make_pair(sev, ss.str()));
// Re-evaluates every "osd_pg_creates" subscription through check_sub(). The
// body of the luminous branch at the top is not visible here.
1151 void PGMonitor::check_subs()
1153 if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1157 dout(10) << __func__ << dendl;
1158 const string type = "osd_pg_creates";
// NOTE(review): the lambda receives session_map but reads mon->session_map
// directly. Confirm this is the same object, or use the parameter.
1160 mon->with_session_map([this, &type](const MonSessionMap& session_map) {
1161 if (mon->session_map.subs.count(type) == 0)
1164 auto p = mon->session_map.subs[type]->begin();
1166 Subscription *sub = *p;
1168 dout(20) << __func__ << " .. " << sub->session->inst << dendl;
// Serves one subscription. For "osd_pg_creates" from an up OSD, it sends
// pending PG creates via send_pg_creates() and advances sub->next to the
// epoch returned. The return value is not visible in this listing.
1174 bool PGMonitor::check_sub(Subscription *sub)
1176 OSDMap& osdmap = mon->osdmon()->osdmap;
1177 if (sub->type == "osd_pg_creates") {
1178 // only send these if the OSD is up. we will check_subs() when they do
1179 // come up so they will get the creates then.
1180 if (sub->session->inst.name.is_osd() &&
1181 osdmap.is_up(sub->session->inst.name.num())) {
1182 sub->next = send_pg_creates(sub->session->inst.name.num(),
1183 sub->session->con.get(),
// MonPGStatService backed by PGMonitor's in-memory PGMap (`pgmap`, via
// PGMapStatService). Readability is delegated to the owning PGMonitor.
// NOTE(review): access specifiers and some member/parameter lines are missing
// from this listing.
1190 class PGMonStatService : public MonPGStatService, public PGMapStatService {
1193 PGMonStatService(const PGMap& o, PGMonitor *pgm)
1194 : MonPGStatService(), PGMapStatService(o), pgmon(pgm) {}
1197 bool is_readable() const override { return pgmon->is_readable(); }
// Copies PGs still marked creating in pgmap into pending_creates as
// (created, last_scrub_stamp). Nothing is copied if pgmap has not scanned up
// to scan_epoch. PGs whose pool is absent from `pools` are handled in lines
// not visible here. Existing entries are not overwritten (emplace).
1199 unsigned maybe_add_creating_pgs(epoch_t scan_epoch,
1200 const mempool::osdmap::map<int64_t,pg_pool_t>& pools,
1201 creating_pgs_t *pending_creates) const override
1203 if (pgmap.last_pg_scan < scan_epoch) {
1207 for (auto& pgid : pgmap.creating_pgs) {
1208 if (!pools.count(pgid.pool())) {
1211 auto st = pgmap.pg_stat.find(pgid);
1212 assert(st != pgmap.pg_stat.end());
1213 auto created = make_pair(st->second.created,
1214 st->second.last_scrub_stamp);
1215 // no need to add the pg, if it already exists in creating_pgs
1216 if (pending_creates->pgs.emplace(pgid, created).second) {
// Drops from `creates` every PG that pgmap knows and that is no longer in the
// CREATING state, recording its pool as created. q points into pgmap, so it
// stays valid after p is erased from creates->pgs.
1222 void maybe_trim_creating_pgs(creating_pgs_t *creates) const override {
1223 auto p = creates->pgs.begin();
1224 while (p != creates->pgs.end()) {
1225 auto q = pgmap.pg_stat.find(p->first);
1226 if (q != pgmap.pg_stat.end() &&
1227 !(q->second.state & PG_STATE_CREATING)) {
1228 p = creates->pgs.erase(p);
1229 creates->created_pools.insert(q->first.pool());
// Dumps the PGMap plus the pgmap service's first and last committed versions.
1235 void dump_info(Formatter *f) const override {
1236 f->dump_object("pgmap", pgmap);
1237 f->dump_unsigned("pgmap_first_committed", pgmon->get_first_committed());
1238 f->dump_unsigned("pgmap_last_committed", pgmon->get_last_committed());
// Thin forwarder to process_pg_map_command() against this pgmap.
1240 int process_pg_command(const string& prefix,
1241 const map<string,cmd_vartype>& cmdmap,
1242 const OSDMap& osdmap,
1245 bufferlist *odata) const override {
1246 return process_pg_map_command(prefix, cmdmap, pgmap, osdmap, f, ss, odata);
// Thin forwarder to reweight::by_utilization() against this pgmap.
1249 int reweight_by_utilization(const OSDMap &osd_map,
1253 bool by_pg, const set<int64_t> *pools,
1255 mempool::osdmap::map<int32_t, uint32_t>* new_weights,
1256 std::stringstream *ss,
1257 std::string *out_str,
1258 Formatter *f) const override {
1259 return reweight::by_utilization(osd_map, pgmap, oload, max_changef,
1260 max_osds, by_pg, pools, no_increasing,
1261 new_weights, ss, out_str, f);
// Returns the PGMap-backed stat service. pgservice owns it; the condition
// guarding its creation is not visible here (presumably lazy, on first use).
1265 MonPGStatService *PGMonitor::get_pg_stat_service()
1268 pgservice.reset(new PGMonStatService(pg_map, this));
1270 return pgservice.get();
// Constructs the paxos service and fixes the key/value store prefixes used
// for pgmap metadata ("pgmap_meta"), per-PG stats ("pgmap_pg") and per-OSD
// stats ("pgmap_osd").
1273 PGMonitor::PGMonitor(Monitor *mn, Paxos *p, const string& service_name)
1274 : PaxosService(mn, p, service_name),
1275 pgmap_meta_prefix("pgmap_meta"),
1276 pgmap_pg_prefix("pgmap_pg"),
1277 pgmap_osd_prefix("pgmap_osd")
// Defaulted out of line, presumably so member types that are incomplete in
// the header (e.g. the stat service held by pgservice) are complete here —
// TODO confirm.
1280 PGMonitor::~PGMonitor() = default;