Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mon / PGMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15
16 #include "json_spirit/json_spirit.h"
17 #include "common/debug.h"               // undo damage
18 #include "PGMonitor.h"
19 #include "Monitor.h"
20 #include "OSDMonitor.h"
21 #include "MonitorDBStore.h"
22 #include "PGStatService.h"
23
24 #include "messages/MPGStats.h"
25 #include "messages/MPGStatsAck.h"
26
27 #include "messages/MOSDPGCreate.h"
28 #include "messages/MMonCommand.h"
29 #include "messages/MOSDScrub.h"
30
31 #include "common/Formatter.h"
32 #include "common/config.h"
33
34 #include "include/stringify.h"
35
36 #include "osd/osd_types.h"
37
38 #include "common/config.h"
39 #include "common/errno.h"
40 #include "common/strtol.h"
41 #include "include/str_list.h"
42 #include <sstream>
43
44 #define dout_subsys ceph_subsys_mon
45 #undef dout_prefix
46 #define dout_prefix _prefix(_dout, mon, pg_map)
47 static ostream& _prefix(std::ostream *_dout, const Monitor *mon, const PGMap& pg_map) {
48   return *_dout << "mon." << mon->name << "@" << mon->rank
49                 << "(" << mon->get_state_name()
50                 << ").pg v" << pg_map.version << " ";
51 }
52
53 /*
54    Tick function to update the map based on performance every N seconds
55  */
56
57 void PGMonitor::on_restart()
58 {
59   // clear leader state
60   last_osd_report.clear();
61 }
62
63 void PGMonitor::on_active()
64 {
65   if (mon->is_leader()) {
66     check_all_pgs = true;
67     check_osd_map(mon->osdmon()->osdmap.get_epoch());
68   }
69
70   update_logger();
71
72   if (mon->is_leader() &&
73       mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
74     mon->clog->info() << "pgmap " << pg_map;
75   }
76 }
77
78 void PGMonitor::update_logger()
79 {
80   if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
81     return;
82   }
83   dout(10) << "update_logger" << dendl;
84
85   mon->cluster_logger->set(l_cluster_osd_bytes, pg_map.osd_sum.kb * 1024ull);
86   mon->cluster_logger->set(l_cluster_osd_bytes_used,
87                            pg_map.osd_sum.kb_used * 1024ull);
88   mon->cluster_logger->set(l_cluster_osd_bytes_avail,
89                            pg_map.osd_sum.kb_avail * 1024ull);
90
91   mon->cluster_logger->set(l_cluster_num_pool, pg_map.pg_pool_sum.size());
92   mon->cluster_logger->set(l_cluster_num_pg, pg_map.pg_stat.size());
93
94   unsigned active = 0, active_clean = 0, peering = 0;
95   for (ceph::unordered_map<int,int>::iterator p = pg_map.num_pg_by_state.begin();
96        p != pg_map.num_pg_by_state.end();
97        ++p) {
98     if (p->first & PG_STATE_ACTIVE) {
99       active += p->second;
100       if (p->first & PG_STATE_CLEAN)
101         active_clean += p->second;
102     }
103     if (p->first & PG_STATE_PEERING)
104       peering += p->second;
105   }
106   mon->cluster_logger->set(l_cluster_num_pg_active_clean, active_clean);
107   mon->cluster_logger->set(l_cluster_num_pg_active, active);
108   mon->cluster_logger->set(l_cluster_num_pg_peering, peering);
109
110   mon->cluster_logger->set(l_cluster_num_object, pg_map.pg_sum.stats.sum.num_objects);
111   mon->cluster_logger->set(l_cluster_num_object_degraded, pg_map.pg_sum.stats.sum.num_objects_degraded);
112   mon->cluster_logger->set(l_cluster_num_object_misplaced, pg_map.pg_sum.stats.sum.num_objects_misplaced);
113   mon->cluster_logger->set(l_cluster_num_object_unfound, pg_map.pg_sum.stats.sum.num_objects_unfound);
114   mon->cluster_logger->set(l_cluster_num_bytes, pg_map.pg_sum.stats.sum.num_bytes);
115 }
116
117 void PGMonitor::tick()
118 {
119   if (!is_active()) return;
120   if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
121     return;
122   }
123
124   handle_osd_timeouts();
125
126   if (!pg_map.pg_sum_deltas.empty()) {
127     utime_t age = ceph_clock_now() - pg_map.stamp;
128     if (age > 2 * g_conf->mon_delta_reset_interval) {
129       dout(10) << " clearing pg_map delta (" << age << " > " << g_conf->mon_delta_reset_interval << " seconds old)" << dendl;
130       pg_map.clear_delta();
131     }
132   }
133
134   /* If we have deltas for pools, run through pgmap's 'per_pool_sum_delta' and
135    * clear any deltas that are old enough.
136    *
137    * Note that 'per_pool_sum_delta' keeps a pool id as key, and a pair containing
138    * the calc'ed stats delta and an absolute timestamp from when those stats were
139    * obtained -- the timestamp IS NOT a delta itself.
140    */
141   if (!pg_map.per_pool_sum_deltas.empty()) {
142     ceph::unordered_map<uint64_t,pair<pool_stat_t,utime_t> >::iterator it;
143     for (it = pg_map.per_pool_sum_delta.begin();
144          it != pg_map.per_pool_sum_delta.end(); ) {
145       utime_t age = ceph_clock_now() - it->second.second;
146       if (age > 2*g_conf->mon_delta_reset_interval) {
147         dout(10) << " clearing pg_map delta for pool " << it->first
148                  << " (" << age << " > " << g_conf->mon_delta_reset_interval
149                  << " seconds old)" << dendl;
150         pg_map.per_pool_sum_deltas.erase(it->first);
151         pg_map.per_pool_sum_deltas_stamps.erase(it->first);
152         pg_map.per_pool_sum_delta.erase((it++)->first);
153       } else {
154         ++it;
155       }
156     }
157   }
158
159   dout(10) << pg_map << dendl;
160 }
161
162 void PGMonitor::create_initial()
163 {
164   dout(10) << "create_initial -- creating initial map" << dendl;
165   format_version = 1;
166 }
167
168 void PGMonitor::update_from_paxos(bool *need_bootstrap)
169 {
170   if (did_delete)
171     return;
172
173   if (get_value("deleted")) {
174     did_delete = true;
175     dout(10) << __func__ << " deleted, clearing in-memory PGMap" << dendl;
176     pg_map = PGMap();
177     pending_inc = PGMap::Incremental();
178     pgservice.reset();
179     last_osd_report.clear();
180     return;
181   }
182
183   version_t version = get_last_committed();
184   if (version == pg_map.version)
185     return;
186
187   assert(version >= pg_map.version);
188   if (format_version < 1) {
189     derr << __func__ << "unsupported monitor protocol: "
190          << get_service_name() << ".format_version = "
191          << format_version << dendl;
192   }
193   assert(format_version >= 1);
194
195   // pg/osd keys in leveldb
196   // read meta
197   while (version > pg_map.version) {
198     // load full state?
199     if (pg_map.version == 0) {
200       dout(10) << __func__ << " v0, read_full" << dendl;
201       read_pgmap_full();
202       goto out;
203     }
204
205     // incremental state?
206     dout(10) << __func__ << " read_incremental" << dendl;
207     bufferlist bl;
208     int r = get_version(pg_map.version + 1, bl);
209     if (r == -ENOENT) {
210       dout(10) << __func__ << " failed to read_incremental, read_full" << dendl;
211       // reset pg map
212       pg_map = PGMap();
213       read_pgmap_full();
214       goto out;
215     }
216     assert(r == 0);
217     apply_pgmap_delta(bl);
218   }
219
220   read_pgmap_meta();
221
222 out:
223   assert(version == pg_map.version);
224
225   update_logger();
226 }
227
228 void PGMonitor::on_upgrade()
229 {
230   dout(1) << __func__ << " discarding in-core PGMap" << dendl;
231   pg_map = PGMap();
232 }
233
234 void PGMonitor::upgrade_format()
235 {
236   unsigned current = 1;
237   assert(format_version == current);
238 }
239
240 void PGMonitor::post_paxos_update()
241 {
242   if (did_delete)
243     return;
244   dout(10) << __func__ << dendl;
245   OSDMap& osdmap = mon->osdmon()->osdmap;
246   if (mon->monmap->get_required_features().contains_all(
247         ceph::features::mon::FEATURE_LUMINOUS)) {
248     // let OSDMonitor take care of the pg-creates subscriptions.
249     return;
250   }
251   if (osdmap.get_epoch()) {
252     if (osdmap.get_num_up_osds() > 0) {
253       assert(osdmap.get_up_osd_features() & CEPH_FEATURE_MON_STATEFUL_SUB);
254       check_subs();
255     }
256   }
257 }
258
259 void PGMonitor::handle_osd_timeouts()
260 {
261   if (!mon->is_leader())
262     return;
263   if (did_delete)
264     return;
265
266   utime_t now(ceph_clock_now());
267   utime_t timeo(g_conf->mon_osd_report_timeout, 0);
268   if (now - mon->get_leader_since() < timeo) {
269     // We haven't been the leader for long enough to consider OSD timeouts
270     return;
271   }
272
273   if (mon->osdmon()->is_writeable())
274     mon->osdmon()->handle_osd_timeouts(now, last_osd_report);
275 }
276
277 void PGMonitor::create_pending()
278 {
279   if (did_delete)
280     return;
281   do_delete = false;
282   pending_inc = PGMap::Incremental();
283   pending_inc.version = pg_map.version + 1;
284   if (pg_map.version == 0) {
285     // pull initial values from first leader mon's config
286     pending_inc.full_ratio = g_conf->mon_osd_full_ratio;
287     if (pending_inc.full_ratio > 1.0)
288       pending_inc.full_ratio /= 100.0;
289     pending_inc.nearfull_ratio = g_conf->mon_osd_nearfull_ratio;
290     if (pending_inc.nearfull_ratio > 1.0)
291       pending_inc.nearfull_ratio /= 100.0;
292   } else {
293     pending_inc.full_ratio = pg_map.full_ratio;
294     pending_inc.nearfull_ratio = pg_map.nearfull_ratio;
295   }
296   dout(10) << "create_pending v " << pending_inc.version << dendl;
297 }
298
299 void PGMonitor::read_pgmap_meta()
300 {
301   dout(10) << __func__ << dendl;
302
303   string prefix = pgmap_meta_prefix;
304
305   version_t version = mon->store->get(prefix, "version");
306   epoch_t last_osdmap_epoch = mon->store->get(prefix, "last_osdmap_epoch");
307   epoch_t last_pg_scan = mon->store->get(prefix, "last_pg_scan");
308   pg_map.set_version(version);
309   pg_map.set_last_osdmap_epoch(last_osdmap_epoch);
310
311   if (last_pg_scan != pg_map.get_last_pg_scan()) {
312     pg_map.set_last_pg_scan(last_pg_scan);
313   }
314
315   float full_ratio, nearfull_ratio;
316   {
317     bufferlist bl;
318     mon->store->get(prefix, "full_ratio", bl);
319     bufferlist::iterator p = bl.begin();
320     ::decode(full_ratio, p);
321   }
322   {
323     bufferlist bl;
324     mon->store->get(prefix, "nearfull_ratio", bl);
325     bufferlist::iterator p = bl.begin();
326     ::decode(nearfull_ratio, p);
327   }
328   pg_map.set_full_ratios(full_ratio, nearfull_ratio);
329   {
330     bufferlist bl;
331     mon->store->get(prefix, "stamp", bl);
332     bufferlist::iterator p = bl.begin();
333     utime_t stamp;
334     ::decode(stamp, p);
335     pg_map.set_stamp(stamp);
336   }
337 }
338
339 void PGMonitor::read_pgmap_full()
340 {
341   read_pgmap_meta();
342
343   string prefix = pgmap_pg_prefix;
344   for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
345     string key = i->key();
346     pg_t pgid;
347     if (!pgid.parse(key.c_str())) {
348       dout(0) << "unable to parse key " << key << dendl;
349       continue;
350     }
351     bufferlist bl = i->value();
352     pg_map.update_pg(pgid, bl);
353     dout(20) << " got " << pgid << dendl;
354   }
355
356   prefix = pgmap_osd_prefix;
357   for (KeyValueDB::Iterator i = mon->store->get_iterator(prefix); i->valid(); i->next()) {
358     string key = i->key();
359     int osd = atoi(key.c_str());
360     bufferlist bl = i->value();
361     pg_map.update_osd(osd, bl);
362     dout(20) << " got osd." << osd << dendl;
363   }
364 }
365
366 void PGMonitor::apply_pgmap_delta(bufferlist& bl)
367 {
368   version_t v = pg_map.version + 1;
369
370   utime_t inc_stamp;
371   bufferlist dirty_pgs, dirty_osds;
372   {
373     bufferlist::iterator p = bl.begin();
374     ::decode(inc_stamp, p);
375     ::decode(dirty_pgs, p);
376     ::decode(dirty_osds, p);
377   }
378
379   pool_stat_t pg_sum_old = pg_map.pg_sum;
380   mempool::pgmap::unordered_map<uint64_t, pool_stat_t> pg_pool_sum_old;
381
382   // pgs
383   set<int64_t> deleted_pools;
384   bufferlist::iterator p = dirty_pgs.begin();
385   while (!p.end()) {
386     pg_t pgid;
387     ::decode(pgid, p);
388
389     int r;
390     bufferlist pgbl;
391     if (deleted_pools.count(pgid.pool())) {
392       r = -ENOENT;
393     } else {
394       r = mon->store->get(pgmap_pg_prefix, stringify(pgid), pgbl);
395       if (pg_pool_sum_old.count(pgid.pool()) == 0)
396         pg_pool_sum_old[pgid.pool()] = pg_map.pg_pool_sum[pgid.pool()];
397     }
398
399     if (r >= 0) {
400       pg_map.update_pg(pgid, pgbl);
401       dout(20) << " refreshing pg " << pgid
402                << " " << pg_map.pg_stat[pgid].reported_epoch
403                << ":" << pg_map.pg_stat[pgid].reported_seq
404                << " " << pg_state_string(pg_map.pg_stat[pgid].state)
405                << dendl;
406     } else {
407       dout(20) << " removing pg " << pgid << dendl;
408       pg_map.remove_pg(pgid);
409       if (pgid.ps() == 0)
410         deleted_pools.insert(pgid.pool());
411     }
412   }
413
414   // osds
415   p = dirty_osds.begin();
416   while (!p.end()) {
417     int32_t osd;
418     ::decode(osd, p);
419     dout(20) << " refreshing osd." << osd << dendl;
420     bufferlist bl;
421     int r = mon->store->get(pgmap_osd_prefix, stringify(osd), bl);
422     if (r >= 0) {
423       pg_map.update_osd(osd, bl);
424     } else {
425       pg_map.remove_osd(osd);
426     }
427   }
428
429   pg_map.update_global_delta(g_ceph_context, inc_stamp, pg_sum_old);
430   pg_map.update_pool_deltas(g_ceph_context, inc_stamp, pg_pool_sum_old);
431
432   // clean up deleted pools after updating the deltas
433   for (set<int64_t>::iterator p = deleted_pools.begin();
434        p != deleted_pools.end();
435        ++p) {
436     dout(20) << " deleted pool " << *p << dendl;
437     pg_map.deleted_pool(*p);
438   }
439
440   // ok, we're now on the new version
441   pg_map.version = v;
442 }
443
444
445 void PGMonitor::encode_pending(MonitorDBStore::TransactionRef t)
446 {
447   if (did_delete)
448     return;
449
450   string prefix = pgmap_meta_prefix;
451   if (do_delete) {
452     dout(1) << __func__ << " clearing pgmap data at v" << pending_inc.version
453             << dendl;
454     do_delete = false;
455     for (auto key : { "version", "stamp", "last_osdmap_epoch",
456           "last_pg_scan", "full_ratio", "nearfull_ratio" }) {
457       t->erase(prefix, key);
458     }
459     for (auto& p : pg_map.pg_stat) {
460       t->erase(prefix, stringify(p.first));
461     }
462     for (auto& p : pg_map.osd_stat) {
463       t->erase(prefix, stringify(p.first));
464     }
465     put_last_committed(t, pending_inc.version);
466     put_value(t, "deleted", 1);
467     return;
468   }
469
470   assert(mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS ||
471          pending_inc.version == 1  /* rebuild-mondb.yaml case */);
472
473   version_t version = pending_inc.version;
474   dout(10) << __func__ << " v " << version << dendl;
475   assert(get_last_committed() + 1 == version);
476   pending_inc.stamp = ceph_clock_now();
477
478   uint64_t features = mon->get_quorum_con_features();
479
480   t->put(prefix, "version", pending_inc.version);
481   {
482     bufferlist bl;
483     ::encode(pending_inc.stamp, bl);
484     t->put(prefix, "stamp", bl);
485   }
486
487   if (pending_inc.osdmap_epoch)
488     t->put(prefix, "last_osdmap_epoch", pending_inc.osdmap_epoch);
489   if (pending_inc.pg_scan)
490     t->put(prefix, "last_pg_scan", pending_inc.pg_scan);
491   if (pending_inc.full_ratio > 0) {
492     bufferlist bl;
493     ::encode(pending_inc.full_ratio, bl);
494     t->put(prefix, "full_ratio", bl);
495   }
496   if (pending_inc.nearfull_ratio > 0) {
497     bufferlist bl;
498     ::encode(pending_inc.nearfull_ratio, bl);
499     t->put(prefix, "nearfull_ratio", bl);
500   }
501
502   bufferlist incbl;
503   ::encode(pending_inc.stamp, incbl);
504   {
505     bufferlist dirty;
506     string prefix = pgmap_pg_prefix;
507     for (map<pg_t,pg_stat_t>::const_iterator p = pending_inc.pg_stat_updates.begin();
508          p != pending_inc.pg_stat_updates.end();
509          ++p) {
510       ::encode(p->first, dirty);
511       bufferlist bl;
512       ::encode(p->second, bl, features);
513       t->put(prefix, stringify(p->first), bl);
514     }
515     for (set<pg_t>::const_iterator p = pending_inc.pg_remove.begin(); p != pending_inc.pg_remove.end(); ++p) {
516       ::encode(*p, dirty);
517       t->erase(prefix, stringify(*p));
518     }
519     ::encode(dirty, incbl);
520   }
521   {
522     bufferlist dirty;
523     string prefix = pgmap_osd_prefix;
524     for (map<int32_t,osd_stat_t>::const_iterator p =
525            pending_inc.get_osd_stat_updates().begin();
526          p != pending_inc.get_osd_stat_updates().end();
527          ++p) {
528       ::encode(p->first, dirty);
529       bufferlist bl;
530       ::encode(p->second, bl, features);
531       ::encode(pending_inc.get_osd_epochs().find(p->first)->second, bl);
532       t->put(prefix, stringify(p->first), bl);
533     }
534     for (set<int32_t>::const_iterator p =
535            pending_inc.get_osd_stat_rm().begin();
536          p != pending_inc.get_osd_stat_rm().end();
537          ++p) {
538       ::encode(*p, dirty);
539       t->erase(prefix, stringify(*p));
540     }
541     ::encode(dirty, incbl);
542   }
543
544   put_version(t, version, incbl);
545
546   put_last_committed(t, version);
547 }
548
549 version_t PGMonitor::get_trim_to()
550 {
551   unsigned max = g_conf->mon_max_pgmap_epochs;
552   version_t version = get_last_committed();
553   if (mon->is_leader() && (version > max))
554     return version - max;
555
556   return 0;
557 }
558
559 bool PGMonitor::preprocess_query(MonOpRequestRef op)
560 {
561   op->mark_pgmon_event(__func__);
562   PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
563   dout(10) << "preprocess_query " << *m
564            << " from " << m->get_orig_source_inst() << dendl;
565   switch (m->get_type()) {
566   case MSG_PGSTATS:
567     if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
568       return true;
569     }
570     return preprocess_pg_stats(op);
571
572   case MSG_MON_COMMAND:
573     if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
574       bufferlist rdata;
575       mon->reply_command(op, -EOPNOTSUPP, "this command is obsolete", rdata,
576                          get_last_committed());
577       return true;
578     }
579     return preprocess_command(op);
580
581   default:
582     ceph_abort();
583     return true;
584   }
585 }
586
587 bool PGMonitor::prepare_update(MonOpRequestRef op)
588 {
589   if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
590     return false;
591   }
592
593   op->mark_pgmon_event(__func__);
594   PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
595   dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
596   switch (m->get_type()) {
597   case MSG_PGSTATS:
598     return prepare_pg_stats(op);
599
600   case MSG_MON_COMMAND:
601     return prepare_command(op);
602
603   default:
604     ceph_abort();
605     return false;
606   }
607 }
608
609 bool PGMonitor::preprocess_pg_stats(MonOpRequestRef op)
610 {
611   op->mark_pgmon_event(__func__);
612   MPGStats *stats = static_cast<MPGStats*>(op->get_req());
613   // check caps
614   MonSession *session = stats->get_session();
615   if (!session) {
616     dout(10) << "PGMonitor::preprocess_pg_stats: no monitor session!" << dendl;
617     return true;
618   }
619   if (!session->is_capable("pg", MON_CAP_R)) {
620     derr << "PGMonitor::preprocess_pg_stats: MPGStats received from entity "
621          << "with insufficient privileges " << session->caps << dendl;
622     return true;
623   }
624
625   if (stats->fsid != mon->monmap->fsid) {
626     dout(0) << __func__ << " drop message on fsid " << stats->fsid << " != "
627             << mon->monmap->fsid << " for " << *stats << dendl;
628     return true;
629   }
630
631   // First, just see if they need a new osdmap. But
632   // only if they've had the map for a while.
633   if (stats->had_map_for > 30.0 &&
634       mon->osdmon()->is_readable() &&
635       stats->epoch < mon->osdmon()->osdmap.get_epoch() &&
636       !session->proxy_con)
637     mon->osdmon()->send_latest_now_nodelete(op, stats->epoch+1);
638
639   // Always forward the PGStats to the leader, even if they are the same as
640   // the old PGStats. The leader will mark as down osds that haven't sent
641   // PGStats for a few minutes.
642   return false;
643 }
644
645 bool PGMonitor::pg_stats_have_changed(int from, const MPGStats *stats) const
646 {
647   // any new osd info?
648   ceph::unordered_map<int,osd_stat_t>::const_iterator s = pg_map.osd_stat.find(from);
649   if (s == pg_map.osd_stat.end())
650     return true;
651
652   if (s->second != stats->osd_stat)
653     return true;
654
655   // any new pg info?
656   for (map<pg_t,pg_stat_t>::const_iterator p = stats->pg_stat.begin();
657        p != stats->pg_stat.end(); ++p) {
658     ceph::unordered_map<pg_t,pg_stat_t>::const_iterator t = pg_map.pg_stat.find(p->first);
659     if (t == pg_map.pg_stat.end())
660       return true;
661
662     if (t->second.reported_epoch != p->second.reported_epoch ||
663         t->second.reported_seq != p->second.reported_seq)
664       return true;
665   }
666
667   return false;
668 }
669
670 struct PGMonitor::C_Stats : public C_MonOp {
671   PGMonitor *pgmon;
672   MonOpRequestRef stats_op_ack;
673   entity_inst_t who;
674   C_Stats(PGMonitor *p,
675           MonOpRequestRef op,
676           MonOpRequestRef op_ack)
677     : C_MonOp(op), pgmon(p), stats_op_ack(op_ack) {}
678   void _finish(int r) override {
679     if (r >= 0) {
680       pgmon->_updated_stats(op, stats_op_ack);
681     } else if (r == -ECANCELED) {
682       return;
683     } else if (r == -EAGAIN) {
684       pgmon->dispatch(op);
685     } else {
686       assert(0 == "bad C_Stats return value");
687     }
688   }
689 };
690
691 bool PGMonitor::prepare_pg_stats(MonOpRequestRef op)
692 {
693   op->mark_pgmon_event(__func__);
694   MPGStats *stats = static_cast<MPGStats*>(op->get_req());
695   dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_orig_source() << dendl;
696   int from = stats->get_orig_source().num();
697
698   if (stats->fsid != mon->monmap->fsid) {
699     dout(0) << "prepare_pg_stats on fsid " << stats->fsid << " != " << mon->monmap->fsid << dendl;
700     return false;
701   }
702
703   if (!stats->get_orig_source().is_osd() ||
704       !mon->osdmon()->osdmap.is_up(from) ||
705       stats->get_orig_source_inst() != mon->osdmon()->osdmap.get_inst(from)) {
706     dout(1) << " ignoring stats from non-active osd." << dendl;
707     return false;
708   }
709       
710   last_osd_report[from] = ceph_clock_now();
711
712   if (!pg_stats_have_changed(from, stats)) {
713     dout(10) << " message contains no new osd|pg stats" << dendl;
714     MPGStatsAck *ack = new MPGStatsAck;
715     ack->set_tid(stats->get_tid());
716     for (map<pg_t,pg_stat_t>::const_iterator p = stats->pg_stat.begin();
717          p != stats->pg_stat.end();
718          ++p) {
719       ack->pg_stat[p->first] = make_pair(p->second.reported_seq, p->second.reported_epoch);
720     }
721     mon->send_reply(op, ack);
722     return false;
723   }
724
725   // osd stat
726   if (mon->osdmon()->osdmap.is_in(from)) {
727     pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat));
728   } else {
729     pending_inc.update_stat(from, stats->epoch, osd_stat_t());
730   }
731
732   if (pg_map.osd_stat.count(from))
733     dout(10) << " got osd." << from << " " << stats->osd_stat << " (was " << pg_map.osd_stat[from] << ")" << dendl;
734   else
735     dout(10) << " got osd." << from << " " << stats->osd_stat << " (first report)" << dendl;
736
737   // pg stats
738   MPGStatsAck *ack = new MPGStatsAck;
739   MonOpRequestRef ack_op = mon->op_tracker.create_request<MonOpRequest>(ack);
740   ack->set_tid(stats->get_tid());
741   for (map<pg_t,pg_stat_t>::iterator p = stats->pg_stat.begin();
742        p != stats->pg_stat.end();
743        ++p) {
744     pg_t pgid = p->first;
745     ack->pg_stat[pgid] = make_pair(p->second.reported_seq, p->second.reported_epoch);
746
747     if (pg_map.pg_stat.count(pgid) &&
748         pg_map.pg_stat[pgid].get_version_pair() > p->second.get_version_pair()) {
749       dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported_epoch << ":"
750                << pg_map.pg_stat[pgid].reported_seq << dendl;
751       continue;
752     }
753     if (pending_inc.pg_stat_updates.count(pgid) &&
754         pending_inc.pg_stat_updates[pgid].get_version_pair() > p->second.get_version_pair()) {
755       dout(15) << " had " << pgid << " from " << pending_inc.pg_stat_updates[pgid].reported_epoch << ":"
756                << pending_inc.pg_stat_updates[pgid].reported_seq << " (pending)" << dendl;
757       continue;
758     }
759
760     if (pg_map.pg_stat.count(pgid) == 0) {
761       dout(15) << " got " << pgid << " reported at " << p->second.reported_epoch << ":"
762                << p->second.reported_seq
763                << " state " << pg_state_string(p->second.state)
764                << " but DNE in pg_map; pool was probably deleted."
765                << dendl;
766       continue;
767     }
768
769     dout(15) << " got " << pgid
770              << " reported at " << p->second.reported_epoch << ":" << p->second.reported_seq
771              << " state " << pg_state_string(pg_map.pg_stat[pgid].state)
772              << " -> " << pg_state_string(p->second.state)
773              << dendl;
774     pending_inc.pg_stat_updates[pgid] = p->second;
775   }
776
777   wait_for_finished_proposal(op, new C_Stats(this, op, ack_op));
778   return true;
779 }
780
781 void PGMonitor::_updated_stats(MonOpRequestRef op, MonOpRequestRef ack_op)
782 {
783   op->mark_pgmon_event(__func__);
784   ack_op->mark_pgmon_event(__func__);
785   MPGStats *ack = static_cast<MPGStats*>(ack_op->get_req());
786   ack->get();  // MonOpRequestRef owns one ref; give the other to send_reply.
787   dout(7) << "_updated_stats for "
788           << op->get_req()->get_orig_source_inst() << dendl;
789   mon->send_reply(op, ack);
790 }
791
792
793 // ------------------------
794
795 struct RetryCheckOSDMap : public Context {
796   PGMonitor *pgmon;
797   epoch_t epoch;
798   RetryCheckOSDMap(PGMonitor *p, epoch_t e) : pgmon(p), epoch(e) {
799   }
800   void finish(int r) override {
801     if (r == -ECANCELED)
802       return;
803
804     pgmon->check_osd_map(epoch);
805   }
806 };
807
808 void PGMonitor::check_osd_map(epoch_t epoch)
809 {
810   if (mon->is_peon())
811     return;  // whatever.
812
813   if (did_delete)
814     return;
815
816   if (pg_map.last_osdmap_epoch >= epoch) {
817     dout(10) << __func__ << " already seen " << pg_map.last_osdmap_epoch
818              << " >= " << epoch << dendl;
819     return;
820   }
821
822   if (!mon->osdmon()->is_readable()) {
823     dout(10) << __func__ << " -- osdmap not readable, waiting" << dendl;
824     mon->osdmon()->wait_for_readable_ctx(new RetryCheckOSDMap(this, epoch));
825     return;
826   }
827
828   if (!is_writeable()) {
829     dout(10) << __func__ << " -- pgmap not writeable, waiting" << dendl;
830     wait_for_writeable_ctx(new RetryCheckOSDMap(this, epoch));
831     return;
832   }
833
834   const OSDMap& osdmap = mon->osdmon()->osdmap;
835   if (!did_delete && osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
836     // delete all my data
837     dout(1) << __func__ << " will clear pg_map data" << dendl;
838     do_delete = true;
839     propose_pending();
840     return;
841   }
842
843   // osds that went up or down
844   set<int> need_check_down_pg_osds;
845
846   // apply latest map(s)
847   epoch = std::max(epoch, osdmap.get_epoch());
848   for (epoch_t e = pg_map.last_osdmap_epoch+1;
849        e <= epoch;
850        e++) {
851     dout(10) << __func__ << " applying osdmap e" << e << " to pg_map" << dendl;
852     bufferlist bl;
853     int err = mon->osdmon()->get_version(e, bl);
854     assert(err == 0);
855
856     assert(bl.length());
857     OSDMap::Incremental inc(bl);
858
859     PGMapUpdater::check_osd_map(inc, &need_check_down_pg_osds,
860                                 &last_osd_report, &pg_map, &pending_inc);
861   }
862
863   assert(pg_map.last_osdmap_epoch < epoch);
864   pending_inc.osdmap_epoch = epoch;
865   PGMapUpdater::update_creating_pgs(osdmap, pg_map, &pending_inc);
866   PGMapUpdater::register_new_pgs(osdmap, pg_map, &pending_inc);
867
868   PGMapUpdater::check_down_pgs(osdmap, pg_map, check_all_pgs,
869                                need_check_down_pg_osds, &pending_inc);
870   check_all_pgs = false;
871
872   propose_pending();
873 }
874
875 epoch_t PGMonitor::send_pg_creates(int osd, Connection *con, epoch_t next)
876 {
877   dout(30) << __func__ << " " << pg_map.creating_pgs_by_osd_epoch << dendl;
878   map<int, map<epoch_t, set<pg_t> > >::iterator p =
879     pg_map.creating_pgs_by_osd_epoch.find(osd);
880   if (p == pg_map.creating_pgs_by_osd_epoch.end())
881     return next;
882
883   assert(p->second.size() > 0);
884
885   MOSDPGCreate *m = NULL;
886   epoch_t last = 0;
887   for (map<epoch_t, set<pg_t> >::iterator q = p->second.lower_bound(next);
888        q != p->second.end();
889        ++q) {
890     dout(20) << __func__ << " osd." << osd << " from " << next
891              << " : epoch " << q->first << " " << q->second.size() << " pgs"
892              << dendl;
893     last = q->first;
894     for (set<pg_t>::iterator r = q->second.begin(); r != q->second.end(); ++r) {
895       pg_stat_t &st = pg_map.pg_stat[*r];
896       if (!m)
897         m = new MOSDPGCreate(pg_map.last_osdmap_epoch);
898       m->mkpg[*r] = pg_create_t(st.created,
899                                 st.parent,
900                                 st.parent_split_bits);
901       // Need the create time from the monitor using its clock to set
902       // last_scrub_stamp upon pg creation.
903       m->ctimes[*r] = pg_map.pg_stat[*r].last_scrub_stamp;
904     }
905   }
906   if (!m) {
907     dout(20) << "send_pg_creates osd." << osd << " from " << next
908              << " has nothing to send" << dendl;
909     return next;
910   }
911
912   con->send_message(m);
913
914   // sub is current through last + 1
915   return last + 1;
916 }
917
918 bool PGMonitor::preprocess_command(MonOpRequestRef op)
919 {
920   op->mark_pgmon_event(__func__);
921   MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
922   int r = -1;
923   bufferlist rdata;
924   stringstream ss, ds;
925
926   if (m->fsid != mon->monmap->fsid) {
927     dout(0) << __func__ << " drop message on fsid " << m->fsid << " != "
928             << mon->monmap->fsid << " for " << *m << dendl;
929     return true;
930   }
931
932   map<string, cmd_vartype> cmdmap;
933   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
934     // ss has reason for failure
935     string rs = ss.str();
936     mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
937     return true;
938   }
939
940   string prefix;
941   cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
942
943   MonSession *session = m->get_session();
944   if (!session) {
945     mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
946     return true;
947   }
948
949   string format;
950   cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
951   boost::scoped_ptr<Formatter> f(Formatter::create(format));
952
953   if (prefix == "pg scrub" ||
954       prefix == "pg repair" ||
955       prefix == "pg deep-scrub") {
956     string scrubop = prefix.substr(3, string::npos);
957     pg_t pgid;
958     string pgidstr;
959     cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
960     if (!pgid.parse(pgidstr.c_str())) {
961       ss << "invalid pgid '" << pgidstr << "'";
962       r = -EINVAL;
963       goto reply;
964     }
965     if (!pg_map.pg_stat.count(pgid)) {
966       ss << "pg " << pgid << " dne";
967       r = -ENOENT;
968       goto reply;
969     }
970     int osd = pg_map.pg_stat[pgid].acting_primary;
971     if (osd == -1) {
972       ss << "pg " << pgid << " has no primary osd";
973       r = -EAGAIN;
974       goto reply;
975     }
976     if (!mon->osdmon()->osdmap.is_up(osd)) {
977       ss << "pg " << pgid << " primary osd." << osd << " not up";
978       r = -EAGAIN;
979       goto reply;
980     }
981     vector<pg_t> pgs(1);
982     pgs[0] = pgid;
983     mon->try_send_message(new MOSDScrub(mon->monmap->fsid, pgs,
984                                         scrubop == "repair",
985                                         scrubop == "deep-scrub"),
986                           mon->osdmon()->osdmap.get_inst(osd));
987     ss << "instructing pg " << pgid << " on osd." << osd << " to " << scrubop;
988     r = 0;
989   } else {
990     r = process_pg_map_command(prefix, cmdmap, pg_map, mon->osdmon()->osdmap,
991                                f.get(), &ss, &rdata);
992   }
993
994   if (r == -EOPNOTSUPP)
995     return false;
996
997 reply:
998   string rs;
999   getline(ss, rs);
1000   rdata.append(ds);
1001   mon->reply_command(op, r, rs, rdata, get_last_committed());
1002   return true;
1003 }
1004
1005 bool PGMonitor::prepare_command(MonOpRequestRef op)
1006 {
1007   op->mark_pgmon_event(__func__);
1008   MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
1009   if (m->fsid != mon->monmap->fsid) {
1010     dout(0) << __func__ << " drop message on fsid " << m->fsid << " != "
1011             << mon->monmap->fsid << " for " << *m << dendl;
1012     return true;
1013   }
1014   stringstream ss;
1015   pg_t pgid;
1016   epoch_t epoch = mon->osdmon()->osdmap.get_epoch();
1017   int r = 0;
1018   string rs;
1019
1020   map<string, cmd_vartype> cmdmap;
1021   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
1022     // ss has reason for failure
1023     string rs = ss.str();
1024     mon->reply_command(op, -EINVAL, rs, get_last_committed());
1025     return true;
1026   }
1027
1028   string prefix;
1029   cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
1030
1031   MonSession *session = m->get_session();
1032   if (!session) {
1033     mon->reply_command(op, -EACCES, "access denied", get_last_committed());
1034     return true;
1035   }
1036
1037   if (prefix == "pg force_create_pg") {
1038     string pgidstr;
1039     cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
1040     if (!pgid.parse(pgidstr.c_str())) {
1041       ss << "pg " << pgidstr << " invalid";
1042       r = -EINVAL;
1043       goto reply;
1044     }
1045     if (!pg_map.pg_stat.count(pgid)) {
1046       ss << "pg " << pgid << " dne";
1047       r = -ENOENT;
1048       goto reply;
1049     }
1050     if (pg_map.creating_pgs.count(pgid)) {
1051       ss << "pg " << pgid << " already creating";
1052       r = 0;
1053       goto reply;
1054     }
1055     {
1056       PGMapUpdater::register_pg(
1057         mon->osdmon()->osdmap,
1058         pgid,
1059         epoch,
1060         true,
1061         pg_map,
1062         &pending_inc);
1063     }
1064     ss << "pg " << pgidstr << " now creating, ok";
1065     goto update;
1066   } else if (prefix == "pg force-recovery" ||
1067              prefix == "pg force-backfill" ||
1068              prefix == "pg cancel-force-recovery" ||
1069              prefix == "pg cancel-force-backfill") {
1070     if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1071       ss << "you must complete the upgrade and 'ceph osd require-osd-release "
1072          << "luminous' before using forced recovery";
1073       r = -EPERM;
1074       goto reply;
1075     }
1076   } else if (prefix == "pg set_full_ratio" ||
1077              prefix == "pg set_nearfull_ratio") {
1078     if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1079       ss << "please use the new luminous interfaces"
1080          << " ('osd set-full-ratio' and 'osd set-nearfull-ratio')";
1081       r = -EPERM;
1082       goto reply;
1083     }
1084     double n;
1085     if (!cmd_getval(g_ceph_context, cmdmap, "ratio", n)) {
1086       ss << "unable to parse 'ratio' value '"
1087          << cmd_vartype_stringify(cmdmap["who"]) << "'";
1088       r = -EINVAL;
1089       goto reply;
1090     }
1091     string op = prefix.substr(3, string::npos);
1092     if (op == "set_full_ratio")
1093       pending_inc.full_ratio = n;
1094     else if (op == "set_nearfull_ratio")
1095       pending_inc.nearfull_ratio = n;
1096     goto update;
1097   } else {
1098     r = -EINVAL;
1099     goto reply;
1100   }
1101
1102 reply:
1103   getline(ss, rs);
1104   if (r < 0 && rs.length() == 0)
1105     rs = cpp_strerror(r);
1106   mon->reply_command(op, r, rs, get_last_committed());
1107   return false;
1108
1109 update:
1110   getline(ss, rs);
1111   wait_for_finished_proposal(op, new Monitor::C_Command(
1112                                mon, op, r, rs, get_last_committed() + 1));
1113   return true;
1114 }
1115
1116 void PGMonitor::get_health(list<pair<health_status_t,string> >& summary,
1117                            list<pair<health_status_t,string> > *detail,
1118                            CephContext *cct) const
1119 {
1120   // legacy pre-luminous full/nearfull
1121   if (mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
1122     check_full_osd_health(summary, detail, pg_map.full_osds, "full",
1123                           HEALTH_ERR);
1124     check_full_osd_health(summary, detail, pg_map.nearfull_osds, "near full",
1125                           HEALTH_WARN);
1126     pg_map.get_health(cct, mon->osdmon()->osdmap, summary, detail);
1127   }
1128 }
1129
1130 void PGMonitor::check_full_osd_health(list<pair<health_status_t,string> >& summary,
1131                                       list<pair<health_status_t,string> > *detail,
1132                                       const mempool::pgmap::set<int>& s, const char *desc,
1133                                       health_status_t sev) const
1134 {
1135   if (!s.empty()) {
1136     ostringstream ss;
1137     ss << s.size() << " " << desc << " osd(s)";
1138     summary.push_back(make_pair(sev, ss.str()));
1139     if (detail) {
1140       for (set<int>::const_iterator p = s.begin(); p != s.end(); ++p) {
1141         ostringstream ss;
1142         const osd_stat_t& os = pg_map.osd_stat.find(*p)->second;
1143         int ratio = (int)(((float)os.kb_used) / (float) os.kb * 100.0);
1144         ss << "osd." << *p << " is " << desc << " at " << ratio << "%";
1145         detail->push_back(make_pair(sev, ss.str()));
1146       }
1147     }
1148   }
1149 }
1150
1151 void PGMonitor::check_subs()
1152 {
1153   if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1154     return;
1155   }
1156
1157   dout(10) << __func__ << dendl;
1158   const string type = "osd_pg_creates";
1159
1160   mon->with_session_map([this, &type](const MonSessionMap& session_map) {
1161       if (mon->session_map.subs.count(type) == 0)
1162         return;
1163
1164       auto p = mon->session_map.subs[type]->begin();
1165       while (!p.end()) {
1166         Subscription *sub = *p;
1167         ++p;
1168         dout(20) << __func__ << " .. " << sub->session->inst << dendl;
1169         check_sub(sub);
1170       }
1171     });
1172 }
1173
1174 bool PGMonitor::check_sub(Subscription *sub)
1175 {
1176   OSDMap& osdmap = mon->osdmon()->osdmap;
1177   if (sub->type == "osd_pg_creates") {
1178     // only send these if the OSD is up.  we will check_subs() when they do
1179     // come up so they will get the creates then.
1180     if (sub->session->inst.name.is_osd() &&
1181         osdmap.is_up(sub->session->inst.name.num())) {
1182       sub->next = send_pg_creates(sub->session->inst.name.num(),
1183                                   sub->session->con.get(),
1184                                   sub->next);
1185     }
1186   }
1187   return true;
1188 }
1189
1190 class PGMonStatService : public MonPGStatService, public PGMapStatService {
1191   PGMonitor *pgmon;
1192 public:
1193   PGMonStatService(const PGMap& o, PGMonitor *pgm)
1194     : MonPGStatService(), PGMapStatService(o), pgmon(pgm) {}
1195        
1196
1197   bool is_readable() const override { return pgmon->is_readable(); }
1198
1199   unsigned maybe_add_creating_pgs(epoch_t scan_epoch,
1200      const mempool::osdmap::map<int64_t,pg_pool_t>& pools,
1201      creating_pgs_t *pending_creates) const override
1202   {
1203     if (pgmap.last_pg_scan < scan_epoch) {
1204       return 0;
1205     }
1206     unsigned added = 0;
1207     for (auto& pgid : pgmap.creating_pgs) {
1208       if (!pools.count(pgid.pool())) {
1209         continue;
1210       }
1211       auto st = pgmap.pg_stat.find(pgid);
1212       assert(st != pgmap.pg_stat.end());
1213       auto created = make_pair(st->second.created,
1214                                st->second.last_scrub_stamp);
1215       // no need to add the pg, if it already exists in creating_pgs
1216       if (pending_creates->pgs.emplace(pgid, created).second) {
1217         added++;
1218       }
1219     }
1220     return added;
1221   }
1222   void maybe_trim_creating_pgs(creating_pgs_t *creates) const override {
1223     auto p = creates->pgs.begin();
1224     while (p != creates->pgs.end()) {
1225       auto q = pgmap.pg_stat.find(p->first);
1226       if (q != pgmap.pg_stat.end() &&
1227           !(q->second.state & PG_STATE_CREATING)) {
1228         p = creates->pgs.erase(p);
1229         creates->created_pools.insert(q->first.pool());
1230       } else {
1231         ++p;
1232       }
1233     }
1234   }
1235   void dump_info(Formatter *f) const override {
1236     f->dump_object("pgmap", pgmap);
1237     f->dump_unsigned("pgmap_first_committed", pgmon->get_first_committed());
1238     f->dump_unsigned("pgmap_last_committed", pgmon->get_last_committed());
1239   }
1240   int process_pg_command(const string& prefix,
1241                          const map<string,cmd_vartype>& cmdmap,
1242                          const OSDMap& osdmap,
1243                          Formatter *f,
1244                          stringstream *ss,
1245                          bufferlist *odata) const override {
1246     return process_pg_map_command(prefix, cmdmap, pgmap, osdmap, f, ss, odata);
1247   }
1248
1249   int reweight_by_utilization(const OSDMap &osd_map,
1250                               int oload,
1251                               double max_changef,
1252                               int max_osds,
1253                               bool by_pg, const set<int64_t> *pools,
1254                               bool no_increasing,
1255                               mempool::osdmap::map<int32_t, uint32_t>* new_weights,
1256                               std::stringstream *ss,
1257                               std::string *out_str,
1258                               Formatter *f) const override {
1259     return reweight::by_utilization(osd_map, pgmap, oload, max_changef,
1260                                     max_osds, by_pg, pools, no_increasing,
1261                                     new_weights, ss, out_str, f);
1262   }
1263 };
1264
1265 MonPGStatService *PGMonitor::get_pg_stat_service()
1266 {
1267   if (!pgservice) {
1268     pgservice.reset(new PGMonStatService(pg_map, this));
1269   }
1270   return pgservice.get();
1271 }
1272
1273 PGMonitor::PGMonitor(Monitor *mn, Paxos *p, const string& service_name)
1274   : PaxosService(mn, p, service_name),
1275     pgmap_meta_prefix("pgmap_meta"),
1276     pgmap_pg_prefix("pgmap_pg"),
1277     pgmap_osd_prefix("pgmap_osd")
1278 {}
1279
1280 PGMonitor::~PGMonitor() = default;