X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmon%2FPGMonitor.cc;fp=src%2Fceph%2Fsrc%2Fmon%2FPGMonitor.cc;h=c4091ed08a1ffdb7b7cbb22aa5d9d26eb00761d7;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/mon/PGMonitor.cc b/src/ceph/src/mon/PGMonitor.cc new file mode 100644 index 0000000..c4091ed --- /dev/null +++ b/src/ceph/src/mon/PGMonitor.cc @@ -0,0 +1,1280 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include "json_spirit/json_spirit.h" +#include "common/debug.h" // undo damage +#include "PGMonitor.h" +#include "Monitor.h" +#include "OSDMonitor.h" +#include "MonitorDBStore.h" +#include "PGStatService.h" + +#include "messages/MPGStats.h" +#include "messages/MPGStatsAck.h" + +#include "messages/MOSDPGCreate.h" +#include "messages/MMonCommand.h" +#include "messages/MOSDScrub.h" + +#include "common/Formatter.h" +#include "common/config.h" + +#include "include/stringify.h" + +#include "osd/osd_types.h" + +#include "common/config.h" +#include "common/errno.h" +#include "common/strtol.h" +#include "include/str_list.h" +#include + +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix _prefix(_dout, mon, pg_map) +static ostream& _prefix(std::ostream *_dout, const Monitor *mon, const PGMap& pg_map) { + return *_dout << "mon." 
<< mon->name << "@" << mon->rank + << "(" << mon->get_state_name() + << ").pg v" << pg_map.version << " "; +} + +/* + Tick function to update the map based on performance every N seconds + */ + +void PGMonitor::on_restart() +{ + // clear leader state + last_osd_report.clear(); +} + +void PGMonitor::on_active() +{ + if (mon->is_leader()) { + check_all_pgs = true; + check_osd_map(mon->osdmon()->osdmap.get_epoch()); + } + + update_logger(); + + if (mon->is_leader() && + mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) { + mon->clog->info() << "pgmap " << pg_map; + } +} + +void PGMonitor::update_logger() +{ + if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + return; + } + dout(10) << "update_logger" << dendl; + + mon->cluster_logger->set(l_cluster_osd_bytes, pg_map.osd_sum.kb * 1024ull); + mon->cluster_logger->set(l_cluster_osd_bytes_used, + pg_map.osd_sum.kb_used * 1024ull); + mon->cluster_logger->set(l_cluster_osd_bytes_avail, + pg_map.osd_sum.kb_avail * 1024ull); + + mon->cluster_logger->set(l_cluster_num_pool, pg_map.pg_pool_sum.size()); + mon->cluster_logger->set(l_cluster_num_pg, pg_map.pg_stat.size()); + + unsigned active = 0, active_clean = 0, peering = 0; + for (ceph::unordered_map::iterator p = pg_map.num_pg_by_state.begin(); + p != pg_map.num_pg_by_state.end(); + ++p) { + if (p->first & PG_STATE_ACTIVE) { + active += p->second; + if (p->first & PG_STATE_CLEAN) + active_clean += p->second; + } + if (p->first & PG_STATE_PEERING) + peering += p->second; + } + mon->cluster_logger->set(l_cluster_num_pg_active_clean, active_clean); + mon->cluster_logger->set(l_cluster_num_pg_active, active); + mon->cluster_logger->set(l_cluster_num_pg_peering, peering); + + mon->cluster_logger->set(l_cluster_num_object, pg_map.pg_sum.stats.sum.num_objects); + mon->cluster_logger->set(l_cluster_num_object_degraded, pg_map.pg_sum.stats.sum.num_objects_degraded); + mon->cluster_logger->set(l_cluster_num_object_misplaced, 
pg_map.pg_sum.stats.sum.num_objects_misplaced); + mon->cluster_logger->set(l_cluster_num_object_unfound, pg_map.pg_sum.stats.sum.num_objects_unfound); + mon->cluster_logger->set(l_cluster_num_bytes, pg_map.pg_sum.stats.sum.num_bytes); +} + +void PGMonitor::tick() +{ + if (!is_active()) return; + if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + return; + } + + handle_osd_timeouts(); + + if (!pg_map.pg_sum_deltas.empty()) { + utime_t age = ceph_clock_now() - pg_map.stamp; + if (age > 2 * g_conf->mon_delta_reset_interval) { + dout(10) << " clearing pg_map delta (" << age << " > " << g_conf->mon_delta_reset_interval << " seconds old)" << dendl; + pg_map.clear_delta(); + } + } + + /* If we have deltas for pools, run through pgmap's 'per_pool_sum_delta' and + * clear any deltas that are old enough. + * + * Note that 'per_pool_sum_delta' keeps a pool id as key, and a pair containing + * the calc'ed stats delta and an absolute timestamp from when those stats were + * obtained -- the timestamp IS NOT a delta itself. 
+ */ + if (!pg_map.per_pool_sum_deltas.empty()) { + ceph::unordered_map >::iterator it; + for (it = pg_map.per_pool_sum_delta.begin(); + it != pg_map.per_pool_sum_delta.end(); ) { + utime_t age = ceph_clock_now() - it->second.second; + if (age > 2*g_conf->mon_delta_reset_interval) { + dout(10) << " clearing pg_map delta for pool " << it->first + << " (" << age << " > " << g_conf->mon_delta_reset_interval + << " seconds old)" << dendl; + pg_map.per_pool_sum_deltas.erase(it->first); + pg_map.per_pool_sum_deltas_stamps.erase(it->first); + pg_map.per_pool_sum_delta.erase((it++)->first); + } else { + ++it; + } + } + } + + dout(10) << pg_map << dendl; +} + +void PGMonitor::create_initial() +{ + dout(10) << "create_initial -- creating initial map" << dendl; + format_version = 1; +} + +void PGMonitor::update_from_paxos(bool *need_bootstrap) +{ + if (did_delete) + return; + + if (get_value("deleted")) { + did_delete = true; + dout(10) << __func__ << " deleted, clearing in-memory PGMap" << dendl; + pg_map = PGMap(); + pending_inc = PGMap::Incremental(); + pgservice.reset(); + last_osd_report.clear(); + return; + } + + version_t version = get_last_committed(); + if (version == pg_map.version) + return; + + assert(version >= pg_map.version); + if (format_version < 1) { + derr << __func__ << "unsupported monitor protocol: " + << get_service_name() << ".format_version = " + << format_version << dendl; + } + assert(format_version >= 1); + + // pg/osd keys in leveldb + // read meta + while (version > pg_map.version) { + // load full state? + if (pg_map.version == 0) { + dout(10) << __func__ << " v0, read_full" << dendl; + read_pgmap_full(); + goto out; + } + + // incremental state? 
+ dout(10) << __func__ << " read_incremental" << dendl; + bufferlist bl; + int r = get_version(pg_map.version + 1, bl); + if (r == -ENOENT) { + dout(10) << __func__ << " failed to read_incremental, read_full" << dendl; + // reset pg map + pg_map = PGMap(); + read_pgmap_full(); + goto out; + } + assert(r == 0); + apply_pgmap_delta(bl); + } + + read_pgmap_meta(); + +out: + assert(version == pg_map.version); + + update_logger(); +} + +void PGMonitor::on_upgrade() +{ + dout(1) << __func__ << " discarding in-core PGMap" << dendl; + pg_map = PGMap(); +} + +void PGMonitor::upgrade_format() +{ + unsigned current = 1; + assert(format_version == current); +} + +void PGMonitor::post_paxos_update() +{ + if (did_delete) + return; + dout(10) << __func__ << dendl; + OSDMap& osdmap = mon->osdmon()->osdmap; + if (mon->monmap->get_required_features().contains_all( + ceph::features::mon::FEATURE_LUMINOUS)) { + // let OSDMonitor take care of the pg-creates subscriptions. + return; + } + if (osdmap.get_epoch()) { + if (osdmap.get_num_up_osds() > 0) { + assert(osdmap.get_up_osd_features() & CEPH_FEATURE_MON_STATEFUL_SUB); + check_subs(); + } + } +} + +void PGMonitor::handle_osd_timeouts() +{ + if (!mon->is_leader()) + return; + if (did_delete) + return; + + utime_t now(ceph_clock_now()); + utime_t timeo(g_conf->mon_osd_report_timeout, 0); + if (now - mon->get_leader_since() < timeo) { + // We haven't been the leader for long enough to consider OSD timeouts + return; + } + + if (mon->osdmon()->is_writeable()) + mon->osdmon()->handle_osd_timeouts(now, last_osd_report); +} + +void PGMonitor::create_pending() +{ + if (did_delete) + return; + do_delete = false; + pending_inc = PGMap::Incremental(); + pending_inc.version = pg_map.version + 1; + if (pg_map.version == 0) { + // pull initial values from first leader mon's config + pending_inc.full_ratio = g_conf->mon_osd_full_ratio; + if (pending_inc.full_ratio > 1.0) + pending_inc.full_ratio /= 100.0; + pending_inc.nearfull_ratio = 
g_conf->mon_osd_nearfull_ratio; + if (pending_inc.nearfull_ratio > 1.0) + pending_inc.nearfull_ratio /= 100.0; + } else { + pending_inc.full_ratio = pg_map.full_ratio; + pending_inc.nearfull_ratio = pg_map.nearfull_ratio; + } + dout(10) << "create_pending v " << pending_inc.version << dendl; +} + +void PGMonitor::read_pgmap_meta() +{ + dout(10) << __func__ << dendl; + + string prefix = pgmap_meta_prefix; + + version_t version = mon->store->get(prefix, "version"); + epoch_t last_osdmap_epoch = mon->store->get(prefix, "last_osdmap_epoch"); + epoch_t last_pg_scan = mon->store->get(prefix, "last_pg_scan"); + pg_map.set_version(version); + pg_map.set_last_osdmap_epoch(last_osdmap_epoch); + + if (last_pg_scan != pg_map.get_last_pg_scan()) { + pg_map.set_last_pg_scan(last_pg_scan); + } + + float full_ratio, nearfull_ratio; + { + bufferlist bl; + mon->store->get(prefix, "full_ratio", bl); + bufferlist::iterator p = bl.begin(); + ::decode(full_ratio, p); + } + { + bufferlist bl; + mon->store->get(prefix, "nearfull_ratio", bl); + bufferlist::iterator p = bl.begin(); + ::decode(nearfull_ratio, p); + } + pg_map.set_full_ratios(full_ratio, nearfull_ratio); + { + bufferlist bl; + mon->store->get(prefix, "stamp", bl); + bufferlist::iterator p = bl.begin(); + utime_t stamp; + ::decode(stamp, p); + pg_map.set_stamp(stamp); + } +} + +void PGMonitor::read_pgmap_full() +{ + read_pgmap_meta(); + + string prefix = pgmap_pg_prefix; + for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) { + string key = i->key(); + pg_t pgid; + if (!pgid.parse(key.c_str())) { + dout(0) << "unable to parse key " << key << dendl; + continue; + } + bufferlist bl = i->value(); + pg_map.update_pg(pgid, bl); + dout(20) << " got " << pgid << dendl; + } + + prefix = pgmap_osd_prefix; + for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) { + string key = i->key(); + int osd = atoi(key.c_str()); + bufferlist bl = i->value(); + 
pg_map.update_osd(osd, bl); + dout(20) << " got osd." << osd << dendl; + } +} + +void PGMonitor::apply_pgmap_delta(bufferlist& bl) +{ + version_t v = pg_map.version + 1; + + utime_t inc_stamp; + bufferlist dirty_pgs, dirty_osds; + { + bufferlist::iterator p = bl.begin(); + ::decode(inc_stamp, p); + ::decode(dirty_pgs, p); + ::decode(dirty_osds, p); + } + + pool_stat_t pg_sum_old = pg_map.pg_sum; + mempool::pgmap::unordered_map pg_pool_sum_old; + + // pgs + set deleted_pools; + bufferlist::iterator p = dirty_pgs.begin(); + while (!p.end()) { + pg_t pgid; + ::decode(pgid, p); + + int r; + bufferlist pgbl; + if (deleted_pools.count(pgid.pool())) { + r = -ENOENT; + } else { + r = mon->store->get(pgmap_pg_prefix, stringify(pgid), pgbl); + if (pg_pool_sum_old.count(pgid.pool()) == 0) + pg_pool_sum_old[pgid.pool()] = pg_map.pg_pool_sum[pgid.pool()]; + } + + if (r >= 0) { + pg_map.update_pg(pgid, pgbl); + dout(20) << " refreshing pg " << pgid + << " " << pg_map.pg_stat[pgid].reported_epoch + << ":" << pg_map.pg_stat[pgid].reported_seq + << " " << pg_state_string(pg_map.pg_stat[pgid].state) + << dendl; + } else { + dout(20) << " removing pg " << pgid << dendl; + pg_map.remove_pg(pgid); + if (pgid.ps() == 0) + deleted_pools.insert(pgid.pool()); + } + } + + // osds + p = dirty_osds.begin(); + while (!p.end()) { + int32_t osd; + ::decode(osd, p); + dout(20) << " refreshing osd." 
<< osd << dendl; + bufferlist bl; + int r = mon->store->get(pgmap_osd_prefix, stringify(osd), bl); + if (r >= 0) { + pg_map.update_osd(osd, bl); + } else { + pg_map.remove_osd(osd); + } + } + + pg_map.update_global_delta(g_ceph_context, inc_stamp, pg_sum_old); + pg_map.update_pool_deltas(g_ceph_context, inc_stamp, pg_pool_sum_old); + + // clean up deleted pools after updating the deltas + for (set::iterator p = deleted_pools.begin(); + p != deleted_pools.end(); + ++p) { + dout(20) << " deleted pool " << *p << dendl; + pg_map.deleted_pool(*p); + } + + // ok, we're now on the new version + pg_map.version = v; +} + + +void PGMonitor::encode_pending(MonitorDBStore::TransactionRef t) +{ + if (did_delete) + return; + + string prefix = pgmap_meta_prefix; + if (do_delete) { + dout(1) << __func__ << " clearing pgmap data at v" << pending_inc.version + << dendl; + do_delete = false; + for (auto key : { "version", "stamp", "last_osdmap_epoch", + "last_pg_scan", "full_ratio", "nearfull_ratio" }) { + t->erase(prefix, key); + } + for (auto& p : pg_map.pg_stat) { + t->erase(prefix, stringify(p.first)); + } + for (auto& p : pg_map.osd_stat) { + t->erase(prefix, stringify(p.first)); + } + put_last_committed(t, pending_inc.version); + put_value(t, "deleted", 1); + return; + } + + assert(mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS || + pending_inc.version == 1 /* rebuild-mondb.yaml case */); + + version_t version = pending_inc.version; + dout(10) << __func__ << " v " << version << dendl; + assert(get_last_committed() + 1 == version); + pending_inc.stamp = ceph_clock_now(); + + uint64_t features = mon->get_quorum_con_features(); + + t->put(prefix, "version", pending_inc.version); + { + bufferlist bl; + ::encode(pending_inc.stamp, bl); + t->put(prefix, "stamp", bl); + } + + if (pending_inc.osdmap_epoch) + t->put(prefix, "last_osdmap_epoch", pending_inc.osdmap_epoch); + if (pending_inc.pg_scan) + t->put(prefix, "last_pg_scan", pending_inc.pg_scan); + if 
(pending_inc.full_ratio > 0) { + bufferlist bl; + ::encode(pending_inc.full_ratio, bl); + t->put(prefix, "full_ratio", bl); + } + if (pending_inc.nearfull_ratio > 0) { + bufferlist bl; + ::encode(pending_inc.nearfull_ratio, bl); + t->put(prefix, "nearfull_ratio", bl); + } + + bufferlist incbl; + ::encode(pending_inc.stamp, incbl); + { + bufferlist dirty; + string prefix = pgmap_pg_prefix; + for (map::const_iterator p = pending_inc.pg_stat_updates.begin(); + p != pending_inc.pg_stat_updates.end(); + ++p) { + ::encode(p->first, dirty); + bufferlist bl; + ::encode(p->second, bl, features); + t->put(prefix, stringify(p->first), bl); + } + for (set::const_iterator p = pending_inc.pg_remove.begin(); p != pending_inc.pg_remove.end(); ++p) { + ::encode(*p, dirty); + t->erase(prefix, stringify(*p)); + } + ::encode(dirty, incbl); + } + { + bufferlist dirty; + string prefix = pgmap_osd_prefix; + for (map::const_iterator p = + pending_inc.get_osd_stat_updates().begin(); + p != pending_inc.get_osd_stat_updates().end(); + ++p) { + ::encode(p->first, dirty); + bufferlist bl; + ::encode(p->second, bl, features); + ::encode(pending_inc.get_osd_epochs().find(p->first)->second, bl); + t->put(prefix, stringify(p->first), bl); + } + for (set::const_iterator p = + pending_inc.get_osd_stat_rm().begin(); + p != pending_inc.get_osd_stat_rm().end(); + ++p) { + ::encode(*p, dirty); + t->erase(prefix, stringify(*p)); + } + ::encode(dirty, incbl); + } + + put_version(t, version, incbl); + + put_last_committed(t, version); +} + +version_t PGMonitor::get_trim_to() +{ + unsigned max = g_conf->mon_max_pgmap_epochs; + version_t version = get_last_committed(); + if (mon->is_leader() && (version > max)) + return version - max; + + return 0; +} + +bool PGMonitor::preprocess_query(MonOpRequestRef op) +{ + op->mark_pgmon_event(__func__); + PaxosServiceMessage *m = static_cast(op->get_req()); + dout(10) << "preprocess_query " << *m + << " from " << m->get_orig_source_inst() << dendl; + switch 
(m->get_type()) { + case MSG_PGSTATS: + if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + return true; + } + return preprocess_pg_stats(op); + + case MSG_MON_COMMAND: + if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + bufferlist rdata; + mon->reply_command(op, -EOPNOTSUPP, "this command is obsolete", rdata, + get_last_committed()); + return true; + } + return preprocess_command(op); + + default: + ceph_abort(); + return true; + } +} + +bool PGMonitor::prepare_update(MonOpRequestRef op) +{ + if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + return false; + } + + op->mark_pgmon_event(__func__); + PaxosServiceMessage *m = static_cast(op->get_req()); + dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl; + switch (m->get_type()) { + case MSG_PGSTATS: + return prepare_pg_stats(op); + + case MSG_MON_COMMAND: + return prepare_command(op); + + default: + ceph_abort(); + return false; + } +} + +bool PGMonitor::preprocess_pg_stats(MonOpRequestRef op) +{ + op->mark_pgmon_event(__func__); + MPGStats *stats = static_cast(op->get_req()); + // check caps + MonSession *session = stats->get_session(); + if (!session) { + dout(10) << "PGMonitor::preprocess_pg_stats: no monitor session!" << dendl; + return true; + } + if (!session->is_capable("pg", MON_CAP_R)) { + derr << "PGMonitor::preprocess_pg_stats: MPGStats received from entity " + << "with insufficient privileges " << session->caps << dendl; + return true; + } + + if (stats->fsid != mon->monmap->fsid) { + dout(0) << __func__ << " drop message on fsid " << stats->fsid << " != " + << mon->monmap->fsid << " for " << *stats << dendl; + return true; + } + + // First, just see if they need a new osdmap. But + // only if they've had the map for a while. 
+ if (stats->had_map_for > 30.0 && + mon->osdmon()->is_readable() && + stats->epoch < mon->osdmon()->osdmap.get_epoch() && + !session->proxy_con) + mon->osdmon()->send_latest_now_nodelete(op, stats->epoch+1); + + // Always forward the PGStats to the leader, even if they are the same as + // the old PGStats. The leader will mark as down osds that haven't sent + // PGStats for a few minutes. + return false; +} + +bool PGMonitor::pg_stats_have_changed(int from, const MPGStats *stats) const +{ + // any new osd info? + ceph::unordered_map::const_iterator s = pg_map.osd_stat.find(from); + if (s == pg_map.osd_stat.end()) + return true; + + if (s->second != stats->osd_stat) + return true; + + // any new pg info? + for (map::const_iterator p = stats->pg_stat.begin(); + p != stats->pg_stat.end(); ++p) { + ceph::unordered_map::const_iterator t = pg_map.pg_stat.find(p->first); + if (t == pg_map.pg_stat.end()) + return true; + + if (t->second.reported_epoch != p->second.reported_epoch || + t->second.reported_seq != p->second.reported_seq) + return true; + } + + return false; +} + +struct PGMonitor::C_Stats : public C_MonOp { + PGMonitor *pgmon; + MonOpRequestRef stats_op_ack; + entity_inst_t who; + C_Stats(PGMonitor *p, + MonOpRequestRef op, + MonOpRequestRef op_ack) + : C_MonOp(op), pgmon(p), stats_op_ack(op_ack) {} + void _finish(int r) override { + if (r >= 0) { + pgmon->_updated_stats(op, stats_op_ack); + } else if (r == -ECANCELED) { + return; + } else if (r == -EAGAIN) { + pgmon->dispatch(op); + } else { + assert(0 == "bad C_Stats return value"); + } + } +}; + +bool PGMonitor::prepare_pg_stats(MonOpRequestRef op) +{ + op->mark_pgmon_event(__func__); + MPGStats *stats = static_cast(op->get_req()); + dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_orig_source() << dendl; + int from = stats->get_orig_source().num(); + + if (stats->fsid != mon->monmap->fsid) { + dout(0) << "prepare_pg_stats on fsid " << stats->fsid << " != " << mon->monmap->fsid << dendl; 
+ return false; + } + + if (!stats->get_orig_source().is_osd() || + !mon->osdmon()->osdmap.is_up(from) || + stats->get_orig_source_inst() != mon->osdmon()->osdmap.get_inst(from)) { + dout(1) << " ignoring stats from non-active osd." << dendl; + return false; + } + + last_osd_report[from] = ceph_clock_now(); + + if (!pg_stats_have_changed(from, stats)) { + dout(10) << " message contains no new osd|pg stats" << dendl; + MPGStatsAck *ack = new MPGStatsAck; + ack->set_tid(stats->get_tid()); + for (map::const_iterator p = stats->pg_stat.begin(); + p != stats->pg_stat.end(); + ++p) { + ack->pg_stat[p->first] = make_pair(p->second.reported_seq, p->second.reported_epoch); + } + mon->send_reply(op, ack); + return false; + } + + // osd stat + if (mon->osdmon()->osdmap.is_in(from)) { + pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat)); + } else { + pending_inc.update_stat(from, stats->epoch, osd_stat_t()); + } + + if (pg_map.osd_stat.count(from)) + dout(10) << " got osd." << from << " " << stats->osd_stat << " (was " << pg_map.osd_stat[from] << ")" << dendl; + else + dout(10) << " got osd." 
<< from << " " << stats->osd_stat << " (first report)" << dendl; + + // pg stats + MPGStatsAck *ack = new MPGStatsAck; + MonOpRequestRef ack_op = mon->op_tracker.create_request(ack); + ack->set_tid(stats->get_tid()); + for (map::iterator p = stats->pg_stat.begin(); + p != stats->pg_stat.end(); + ++p) { + pg_t pgid = p->first; + ack->pg_stat[pgid] = make_pair(p->second.reported_seq, p->second.reported_epoch); + + if (pg_map.pg_stat.count(pgid) && + pg_map.pg_stat[pgid].get_version_pair() > p->second.get_version_pair()) { + dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported_epoch << ":" + << pg_map.pg_stat[pgid].reported_seq << dendl; + continue; + } + if (pending_inc.pg_stat_updates.count(pgid) && + pending_inc.pg_stat_updates[pgid].get_version_pair() > p->second.get_version_pair()) { + dout(15) << " had " << pgid << " from " << pending_inc.pg_stat_updates[pgid].reported_epoch << ":" + << pending_inc.pg_stat_updates[pgid].reported_seq << " (pending)" << dendl; + continue; + } + + if (pg_map.pg_stat.count(pgid) == 0) { + dout(15) << " got " << pgid << " reported at " << p->second.reported_epoch << ":" + << p->second.reported_seq + << " state " << pg_state_string(p->second.state) + << " but DNE in pg_map; pool was probably deleted." + << dendl; + continue; + } + + dout(15) << " got " << pgid + << " reported at " << p->second.reported_epoch << ":" << p->second.reported_seq + << " state " << pg_state_string(pg_map.pg_stat[pgid].state) + << " -> " << pg_state_string(p->second.state) + << dendl; + pending_inc.pg_stat_updates[pgid] = p->second; + } + + wait_for_finished_proposal(op, new C_Stats(this, op, ack_op)); + return true; +} + +void PGMonitor::_updated_stats(MonOpRequestRef op, MonOpRequestRef ack_op) +{ + op->mark_pgmon_event(__func__); + ack_op->mark_pgmon_event(__func__); + MPGStats *ack = static_cast(ack_op->get_req()); + ack->get(); // MonOpRequestRef owns one ref; give the other to send_reply. 
+ dout(7) << "_updated_stats for " + << op->get_req()->get_orig_source_inst() << dendl; + mon->send_reply(op, ack); +} + + +// ------------------------ + +struct RetryCheckOSDMap : public Context { + PGMonitor *pgmon; + epoch_t epoch; + RetryCheckOSDMap(PGMonitor *p, epoch_t e) : pgmon(p), epoch(e) { + } + void finish(int r) override { + if (r == -ECANCELED) + return; + + pgmon->check_osd_map(epoch); + } +}; + +void PGMonitor::check_osd_map(epoch_t epoch) +{ + if (mon->is_peon()) + return; // whatever. + + if (did_delete) + return; + + if (pg_map.last_osdmap_epoch >= epoch) { + dout(10) << __func__ << " already seen " << pg_map.last_osdmap_epoch + << " >= " << epoch << dendl; + return; + } + + if (!mon->osdmon()->is_readable()) { + dout(10) << __func__ << " -- osdmap not readable, waiting" << dendl; + mon->osdmon()->wait_for_readable_ctx(new RetryCheckOSDMap(this, epoch)); + return; + } + + if (!is_writeable()) { + dout(10) << __func__ << " -- pgmap not writeable, waiting" << dendl; + wait_for_writeable_ctx(new RetryCheckOSDMap(this, epoch)); + return; + } + + const OSDMap& osdmap = mon->osdmon()->osdmap; + if (!did_delete && osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + // delete all my data + dout(1) << __func__ << " will clear pg_map data" << dendl; + do_delete = true; + propose_pending(); + return; + } + + // osds that went up or down + set need_check_down_pg_osds; + + // apply latest map(s) + epoch = std::max(epoch, osdmap.get_epoch()); + for (epoch_t e = pg_map.last_osdmap_epoch+1; + e <= epoch; + e++) { + dout(10) << __func__ << " applying osdmap e" << e << " to pg_map" << dendl; + bufferlist bl; + int err = mon->osdmon()->get_version(e, bl); + assert(err == 0); + + assert(bl.length()); + OSDMap::Incremental inc(bl); + + PGMapUpdater::check_osd_map(inc, &need_check_down_pg_osds, + &last_osd_report, &pg_map, &pending_inc); + } + + assert(pg_map.last_osdmap_epoch < epoch); + pending_inc.osdmap_epoch = epoch; + 
PGMapUpdater::update_creating_pgs(osdmap, pg_map, &pending_inc); + PGMapUpdater::register_new_pgs(osdmap, pg_map, &pending_inc); + + PGMapUpdater::check_down_pgs(osdmap, pg_map, check_all_pgs, + need_check_down_pg_osds, &pending_inc); + check_all_pgs = false; + + propose_pending(); +} + +epoch_t PGMonitor::send_pg_creates(int osd, Connection *con, epoch_t next) +{ + dout(30) << __func__ << " " << pg_map.creating_pgs_by_osd_epoch << dendl; + map > >::iterator p = + pg_map.creating_pgs_by_osd_epoch.find(osd); + if (p == pg_map.creating_pgs_by_osd_epoch.end()) + return next; + + assert(p->second.size() > 0); + + MOSDPGCreate *m = NULL; + epoch_t last = 0; + for (map >::iterator q = p->second.lower_bound(next); + q != p->second.end(); + ++q) { + dout(20) << __func__ << " osd." << osd << " from " << next + << " : epoch " << q->first << " " << q->second.size() << " pgs" + << dendl; + last = q->first; + for (set::iterator r = q->second.begin(); r != q->second.end(); ++r) { + pg_stat_t &st = pg_map.pg_stat[*r]; + if (!m) + m = new MOSDPGCreate(pg_map.last_osdmap_epoch); + m->mkpg[*r] = pg_create_t(st.created, + st.parent, + st.parent_split_bits); + // Need the create time from the monitor using its clock to set + // last_scrub_stamp upon pg creation. + m->ctimes[*r] = pg_map.pg_stat[*r].last_scrub_stamp; + } + } + if (!m) { + dout(20) << "send_pg_creates osd." 
<< osd << " from " << next + << " has nothing to send" << dendl; + return next; + } + + con->send_message(m); + + // sub is current through last + 1 + return last + 1; +} + +bool PGMonitor::preprocess_command(MonOpRequestRef op) +{ + op->mark_pgmon_event(__func__); + MMonCommand *m = static_cast(op->get_req()); + int r = -1; + bufferlist rdata; + stringstream ss, ds; + + if (m->fsid != mon->monmap->fsid) { + dout(0) << __func__ << " drop message on fsid " << m->fsid << " != " + << mon->monmap->fsid << " for " << *m << dendl; + return true; + } + + map cmdmap; + if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { + // ss has reason for failure + string rs = ss.str(); + mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed()); + return true; + } + + string prefix; + cmd_getval(g_ceph_context, cmdmap, "prefix", prefix); + + MonSession *session = m->get_session(); + if (!session) { + mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed()); + return true; + } + + string format; + cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain")); + boost::scoped_ptr f(Formatter::create(format)); + + if (prefix == "pg scrub" || + prefix == "pg repair" || + prefix == "pg deep-scrub") { + string scrubop = prefix.substr(3, string::npos); + pg_t pgid; + string pgidstr; + cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr); + if (!pgid.parse(pgidstr.c_str())) { + ss << "invalid pgid '" << pgidstr << "'"; + r = -EINVAL; + goto reply; + } + if (!pg_map.pg_stat.count(pgid)) { + ss << "pg " << pgid << " dne"; + r = -ENOENT; + goto reply; + } + int osd = pg_map.pg_stat[pgid].acting_primary; + if (osd == -1) { + ss << "pg " << pgid << " has no primary osd"; + r = -EAGAIN; + goto reply; + } + if (!mon->osdmon()->osdmap.is_up(osd)) { + ss << "pg " << pgid << " primary osd." 
<< osd << " not up"; + r = -EAGAIN; + goto reply; + } + vector pgs(1); + pgs[0] = pgid; + mon->try_send_message(new MOSDScrub(mon->monmap->fsid, pgs, + scrubop == "repair", + scrubop == "deep-scrub"), + mon->osdmon()->osdmap.get_inst(osd)); + ss << "instructing pg " << pgid << " on osd." << osd << " to " << scrubop; + r = 0; + } else { + r = process_pg_map_command(prefix, cmdmap, pg_map, mon->osdmon()->osdmap, + f.get(), &ss, &rdata); + } + + if (r == -EOPNOTSUPP) + return false; + +reply: + string rs; + getline(ss, rs); + rdata.append(ds); + mon->reply_command(op, r, rs, rdata, get_last_committed()); + return true; +} + +bool PGMonitor::prepare_command(MonOpRequestRef op) +{ + op->mark_pgmon_event(__func__); + MMonCommand *m = static_cast(op->get_req()); + if (m->fsid != mon->monmap->fsid) { + dout(0) << __func__ << " drop message on fsid " << m->fsid << " != " + << mon->monmap->fsid << " for " << *m << dendl; + return true; + } + stringstream ss; + pg_t pgid; + epoch_t epoch = mon->osdmon()->osdmap.get_epoch(); + int r = 0; + string rs; + + map cmdmap; + if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { + // ss has reason for failure + string rs = ss.str(); + mon->reply_command(op, -EINVAL, rs, get_last_committed()); + return true; + } + + string prefix; + cmd_getval(g_ceph_context, cmdmap, "prefix", prefix); + + MonSession *session = m->get_session(); + if (!session) { + mon->reply_command(op, -EACCES, "access denied", get_last_committed()); + return true; + } + + if (prefix == "pg force_create_pg") { + string pgidstr; + cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr); + if (!pgid.parse(pgidstr.c_str())) { + ss << "pg " << pgidstr << " invalid"; + r = -EINVAL; + goto reply; + } + if (!pg_map.pg_stat.count(pgid)) { + ss << "pg " << pgid << " dne"; + r = -ENOENT; + goto reply; + } + if (pg_map.creating_pgs.count(pgid)) { + ss << "pg " << pgid << " already creating"; + r = 0; + goto reply; + } + { + PGMapUpdater::register_pg( + mon->osdmon()->osdmap, + pgid, + 
// (continuation of PGMonitor::prepare_command — the head of this function
// lies before this chunk; the arguments below complete a PGMapUpdater call
// that registers the newly requested pg in pending_inc)
	epoch,
	true,
	pg_map,
	&pending_inc);
    }
    ss << "pg " << pgidstr << " now creating, ok";
    goto update;  // map changed: reply only after the next proposal commits
  } else if (prefix == "pg force-recovery" ||
	     prefix == "pg force-backfill" ||
	     prefix == "pg cancel-force-recovery" ||
	     prefix == "pg cancel-force-backfill") {
    // Forced recovery/backfill belongs to the post-luminous (mgr-based)
    // stats path; once the osdmap requires luminous this legacy monitor
    // refuses the command.
    if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
      ss << "you must complete the upgrade and 'ceph osd require-osd-release "
	 << "luminous' before using forced recovery";
      r = -EPERM;
      goto reply;
    }
  } else if (prefix == "pg set_full_ratio" ||
	     prefix == "pg set_nearfull_ratio") {
    // Legacy ratio commands; post-luminous they are replaced by
    // 'osd set-full-ratio' / 'osd set-nearfull-ratio'.
    if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
      ss << "please use the new luminous interfaces"
	 << " ('osd set-full-ratio' and 'osd set-nearfull-ratio')";
      r = -EPERM;
      goto reply;
    }
    double n;
    if (!cmd_getval(g_ceph_context, cmdmap, "ratio", n)) {
      // NOTE(review): the value parsed above is "ratio" but the error
      // message stringifies cmdmap["who"] — looks like a copy/paste slip;
      // confirm against upstream before changing the message.
      ss << "unable to parse 'ratio' value '"
	 << cmd_vartype_stringify(cmdmap["who"]) << "'";
      r = -EINVAL;
      goto reply;
    }
    string op = prefix.substr(3, string::npos);  // strip the leading "pg "
    if (op == "set_full_ratio")
      pending_inc.full_ratio = n;
    else if (op == "set_nearfull_ratio")
      pending_inc.nearfull_ratio = n;
    goto update;
  } else {
    r = -EINVAL;
    goto reply;
  }

