Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mon / MgrStatMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "MgrStatMonitor.h"
5 #include "mon/OSDMonitor.h"
6 #include "mon/PGMap.h"
7 #include "mon/PGMonitor.h"
8 #include "messages/MGetPoolStats.h"
9 #include "messages/MGetPoolStatsReply.h"
10 #include "messages/MMonMgrReport.h"
11 #include "messages/MStatfs.h"
12 #include "messages/MStatfsReply.h"
13 #include "messages/MServiceMap.h"
14
15 class MgrPGStatService : public MonPGStatService {
16   PGMapDigest& digest;
17 public:
18   MgrPGStatService(PGMapDigest& d) : digest(d) {}
19
20   const pool_stat_t* get_pool_stat(int64_t poolid) const override {
21     auto i = digest.pg_pool_sum.find(poolid);
22     if (i != digest.pg_pool_sum.end()) {
23       return &i->second;
24     }
25     return nullptr;
26   }
27
28   ceph_statfs get_statfs(OSDMap& osdmap,
29                          boost::optional<int64_t> data_pool) const override {
30     return digest.get_statfs(osdmap, data_pool);
31   }
32
33   void print_summary(Formatter *f, ostream *out) const override {
34     digest.print_summary(f, out);
35   }
36   void dump_info(Formatter *f) const override {
37     digest.dump(f);
38   }
39   void dump_fs_stats(stringstream *ss,
40                      Formatter *f,
41                      bool verbose) const override {
42     digest.dump_fs_stats(ss, f, verbose);
43   }
44   void dump_pool_stats(const OSDMap& osdm, stringstream *ss, Formatter *f,
45                        bool verbose) const override {
46     digest.dump_pool_stats_full(osdm, ss, f, verbose);
47   }
48 };
49
50
51 #define dout_subsys ceph_subsys_mon
52 #undef dout_prefix
53 #define dout_prefix _prefix(_dout, mon)
54 static ostream& _prefix(std::ostream *_dout, Monitor *mon) {
55   return *_dout << "mon." << mon->name << "@" << mon->rank
56                 << "(" << mon->get_state_name()
57                 << ").mgrstat ";
58 }
59
60 MgrStatMonitor::MgrStatMonitor(Monitor *mn, Paxos *p, const string& service_name)
61   : PaxosService(mn, p, service_name),
62     pgservice(new MgrPGStatService(digest))
63 {
64 }
65
66 MgrStatMonitor::~MgrStatMonitor() = default;
67
68 MonPGStatService *MgrStatMonitor::get_pg_stat_service()
69 {
70   return pgservice.get();
71 }
72
73 void MgrStatMonitor::create_initial()
74 {
75   dout(10) << __func__ << dendl;
76   version = 0;
77   service_map.epoch = 1;
78   ::encode(service_map, pending_service_map_bl, CEPH_FEATURES_ALL);
79 }
80
81 void MgrStatMonitor::update_from_paxos(bool *need_bootstrap)
82 {
83   version = get_last_committed();
84   dout(10) << " " << version << dendl;
85   load_health();
86   bufferlist bl;
87   get_version(version, bl);
88   if (version) {
89     assert(bl.length());
90     try {
91       auto p = bl.begin();
92       ::decode(digest, p);
93       ::decode(service_map, p);
94       dout(10) << __func__ << " v" << version
95                << " service_map e" << service_map.epoch << dendl;
96     }
97     catch (buffer::error& e) {
98       derr << "failed to decode mgrstat state; luminous dev version?" << dendl;
99     }
100   }
101   check_subs();
102   update_logger();
103 }
104
105 void MgrStatMonitor::update_logger()
106 {
107   dout(20) << __func__ << dendl;
108   if (mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
109     dout(20) << "yielding cluster perfcounter updates to pgmon" << dendl;
110     return;
111   }
112
113   mon->cluster_logger->set(l_cluster_osd_bytes, digest.osd_sum.kb * 1024ull);
114   mon->cluster_logger->set(l_cluster_osd_bytes_used,
115                            digest.osd_sum.kb_used * 1024ull);
116   mon->cluster_logger->set(l_cluster_osd_bytes_avail,
117                            digest.osd_sum.kb_avail * 1024ull);
118
119   mon->cluster_logger->set(l_cluster_num_pool, digest.pg_pool_sum.size());
120   uint64_t num_pg = 0;
121   for (auto i : digest.num_pg_by_pool) {
122     num_pg += i.second;
123   }
124   mon->cluster_logger->set(l_cluster_num_pg, num_pg);
125
126   unsigned active = 0, active_clean = 0, peering = 0;
127   for (auto p = digest.num_pg_by_state.begin();
128        p != digest.num_pg_by_state.end();
129        ++p) {
130     if (p->first & PG_STATE_ACTIVE) {
131       active += p->second;
132       if (p->first & PG_STATE_CLEAN)
133         active_clean += p->second;
134     }
135     if (p->first & PG_STATE_PEERING)
136       peering += p->second;
137   }
138   mon->cluster_logger->set(l_cluster_num_pg_active_clean, active_clean);
139   mon->cluster_logger->set(l_cluster_num_pg_active, active);
140   mon->cluster_logger->set(l_cluster_num_pg_peering, peering);
141
142   mon->cluster_logger->set(l_cluster_num_object, digest.pg_sum.stats.sum.num_objects);
143   mon->cluster_logger->set(l_cluster_num_object_degraded, digest.pg_sum.stats.sum.num_objects_degraded);
144   mon->cluster_logger->set(l_cluster_num_object_misplaced, digest.pg_sum.stats.sum.num_objects_misplaced);
145   mon->cluster_logger->set(l_cluster_num_object_unfound, digest.pg_sum.stats.sum.num_objects_unfound);
146   mon->cluster_logger->set(l_cluster_num_bytes, digest.pg_sum.stats.sum.num_bytes);
147
148 }
149
150 void MgrStatMonitor::create_pending()
151 {
152   dout(10) << " " << version << dendl;
153   pending_digest = digest;
154   pending_health_checks = get_health_checks();
155   pending_service_map_bl.clear();
156   ::encode(service_map, pending_service_map_bl, mon->get_quorum_con_features());
157 }
158
159 void MgrStatMonitor::encode_pending(MonitorDBStore::TransactionRef t)
160 {
161   ++version;
162   if (version < mon->pgmon()->get_last_committed()) {
163     // fast-forward to pgmon version to ensure clients don't see a
164     // jump back in time for MGetPoolStats and MStatFs.
165     version = mon->pgmon()->get_last_committed() + 1;
166   }
167   dout(10) << " " << version << dendl;
168   bufferlist bl;
169   ::encode(pending_digest, bl, mon->get_quorum_con_features());
170   assert(pending_service_map_bl.length());
171   bl.append(pending_service_map_bl);
172   put_version(t, version, bl);
173   put_last_committed(t, version);
174
175   encode_health(pending_health_checks, t);
176 }
177
178 version_t MgrStatMonitor::get_trim_to()
179 {
180   // we don't actually need *any* old states, but keep a few.
181   if (version > 5) {
182     return version - 5;
183   }
184   return 0;
185 }
186
187 void MgrStatMonitor::on_active()
188 {
189   update_logger();
190 }
191
192 void MgrStatMonitor::get_health(list<pair<health_status_t,string> >& summary,
193                                 list<pair<health_status_t,string> > *detail,
194                                 CephContext *cct) const
195 {
196 }
197
198 void MgrStatMonitor::tick()
199 {
200 }
201
202 void MgrStatMonitor::print_summary(Formatter *f, std::ostream *ss) const
203 {
204   pgservice->print_summary(f, ss);
205 }
206
207 bool MgrStatMonitor::preprocess_query(MonOpRequestRef op)
208 {
209   auto m = static_cast<PaxosServiceMessage*>(op->get_req());
210   switch (m->get_type()) {
211   case CEPH_MSG_STATFS:
212     return preprocess_statfs(op);
213   case MSG_MON_MGR_REPORT:
214     return preprocess_report(op);
215   case MSG_GETPOOLSTATS:
216     return preprocess_getpoolstats(op);
217   default:
218     mon->no_reply(op);
219     derr << "Unhandled message type " << m->get_type() << dendl;
220     return true;
221   }
222 }
223
224 bool MgrStatMonitor::prepare_update(MonOpRequestRef op)
225 {
226   auto m = static_cast<PaxosServiceMessage*>(op->get_req());
227   switch (m->get_type()) {
228   case MSG_MON_MGR_REPORT:
229     return prepare_report(op);
230   default:
231     mon->no_reply(op);
232     derr << "Unhandled message type " << m->get_type() << dendl;
233     return true;
234   }
235 }
236
237 bool MgrStatMonitor::preprocess_report(MonOpRequestRef op)
238 {
239   return false;
240 }
241
242 bool MgrStatMonitor::prepare_report(MonOpRequestRef op)
243 {
244   auto m = static_cast<MMonMgrReport*>(op->get_req());
245   bufferlist bl = m->get_data();
246   auto p = bl.begin();
247   ::decode(pending_digest, p);
248   pending_health_checks.swap(m->health_checks);
249   if (m->service_map_bl.length()) {
250     pending_service_map_bl.swap(m->service_map_bl);
251   }
252   dout(10) << __func__ << " " << pending_digest << ", "
253            << pending_health_checks.checks.size() << " health checks" << dendl;
254   return true;
255 }
256
257 bool MgrStatMonitor::preprocess_getpoolstats(MonOpRequestRef op)
258 {
259   op->mark_pgmon_event(__func__);
260   auto m = static_cast<MGetPoolStats*>(op->get_req());
261   auto session = m->get_session();
262   if (!session)
263     return true;
264   if (!session->is_capable("pg", MON_CAP_R)) {
265     dout(0) << "MGetPoolStats received from entity with insufficient caps "
266             << session->caps << dendl;
267     return true;
268   }
269   if (m->fsid != mon->monmap->fsid) {
270     dout(0) << __func__ << " on fsid "
271             << m->fsid << " != " << mon->monmap->fsid << dendl;
272     return true;
273   }
274   epoch_t ver = 0;
275   if (mon->pgservice == get_pg_stat_service()) {
276     ver = get_last_committed();
277   } else {
278     ver = mon->pgmon()->get_last_committed();
279   }
280   auto reply = new MGetPoolStatsReply(m->fsid, m->get_tid(), ver);
281   for (const auto& pool_name : m->pools) {
282     const auto pool_id = mon->osdmon()->osdmap.lookup_pg_pool_name(pool_name);
283     if (pool_id == -ENOENT)
284       continue;
285     auto pool_stat = mon->pgservice->get_pool_stat(pool_id);
286     if (!pool_stat)
287       continue;
288     reply->pool_stats[pool_name] = *pool_stat;
289   }
290   mon->send_reply(op, reply);
291   return true;
292 }
293
294 bool MgrStatMonitor::preprocess_statfs(MonOpRequestRef op)
295 {
296   op->mark_pgmon_event(__func__);
297   auto statfs = static_cast<MStatfs*>(op->get_req());
298   auto session = statfs->get_session();
299
300   if (!session)
301     return true;
302   if (!session->is_capable("pg", MON_CAP_R)) {
303     dout(0) << "MStatfs received from entity with insufficient privileges "
304             << session->caps << dendl;
305     return true;
306   }
307   if (statfs->fsid != mon->monmap->fsid) {
308     dout(0) << __func__ << " on fsid " << statfs->fsid
309             << " != " << mon->monmap->fsid << dendl;
310     return true;
311   }
312   dout(10) << __func__ << " " << *statfs
313            << " from " << statfs->get_orig_source() << dendl;
314   epoch_t ver = 0;
315   if (mon->pgservice == get_pg_stat_service()) {
316     ver = get_last_committed();
317   } else {
318     ver = mon->pgmon()->get_last_committed();
319   }
320   auto reply = new MStatfsReply(statfs->fsid, statfs->get_tid(), ver);
321   reply->h.st = mon->pgservice->get_statfs(mon->osdmon()->osdmap,
322                                            statfs->data_pool);
323   mon->send_reply(op, reply);
324   return true;
325 }
326
327 void MgrStatMonitor::check_sub(Subscription *sub)
328 {
329   const auto epoch = mon->monmap->get_epoch();
330   dout(10) << __func__
331            << " next " << sub->next
332            << " have " << epoch << dendl;
333   if (sub->next <= service_map.epoch) {
334     auto m = new MServiceMap(service_map);
335     sub->session->con->send_message(m);
336     if (sub->onetime) {
337       mon->with_session_map([this, sub](MonSessionMap& session_map) {
338           session_map.remove_sub(sub);
339         });
340     } else {
341       sub->next = epoch + 1;
342     }
343   }
344 }
345
346 void MgrStatMonitor::check_subs()
347 {
348   dout(10) << __func__ << dendl;
349   if (!service_map.epoch) {
350     return;
351   }
352   auto subs = mon->session_map.subs.find("servicemap");
353   if (subs == mon->session_map.subs.end()) {
354     return;
355   }
356   auto p = subs->second->begin();
357   while (!p.end()) {
358     auto sub = *p;
359     ++p;
360     check_sub(sub);
361   }
362 }