X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmgr%2FClusterState.cc;fp=src%2Fceph%2Fsrc%2Fmgr%2FClusterState.cc;h=7e01a811ef158b67da452faad8f8e3ed55c6c5a3;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/mgr/ClusterState.cc b/src/ceph/src/mgr/ClusterState.cc new file mode 100644 index 0000000..7e01a81 --- /dev/null +++ b/src/ceph/src/mgr/ClusterState.cc @@ -0,0 +1,174 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "messages/MMgrDigest.h" +#include "messages/MMonMgrReport.h" +#include "messages/MPGStats.h" + +#include "mgr/ClusterState.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr " << __func__ << " " + +ClusterState::ClusterState( + MonClient *monc_, + Objecter *objecter_, + const MgrMap& mgrmap) + : monc(monc_), + objecter(objecter_), + lock("ClusterState"), + mgr_map(mgrmap), + pgservice(pg_map) +{} + +void ClusterState::set_objecter(Objecter *objecter_) +{ + Mutex::Locker l(lock); + + objecter = objecter_; +} + +void ClusterState::set_fsmap(FSMap const &new_fsmap) +{ + Mutex::Locker l(lock); + + fsmap = new_fsmap; +} + +void ClusterState::set_mgr_map(MgrMap const &new_mgrmap) +{ + Mutex::Locker l(lock); + mgr_map = new_mgrmap; +} + +void ClusterState::set_service_map(ServiceMap const &new_service_map) +{ + Mutex::Locker l(lock); + servicemap = new_service_map; +} + +void ClusterState::load_digest(MMgrDigest *m) +{ + health_json = std::move(m->health_json); + mon_status_json = std::move(m->mon_status_json); +} + +void ClusterState::ingest_pgstats(MPGStats *stats) +{ + Mutex::Locker l(lock); + + const int from = stats->get_orig_source().num(); + bool is_in = false; + objecter->with_osdmap([&is_in, from](const OSDMap &osd_map){ + is_in = osd_map.is_in(from); + }); + + if (is_in) { + pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat)); + } else { + pending_inc.update_stat(from, stats->epoch, osd_stat_t()); + } + + for (auto p : stats->pg_stat) { + pg_t pgid = p.first; + const auto &pg_stats = p.second; + + // In case we're hearing about a PG that according to last + // OSDMap update should not exist + if (existing_pools.count(pgid.pool()) == 0) { + dout(15) << " got " << pgid + << " reported at " << pg_stats.reported_epoch << ":" + << pg_stats.reported_seq + << " state " << pg_state_string(pg_stats.state) + << " but pool not in " << existing_pools + << dendl; + continue; + } + // In case we already heard about more recent stats from this PG + // from another OSD + const auto q = pg_map.pg_stat.find(pgid); + if (q != pg_map.pg_stat.end() && + q->second.get_version_pair() > pg_stats.get_version_pair()) { + dout(15) << " had " << pgid << " from " + << q->second.reported_epoch << ":" + << q->second.reported_seq << dendl; + continue; + } + + pending_inc.pg_stat_updates[pgid] = pg_stats; + } +} + +void ClusterState::update_delta_stats() +{ + pending_inc.stamp = ceph_clock_now(); + pending_inc.version = pg_map.version + 1; // to make apply_incremental happy + dout(10) << " v" << pending_inc.version << dendl; + + dout(30) << " pg_map before:\n"; + JSONFormatter jf(true); + jf.dump_object("pg_map", pg_map); + jf.flush(*_dout); + *_dout << dendl; + dout(30) << " incremental:\n"; + JSONFormatter jf(true); + jf.dump_object("pending_inc", pending_inc); + jf.flush(*_dout); + *_dout << dendl; + + pg_map.apply_incremental(g_ceph_context, pending_inc); + pending_inc = PGMap::Incremental(); +} + +void ClusterState::notify_osdmap(const OSDMap &osd_map) +{ + Mutex::Locker l(lock); + + pending_inc.stamp = ceph_clock_now(); + pending_inc.version = pg_map.version + 1; // to make apply_incremental happy + dout(10) << " v" << pending_inc.version << dendl; + + PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc); + + // update our list of pools that exist, so that we can filter pg_map updates + // in synchrony with this OSDMap. + existing_pools.clear(); + for (auto& p : osd_map.get_pools()) { + existing_pools.insert(p.first); + } + + // brute force this for now (don't bother being clever by only + // checking osds that went up/down) + set need_check_down_pg_osds; + PGMapUpdater::check_down_pgs(osd_map, pg_map, true, + need_check_down_pg_osds, &pending_inc); + + dout(30) << " pg_map before:\n"; + JSONFormatter jf(true); + jf.dump_object("pg_map", pg_map); + jf.flush(*_dout); + *_dout << dendl; + dout(30) << " incremental:\n"; + JSONFormatter jf(true); + jf.dump_object("pending_inc", pending_inc); + jf.flush(*_dout); + *_dout << dendl; + + pg_map.apply_incremental(g_ceph_context, pending_inc); + pending_inc = PGMap::Incremental(); + // TODO: Complete the separation of PG state handling so + // that a cut-down set of functionality remains in PGMonitor + // while the full-blown PGMap lives only here. +}