reply:
  // Failure or read-only outcome: answer immediately against the last
  // committed map; supply a default errno string if none was composed.
  getline(ss, rs);
  if (r < 0 && rs.length() == 0)
    rs = cpp_strerror(r);
  mon->reply_command(op, r, rs, get_last_committed(});
  return false;

update:
  // pending_inc was mutated: defer the reply until the next paxos
  // proposal (last_committed + 1) has been committed.
  getline(ss, rs);
  wait_for_finished_proposal(op, new Monitor::C_Command(
			       mon, op, r, rs, get_last_committed() + 1));
  return true;
}

// Produce legacy (pre-luminous) health output derived from the pgmap:
// full / near-full OSD summaries plus PGMap's own health items.  Once the
// osdmap requires luminous this emits nothing (health moves to the mgr
// path).  'detail' may be null when only summary lines are wanted.
void PGMonitor::get_health(list<pair<health_status_t,string> >& summary,
			   list<pair<health_status_t,string> > *detail,
			   CephContext *cct) const
{
  // legacy pre-luminous full/nearfull
  if (mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
    check_full_osd_health(summary, detail, pg_map.full_osds, "full",
			  HEALTH_ERR);
    check_full_osd_health(summary, detail, pg_map.nearfull_osds, "near full",
			  HEALTH_WARN);
    pg_map.get_health(cct, mon->osdmon()->osdmap, summary, detail);
  }
}

