1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "MgrStatMonitor.h"
5 #include "mon/OSDMonitor.h"
7 #include "mon/PGMonitor.h"
8 #include "messages/MGetPoolStats.h"
9 #include "messages/MGetPoolStatsReply.h"
10 #include "messages/MMonMgrReport.h"
11 #include "messages/MStatfs.h"
12 #include "messages/MStatfsReply.h"
13 #include "messages/MServiceMap.h"
15 class MgrPGStatService : public MonPGStatService {
18 MgrPGStatService(PGMapDigest& d) : digest(d) {}
20 const pool_stat_t* get_pool_stat(int64_t poolid) const override {
21 auto i = digest.pg_pool_sum.find(poolid);
22 if (i != digest.pg_pool_sum.end()) {
28 ceph_statfs get_statfs(OSDMap& osdmap,
29 boost::optional<int64_t> data_pool) const override {
30 return digest.get_statfs(osdmap, data_pool);
33 void print_summary(Formatter *f, ostream *out) const override {
34 digest.print_summary(f, out);
36 void dump_info(Formatter *f) const override {
39 void dump_fs_stats(stringstream *ss,
41 bool verbose) const override {
42 digest.dump_fs_stats(ss, f, verbose);
44 void dump_pool_stats(const OSDMap& osdm, stringstream *ss, Formatter *f,
45 bool verbose) const override {
46 digest.dump_pool_stats_full(osdm, ss, f, verbose);
51 #define dout_subsys ceph_subsys_mon
53 #define dout_prefix _prefix(_dout, mon)
54 static ostream& _prefix(std::ostream *_dout, Monitor *mon) {
55 return *_dout << "mon." << mon->name << "@" << mon->rank
56 << "(" << mon->get_state_name()
60 MgrStatMonitor::MgrStatMonitor(Monitor *mn, Paxos *p, const string& service_name)
61 : PaxosService(mn, p, service_name),
62 pgservice(new MgrPGStatService(digest))
66 MgrStatMonitor::~MgrStatMonitor() = default;
68 MonPGStatService *MgrStatMonitor::get_pg_stat_service()
70 return pgservice.get();
73 void MgrStatMonitor::create_initial()
75 dout(10) << __func__ << dendl;
77 service_map.epoch = 1;
78 ::encode(service_map, pending_service_map_bl, CEPH_FEATURES_ALL);
81 void MgrStatMonitor::update_from_paxos(bool *need_bootstrap)
83 version = get_last_committed();
84 dout(10) << " " << version << dendl;
87 get_version(version, bl);
93 ::decode(service_map, p);
94 dout(10) << __func__ << " v" << version
95 << " service_map e" << service_map.epoch << dendl;
97 catch (buffer::error& e) {
98 derr << "failed to decode mgrstat state; luminous dev version?" << dendl;
105 void MgrStatMonitor::update_logger()
107 dout(20) << __func__ << dendl;
108 if (mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
109 dout(20) << "yielding cluster perfcounter updates to pgmon" << dendl;
113 mon->cluster_logger->set(l_cluster_osd_bytes, digest.osd_sum.kb * 1024ull);
114 mon->cluster_logger->set(l_cluster_osd_bytes_used,
115 digest.osd_sum.kb_used * 1024ull);
116 mon->cluster_logger->set(l_cluster_osd_bytes_avail,
117 digest.osd_sum.kb_avail * 1024ull);
119 mon->cluster_logger->set(l_cluster_num_pool, digest.pg_pool_sum.size());
121 for (auto i : digest.num_pg_by_pool) {
124 mon->cluster_logger->set(l_cluster_num_pg, num_pg);
126 unsigned active = 0, active_clean = 0, peering = 0;
127 for (auto p = digest.num_pg_by_state.begin();
128 p != digest.num_pg_by_state.end();
130 if (p->first & PG_STATE_ACTIVE) {
132 if (p->first & PG_STATE_CLEAN)
133 active_clean += p->second;
135 if (p->first & PG_STATE_PEERING)
136 peering += p->second;
138 mon->cluster_logger->set(l_cluster_num_pg_active_clean, active_clean);
139 mon->cluster_logger->set(l_cluster_num_pg_active, active);
140 mon->cluster_logger->set(l_cluster_num_pg_peering, peering);
142 mon->cluster_logger->set(l_cluster_num_object, digest.pg_sum.stats.sum.num_objects);
143 mon->cluster_logger->set(l_cluster_num_object_degraded, digest.pg_sum.stats.sum.num_objects_degraded);
144 mon->cluster_logger->set(l_cluster_num_object_misplaced, digest.pg_sum.stats.sum.num_objects_misplaced);
145 mon->cluster_logger->set(l_cluster_num_object_unfound, digest.pg_sum.stats.sum.num_objects_unfound);
146 mon->cluster_logger->set(l_cluster_num_bytes, digest.pg_sum.stats.sum.num_bytes);
150 void MgrStatMonitor::create_pending()
152 dout(10) << " " << version << dendl;
153 pending_digest = digest;
154 pending_health_checks = get_health_checks();
155 pending_service_map_bl.clear();
156 ::encode(service_map, pending_service_map_bl, mon->get_quorum_con_features());
159 void MgrStatMonitor::encode_pending(MonitorDBStore::TransactionRef t)
162 if (version < mon->pgmon()->get_last_committed()) {
163 // fast-forward to pgmon version to ensure clients don't see a
164 // jump back in time for MGetPoolStats and MStatFs.
165 version = mon->pgmon()->get_last_committed() + 1;
167 dout(10) << " " << version << dendl;
169 ::encode(pending_digest, bl, mon->get_quorum_con_features());
170 assert(pending_service_map_bl.length());
171 bl.append(pending_service_map_bl);
172 put_version(t, version, bl);
173 put_last_committed(t, version);
175 encode_health(pending_health_checks, t);
178 version_t MgrStatMonitor::get_trim_to()
180 // we don't actually need *any* old states, but keep a few.
187 void MgrStatMonitor::on_active()
192 void MgrStatMonitor::get_health(list<pair<health_status_t,string> >& summary,
193 list<pair<health_status_t,string> > *detail,
194 CephContext *cct) const
198 void MgrStatMonitor::tick()
202 void MgrStatMonitor::print_summary(Formatter *f, std::ostream *ss) const
204 pgservice->print_summary(f, ss);
207 bool MgrStatMonitor::preprocess_query(MonOpRequestRef op)
209 auto m = static_cast<PaxosServiceMessage*>(op->get_req());
210 switch (m->get_type()) {
211 case CEPH_MSG_STATFS:
212 return preprocess_statfs(op);
213 case MSG_MON_MGR_REPORT:
214 return preprocess_report(op);
215 case MSG_GETPOOLSTATS:
216 return preprocess_getpoolstats(op);
219 derr << "Unhandled message type " << m->get_type() << dendl;
224 bool MgrStatMonitor::prepare_update(MonOpRequestRef op)
226 auto m = static_cast<PaxosServiceMessage*>(op->get_req());
227 switch (m->get_type()) {
228 case MSG_MON_MGR_REPORT:
229 return prepare_report(op);
232 derr << "Unhandled message type " << m->get_type() << dendl;
237 bool MgrStatMonitor::preprocess_report(MonOpRequestRef op)
242 bool MgrStatMonitor::prepare_report(MonOpRequestRef op)
244 auto m = static_cast<MMonMgrReport*>(op->get_req());
245 bufferlist bl = m->get_data();
247 ::decode(pending_digest, p);
248 pending_health_checks.swap(m->health_checks);
249 if (m->service_map_bl.length()) {
250 pending_service_map_bl.swap(m->service_map_bl);
252 dout(10) << __func__ << " " << pending_digest << ", "
253 << pending_health_checks.checks.size() << " health checks" << dendl;
257 bool MgrStatMonitor::preprocess_getpoolstats(MonOpRequestRef op)
259 op->mark_pgmon_event(__func__);
260 auto m = static_cast<MGetPoolStats*>(op->get_req());
261 auto session = m->get_session();
264 if (!session->is_capable("pg", MON_CAP_R)) {
265 dout(0) << "MGetPoolStats received from entity with insufficient caps "
266 << session->caps << dendl;
269 if (m->fsid != mon->monmap->fsid) {
270 dout(0) << __func__ << " on fsid "
271 << m->fsid << " != " << mon->monmap->fsid << dendl;
275 if (mon->pgservice == get_pg_stat_service()) {
276 ver = get_last_committed();
278 ver = mon->pgmon()->get_last_committed();
280 auto reply = new MGetPoolStatsReply(m->fsid, m->get_tid(), ver);
281 for (const auto& pool_name : m->pools) {
282 const auto pool_id = mon->osdmon()->osdmap.lookup_pg_pool_name(pool_name);
283 if (pool_id == -ENOENT)
285 auto pool_stat = mon->pgservice->get_pool_stat(pool_id);
288 reply->pool_stats[pool_name] = *pool_stat;
290 mon->send_reply(op, reply);
294 bool MgrStatMonitor::preprocess_statfs(MonOpRequestRef op)
296 op->mark_pgmon_event(__func__);
297 auto statfs = static_cast<MStatfs*>(op->get_req());
298 auto session = statfs->get_session();
302 if (!session->is_capable("pg", MON_CAP_R)) {
303 dout(0) << "MStatfs received from entity with insufficient privileges "
304 << session->caps << dendl;
307 if (statfs->fsid != mon->monmap->fsid) {
308 dout(0) << __func__ << " on fsid " << statfs->fsid
309 << " != " << mon->monmap->fsid << dendl;
312 dout(10) << __func__ << " " << *statfs
313 << " from " << statfs->get_orig_source() << dendl;
315 if (mon->pgservice == get_pg_stat_service()) {
316 ver = get_last_committed();
318 ver = mon->pgmon()->get_last_committed();
320 auto reply = new MStatfsReply(statfs->fsid, statfs->get_tid(), ver);
321 reply->h.st = mon->pgservice->get_statfs(mon->osdmon()->osdmap,
323 mon->send_reply(op, reply);
327 void MgrStatMonitor::check_sub(Subscription *sub)
329 const auto epoch = mon->monmap->get_epoch();
331 << " next " << sub->next
332 << " have " << epoch << dendl;
333 if (sub->next <= service_map.epoch) {
334 auto m = new MServiceMap(service_map);
335 sub->session->con->send_message(m);
337 mon->with_session_map([this, sub](MonSessionMap& session_map) {
338 session_map.remove_sub(sub);
341 sub->next = epoch + 1;
346 void MgrStatMonitor::check_subs()
348 dout(10) << __func__ << dendl;
349 if (!service_map.epoch) {
352 auto subs = mon->session_map.subs.find("servicemap");
353 if (subs == mon->session_map.subs.end()) {
356 auto p = subs->second->begin();