+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 John Spray <john.spray@inktank.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- */
-
-#include "messages/MMgrDigest.h"
-#include "messages/MMonMgrReport.h"
-#include "messages/MPGStats.h"
-
-#include "mgr/ClusterState.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_mgr
-#undef dout_prefix
-#define dout_prefix *_dout << "mgr " << __func__ << " "
-
-ClusterState::ClusterState(
- MonClient *monc_,
- Objecter *objecter_,
- const MgrMap& mgrmap)
- : monc(monc_),
- objecter(objecter_),
- lock("ClusterState"),
- mgr_map(mgrmap),
- pgservice(pg_map)
-{}
-
-void ClusterState::set_objecter(Objecter *objecter_)
-{
- Mutex::Locker l(lock);
-
- objecter = objecter_;
-}
-
-void ClusterState::set_fsmap(FSMap const &new_fsmap)
-{
- Mutex::Locker l(lock);
-
- fsmap = new_fsmap;
-}
-
-void ClusterState::set_mgr_map(MgrMap const &new_mgrmap)
-{
- Mutex::Locker l(lock);
- mgr_map = new_mgrmap;
-}
-
-void ClusterState::set_service_map(ServiceMap const &new_service_map)
-{
- Mutex::Locker l(lock);
- servicemap = new_service_map;
-}
-
-void ClusterState::load_digest(MMgrDigest *m)
-{
- health_json = std::move(m->health_json);
- mon_status_json = std::move(m->mon_status_json);
-}
-
-void ClusterState::ingest_pgstats(MPGStats *stats)
-{
- Mutex::Locker l(lock);
-
- const int from = stats->get_orig_source().num();
- bool is_in = false;
- objecter->with_osdmap([&is_in, from](const OSDMap &osd_map){
- is_in = osd_map.is_in(from);
- });
-
- if (is_in) {
- pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat));
- } else {
- pending_inc.update_stat(from, stats->epoch, osd_stat_t());
- }
-
- for (auto p : stats->pg_stat) {
- pg_t pgid = p.first;
- const auto &pg_stats = p.second;
-
- // In case we're hearing about a PG that according to last
- // OSDMap update should not exist
- if (existing_pools.count(pgid.pool()) == 0) {
- dout(15) << " got " << pgid
- << " reported at " << pg_stats.reported_epoch << ":"
- << pg_stats.reported_seq
- << " state " << pg_state_string(pg_stats.state)
- << " but pool not in " << existing_pools
- << dendl;
- continue;
- }
- // In case we already heard about more recent stats from this PG
- // from another OSD
- const auto q = pg_map.pg_stat.find(pgid);
- if (q != pg_map.pg_stat.end() &&
- q->second.get_version_pair() > pg_stats.get_version_pair()) {
- dout(15) << " had " << pgid << " from "
- << q->second.reported_epoch << ":"
- << q->second.reported_seq << dendl;
- continue;
- }
-
- pending_inc.pg_stat_updates[pgid] = pg_stats;
- }
-}
-
-void ClusterState::update_delta_stats()
-{
- pending_inc.stamp = ceph_clock_now();
- pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
- dout(10) << " v" << pending_inc.version << dendl;
-
- dout(30) << " pg_map before:\n";
- JSONFormatter jf(true);
- jf.dump_object("pg_map", pg_map);
- jf.flush(*_dout);
- *_dout << dendl;
- dout(30) << " incremental:\n";
- JSONFormatter jf(true);
- jf.dump_object("pending_inc", pending_inc);
- jf.flush(*_dout);
- *_dout << dendl;
-
- pg_map.apply_incremental(g_ceph_context, pending_inc);
- pending_inc = PGMap::Incremental();
-}
-
-void ClusterState::notify_osdmap(const OSDMap &osd_map)
-{
- Mutex::Locker l(lock);
-
- pending_inc.stamp = ceph_clock_now();
- pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
- dout(10) << " v" << pending_inc.version << dendl;
-
- PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc);
-
- // update our list of pools that exist, so that we can filter pg_map updates
- // in synchrony with this OSDMap.
- existing_pools.clear();
- for (auto& p : osd_map.get_pools()) {
- existing_pools.insert(p.first);
- }
-
- // brute force this for now (don't bother being clever by only
- // checking osds that went up/down)
- set<int> need_check_down_pg_osds;
- PGMapUpdater::check_down_pgs(osd_map, pg_map, true,
- need_check_down_pg_osds, &pending_inc);
-
- dout(30) << " pg_map before:\n";
- JSONFormatter jf(true);
- jf.dump_object("pg_map", pg_map);
- jf.flush(*_dout);
- *_dout << dendl;
- dout(30) << " incremental:\n";
- JSONFormatter jf(true);
- jf.dump_object("pending_inc", pending_inc);
- jf.flush(*_dout);
- *_dout << dendl;
-
- pg_map.apply_incremental(g_ceph_context, pending_inc);
- pending_inc = PGMap::Incremental();
- // TODO: Complete the separation of PG state handling so
- // that a cut-down set of functionality remains in PGMonitor
- // while the full-blown PGMap lives only here.
-}