// Append one summary entry (and, when 'detail' is non-null, one line per
// OSD with its utilization percentage) for the given set of osd ids.
// 'desc' is the human-readable adjective ("full" / "near full") and 'sev'
// the severity attached to every entry.
void PGMonitor::check_full_osd_health(
  list<pair<health_status_t,string> >& summary,
  list<pair<health_status_t,string> > *detail,
  const mempool::pgmap::set<int>& s, const char *desc,
  health_status_t sev) const
{
  if (!s.empty()) {
    ostringstream ss;
    ss << s.size() << " " << desc << " osd(s)";
    summary.push_back(make_pair(sev, ss.str()));
    if (detail) {
      for (set<int>::const_iterator p = s.begin(); p != s.end(); ++p) {
	ostringstream ss;
	// assumes every osd in 's' has an osd_stat entry (find() result is
	// dereferenced unchecked) — TODO confirm invariant holds upstream
	const osd_stat_t& os = pg_map.osd_stat.find(*p)->second;
	// NOTE(review): divides by os.kb with no zero guard; a stat entry
	// with kb == 0 would yield UB/inf here — confirm unreachable.
	int ratio = (int)(((float)os.kb_used) / (float) os.kb * 100.0);
	ss << "osd." << *p << " is " << desc << " at " << ratio << "%";
	detail->push_back(make_pair(sev, ss.str()));
      }
    }
  }
}

