Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mgr / ClusterState.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 John Spray <john.spray@inktank.com>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  */
13
14 #include "messages/MMgrDigest.h"
15 #include "messages/MMonMgrReport.h"
16 #include "messages/MPGStats.h"
17
18 #include "mgr/ClusterState.h"
19
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_mgr
22 #undef dout_prefix
23 #define dout_prefix *_dout << "mgr " << __func__ << " "
24
25 ClusterState::ClusterState(
26   MonClient *monc_,
27   Objecter *objecter_,
28   const MgrMap& mgrmap)
29   : monc(monc_),
30     objecter(objecter_),
31     lock("ClusterState"),
32     mgr_map(mgrmap),
33     pgservice(pg_map)
34 {}
35
36 void ClusterState::set_objecter(Objecter *objecter_)
37 {
38   Mutex::Locker l(lock);
39
40   objecter = objecter_;
41 }
42
43 void ClusterState::set_fsmap(FSMap const &new_fsmap)
44 {
45   Mutex::Locker l(lock);
46
47   fsmap = new_fsmap;
48 }
49
50 void ClusterState::set_mgr_map(MgrMap const &new_mgrmap)
51 {
52   Mutex::Locker l(lock);
53   mgr_map = new_mgrmap;
54 }
55
56 void ClusterState::set_service_map(ServiceMap const &new_service_map)
57 {
58   Mutex::Locker l(lock);
59   servicemap = new_service_map;
60 }
61
62 void ClusterState::load_digest(MMgrDigest *m)
63 {
64   health_json = std::move(m->health_json);
65   mon_status_json = std::move(m->mon_status_json);
66 }
67
68 void ClusterState::ingest_pgstats(MPGStats *stats)
69 {
70   Mutex::Locker l(lock);
71
72   const int from = stats->get_orig_source().num();
73   bool is_in = false;
74   objecter->with_osdmap([&is_in, from](const OSDMap &osd_map){
75       is_in = osd_map.is_in(from);
76   });
77
78   if (is_in) {
79     pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat));
80   } else {
81     pending_inc.update_stat(from, stats->epoch, osd_stat_t());
82   }
83
84   for (auto p : stats->pg_stat) {
85     pg_t pgid = p.first;
86     const auto &pg_stats = p.second;
87
88     // In case we're hearing about a PG that according to last
89     // OSDMap update should not exist
90     if (existing_pools.count(pgid.pool()) == 0) {
91       dout(15) << " got " << pgid
92                << " reported at " << pg_stats.reported_epoch << ":"
93                << pg_stats.reported_seq
94                << " state " << pg_state_string(pg_stats.state)
95                << " but pool not in " << existing_pools
96                << dendl;
97       continue;
98     }
99     // In case we already heard about more recent stats from this PG
100     // from another OSD
101     const auto q = pg_map.pg_stat.find(pgid);
102     if (q != pg_map.pg_stat.end() &&
103         q->second.get_version_pair() > pg_stats.get_version_pair()) {
104       dout(15) << " had " << pgid << " from "
105                << q->second.reported_epoch << ":"
106                << q->second.reported_seq << dendl;
107       continue;
108     }
109
110     pending_inc.pg_stat_updates[pgid] = pg_stats;
111   }
112 }
113
114 void ClusterState::update_delta_stats()
115 {
116   pending_inc.stamp = ceph_clock_now();
117   pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
118   dout(10) << " v" << pending_inc.version << dendl;
119
120   dout(30) << " pg_map before:\n";
121   JSONFormatter jf(true);
122   jf.dump_object("pg_map", pg_map);
123   jf.flush(*_dout);
124   *_dout << dendl;
125   dout(30) << " incremental:\n";
126   JSONFormatter jf(true);
127   jf.dump_object("pending_inc", pending_inc);
128   jf.flush(*_dout);
129   *_dout << dendl;
130
131   pg_map.apply_incremental(g_ceph_context, pending_inc);
132   pending_inc = PGMap::Incremental();
133 }
134
135 void ClusterState::notify_osdmap(const OSDMap &osd_map)
136 {
137   Mutex::Locker l(lock);
138
139   pending_inc.stamp = ceph_clock_now();
140   pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
141   dout(10) << " v" << pending_inc.version << dendl;
142
143   PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc);
144
145   // update our list of pools that exist, so that we can filter pg_map updates
146   // in synchrony with this OSDMap.
147   existing_pools.clear();
148   for (auto& p : osd_map.get_pools()) {
149     existing_pools.insert(p.first);
150   }
151
152   // brute force this for now (don't bother being clever by only
153   // checking osds that went up/down)
154   set<int> need_check_down_pg_osds;
155   PGMapUpdater::check_down_pgs(osd_map, pg_map, true,
156                                need_check_down_pg_osds, &pending_inc);
157
158   dout(30) << " pg_map before:\n";
159   JSONFormatter jf(true);
160   jf.dump_object("pg_map", pg_map);
161   jf.flush(*_dout);
162   *_dout << dendl;
163   dout(30) << " incremental:\n";
164   JSONFormatter jf(true);
165   jf.dump_object("pending_inc", pending_inc);
166   jf.flush(*_dout);
167   *_dout << dendl;
168
169   pg_map.apply_incremental(g_ceph_context, pending_inc);
170   pending_inc = PGMap::Incremental();
171   // TODO: Complete the separation of PG state handling so
172   // that a cut-down set of functionality remains in PGMonitor
173   // while the full-blown PGMap lives only here.
174 }