// Walk every "osd_pg_creates" subscription and service it via check_sub().
// Pre-luminous only; after the upgrade pg-create delivery is handled
// elsewhere.
void PGMonitor::check_subs()
{
  if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
    return;
  }

  dout(10) << __func__ << dendl;
  const string type = "osd_pg_creates";

  mon->with_session_map([this, &type](const MonSessionMap& session_map) {
      // NOTE(review): reads mon->session_map directly instead of the
      // session_map parameter passed to the lambda; presumably safe because
      // with_session_map holds the session lock — confirm.
      if (mon->session_map.subs.count(type) == 0)
	return;

      auto p = mon->session_map.subs[type]->begin();
      while (!p.end()) {
	Subscription *sub = *p;
	++p;  // advance before acting on 'sub'
	dout(20) << __func__ << " .. " << sub->session->inst << dendl;
	check_sub(sub);
      }
    });
}

// Service a single "osd_pg_creates" subscription: if the subscriber is an
// OSD that the current osdmap marks up, push it any pending pg creates
// starting at sub->next and record the new resume point.  Always returns
// true (the subscription stays registered).
bool PGMonitor::check_sub(Subscription *sub)
{
  OSDMap& osdmap = mon->osdmon()->osdmap;
  if (sub->type == "osd_pg_creates") {
    // only send these if the OSD is up.  we will check_subs() when they do
    // come up so they will get the creates then.
    if (sub->session->inst.name.is_osd() &&
	osdmap.is_up(sub->session->inst.name.num())) {
      sub->next = send_pg_creates(sub->session->inst.name.num(),
				  sub->session->con.get(),
				  sub->next);
    }
  }
  return true;
}

// Facade that exposes the legacy monitor-side PGMap through the abstract
// MonPGStatService interface, so callers need not know whether the mon- or
// mgr-based stats implementation is active.
class PGMonStatService : public MonPGStatService, public PGMapStatService {
  PGMonitor *pgmon;  // back-pointer to the owning paxos service
public:
  PGMonStatService(const PGMap& o, PGMonitor *pgm)
    : MonPGStatService(), PGMapStatService(o), pgmon(pgm) {}


  // Readability simply tracks the underlying paxos service.
  bool is_readable() const override { return pgmon->is_readable(); }

  // Copy pgs this pgmap still lists as creating (and whose pool still
  // exists in 'pools') into *pending_creates; returns how many were newly
  // added.  Returns 0 when our last pg scan predates scan_epoch, signalling
  // the caller that this map is too stale to contribute.
  unsigned maybe_add_creating_pgs(epoch_t scan_epoch,
				  const mempool::osdmap::map<int64_t,pg_pool_t>& pools,
				  creating_pgs_t *pending_creates) const override
  {
    if (pgmap.last_pg_scan < scan_epoch) {
      return 0;
    }
    unsigned added = 0;
    for (auto& pgid : pgmap.creating_pgs) {
      if (!pools.count(pgid.pool())) {
	continue;  // pool has since been deleted
      }
      auto st = pgmap.pg_stat.find(pgid);
      assert(st != pgmap.pg_stat.end());
      // NOTE(review): last_scrub_stamp is paired with 'created' as the
      // entry's timestamp here — presumably serving as a modification
      // time for a creating pg; confirm against creating_pgs_t.
      auto created = make_pair(st->second.created,
			       st->second.last_scrub_stamp);
      // no need to add the pg, if it already exists in creating_pgs
      if (pending_creates->pgs.emplace(pgid, created).second) {
	added++;
      }
    }
    return added;
  }

  // Erase entries from *creates once this pgmap shows the pg past the
  // CREATING state, and remember the pool as having finished creating.
  void maybe_trim_creating_pgs(creating_pgs_t *creates) const override {
    auto p = creates->pgs.begin();
    while (p != creates->pgs.end()) {
      auto q = pgmap.pg_stat.find(p->first);
      if (q != pgmap.pg_stat.end() &&
	  !(q->second.state & PG_STATE_CREATING)) {
	p = creates->pgs.erase(p);  // erase returns the next iterator
	creates->created_pools.insert(q->first.pool());
      } else {
	++p;
      }
    }
  }

  // Dump the pgmap together with the paxos commit range it spans.
  void dump_info(Formatter *f) const override {
    f->dump_object("pgmap", pgmap);
    f->dump_unsigned("pgmap_first_committed", pgmon->get_first_committed());
    f->dump_unsigned("pgmap_last_committed", pgmon->get_last_committed());
  }

  // Delegate read-only "pg ..." commands to the shared PGMap handler.
  int process_pg_command(const string& prefix,
			 const map<string,cmd_vartype>& cmdmap,
			 const OSDMap& osdmap,
			 Formatter *f,
			 stringstream *ss,
			 bufferlist *odata) const override {
    return process_pg_map_command(prefix, cmdmap, pgmap, osdmap, f, ss, odata);
  }

  // Delegate reweight-by-utilization to the shared implementation, which
  // derives the new OSD weights from this pgmap's usage statistics.
  int reweight_by_utilization(const OSDMap &osd_map,
			      int oload,
			      double max_changef,
			      int max_osds,
			      bool by_pg, const set<int64_t> *pools,
			      bool no_increasing,
			      mempool::osdmap::map<int32_t,uint32_t>* new_weights,
			      std::stringstream *ss,
			      std::string *out_str,
			      Formatter *f) const override {
    return reweight::by_utilization(osd_map, pgmap, oload, max_changef,
				    max_osds, by_pg, pools, no_increasing,
				    new_weights, ss, out_str, f);
  }
};

// Lazily build (once) and return the stat-service facade over pg_map.
MonPGStatService *PGMonitor::get_pg_stat_service()
{
  if (!pgservice) {
    pgservice.reset(new PGMonStatService(pg_map, this));
  }
  return pgservice.get();
}

// The three prefixes name the MonitorDBStore key namespaces used to
// persist map-wide metadata, per-pg stats, and per-osd stats respectively.
PGMonitor::PGMonitor(Monitor *mn, Paxos *p, const string& service_name)
  : PaxosService(mn, p, service_name),
    pgmap_meta_prefix("pgmap_meta"),
    pgmap_pg_prefix("pgmap_pg"),
    pgmap_osd_prefix("pgmap_osd")
{}

PGMonitor::~PGMonitor() = default;