Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mon / MDSMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include <sstream>
16 #include <boost/utility.hpp>
17 #include <boost/regex.hpp>
18
19 #include "MDSMonitor.h"
20 #include "FSCommands.h"
21 #include "Monitor.h"
22 #include "MonitorDBStore.h"
23 #include "OSDMonitor.h"
24 #include "PGMonitor.h"
25
26 #include "common/strtol.h"
27 #include "common/perf_counters.h"
28 #include "common/config.h"
29 #include "common/cmdparse.h"
30 #include "messages/MMDSMap.h"
31 #include "messages/MFSMap.h"
32 #include "messages/MFSMapUser.h"
33 #include "messages/MMDSLoadTargets.h"
34 #include "messages/MMonCommand.h"
35 #include "messages/MGenericMessage.h"
36
37 #include "include/assert.h"
38 #include "include/str_list.h"
39 #include "include/stringify.h"
40 #include "mds/mdstypes.h"
41 #include "Session.h"
42
43 #define dout_subsys ceph_subsys_mon
44 #undef dout_prefix
45 #define dout_prefix _prefix(_dout, mon, fsmap)
46 static ostream& _prefix(std::ostream *_dout, Monitor *mon, FSMap const& fsmap) {
47   return *_dout << "mon." << mon->name << "@" << mon->rank
48                 << "(" << mon->get_state_name()
49                 << ").mds e" << fsmap.get_epoch() << " ";
50 }
51
52 static const string MDS_METADATA_PREFIX("mds_metadata");
53 static const string MDS_HEALTH_PREFIX("mds_health");
54
55
56 /*
57  * Specialized implementation of cmd_getval to allow us to parse
58  * out strongly-typedef'd types
59  */
60 template<> bool cmd_getval(CephContext *cct, const cmdmap_t& cmdmap,
61                            const std::string& k, mds_gid_t &val)
62 {
63   return cmd_getval(cct, cmdmap, k, (int64_t&)val);
64 }
65
66 template<> bool cmd_getval(CephContext *cct, const cmdmap_t& cmdmap,
67                            const std::string& k, mds_rank_t &val)
68 {
69   return cmd_getval(cct, cmdmap, k, (int64_t&)val);
70 }
71
72 template<> bool cmd_getval(CephContext *cct, const cmdmap_t& cmdmap,
73                            const std::string& k, MDSMap::DaemonState &val)
74 {
75   return cmd_getval(cct, cmdmap, k, (int64_t&)val);
76 }
77
78 // my methods
79
80 void MDSMonitor::print_map(FSMap &m, int dbl)
81 {
82   dout(dbl) << "print_map\n";
83   m.print(*_dout);
84   *_dout << dendl;
85 }
86
87 // service methods
88 void MDSMonitor::create_initial()
89 {
90   dout(10) << "create_initial" << dendl;
91 }
92
93 void MDSMonitor::get_store_prefixes(std::set<string>& s)
94 {
95   s.insert(service_name);
96   s.insert(MDS_METADATA_PREFIX);
97   s.insert(MDS_HEALTH_PREFIX);
98 }
99
100 void MDSMonitor::update_from_paxos(bool *need_bootstrap)
101 {
102   version_t version = get_last_committed();
103   if (version == fsmap.epoch)
104     return;
105
106   dout(10) << __func__ << " version " << version
107            << ", my e " << fsmap.epoch << dendl;
108   assert(version > fsmap.epoch);
109
110   load_health();
111
112   // read and decode
113   bufferlist fsmap_bl;
114   fsmap_bl.clear();
115   int err = get_version(version, fsmap_bl);
116   assert(err == 0);
117
118   assert(fsmap_bl.length() > 0);
119   dout(10) << __func__ << " got " << version << dendl;
120   fsmap.decode(fsmap_bl);
121
122   // new map
123   dout(4) << "new map" << dendl;
124   print_map(fsmap, 0);
125   if (!g_conf->mon_mds_skip_sanity) {
126     fsmap.sanity();
127   }
128
129   check_subs();
130   update_logger();
131 }
132
133 void MDSMonitor::init()
134 {
135   (void)load_metadata(pending_metadata);
136 }
137
138 void MDSMonitor::create_pending()
139 {
140   pending_fsmap = fsmap;
141   pending_fsmap.epoch++;
142
143   if (mon->osdmon()->is_readable()) {
144     auto &osdmap = mon->osdmon()->osdmap;
145     pending_fsmap.sanitize([&osdmap](int64_t pool){return osdmap.have_pg_pool(pool);});
146   }
147
148   dout(10) << "create_pending e" << pending_fsmap.epoch << dendl;
149 }
150
151 void MDSMonitor::encode_pending(MonitorDBStore::TransactionRef t)
152 {
153   dout(10) << "encode_pending e" << pending_fsmap.epoch << dendl;
154
155
156   // print map iff 'debug mon = 30' or higher
157   print_map(pending_fsmap, 30);
158   if (!g_conf->mon_mds_skip_sanity) {
159     pending_fsmap.sanity();
160   }
161
162   // Set 'modified' on maps modified this epoch
163   for (auto &i : fsmap.filesystems) {
164     if (i.second->mds_map.epoch == fsmap.epoch) {
165       i.second->mds_map.modified = ceph_clock_now();
166     }
167   }
168
169   // apply to paxos
170   assert(get_last_committed() + 1 == pending_fsmap.epoch);
171   bufferlist fsmap_bl;
172   pending_fsmap.encode(fsmap_bl, mon->get_quorum_con_features());
173
174   /* put everything in the transaction */
175   put_version(t, pending_fsmap.epoch, fsmap_bl);
176   put_last_committed(t, pending_fsmap.epoch);
177
178   // Encode MDSHealth data
179   for (std::map<uint64_t, MDSHealth>::iterator i = pending_daemon_health.begin();
180       i != pending_daemon_health.end(); ++i) {
181     bufferlist bl;
182     i->second.encode(bl);
183     t->put(MDS_HEALTH_PREFIX, stringify(i->first), bl);
184   }
185
186   for (std::set<uint64_t>::iterator i = pending_daemon_health_rm.begin();
187       i != pending_daemon_health_rm.end(); ++i) {
188     t->erase(MDS_HEALTH_PREFIX, stringify(*i));
189   }
190   pending_daemon_health_rm.clear();
191   remove_from_metadata(t);
192
193   // health
194   health_check_map_t new_checks;
195   const auto info_map = pending_fsmap.get_mds_info();
196   for (const auto &i : info_map) {
197     const auto &gid = i.first;
198     const auto &info = i.second;
199     if (pending_daemon_health_rm.count(gid)) {
200       continue;
201     }
202     MDSHealth health;
203     auto p = pending_daemon_health.find(gid);
204     if (p != pending_daemon_health.end()) {
205       health = p->second;
206     } else {
207       bufferlist bl;
208       mon->store->get(MDS_HEALTH_PREFIX, stringify(gid), bl);
209       if (!bl.length()) {
210         derr << "Missing health data for MDS " << gid << dendl;
211         continue;
212       }
213       bufferlist::iterator bl_i = bl.begin();
214       health.decode(bl_i);
215     }
216     for (const auto &metric : health.metrics) {
217       const int rank = info.rank;
218       health_check_t *check = &new_checks.get_or_add(
219         mds_metric_name(metric.type),
220         metric.sev,
221         mds_metric_summary(metric.type));
222       ostringstream ss;
223       ss << "mds" << info.name << "(mds." << rank << "): " << metric.message;
224       for (auto p = metric.metadata.begin();
225            p != metric.metadata.end();
226            ++p) {
227         if (p != metric.metadata.begin()) {
228           ss << ", ";
229         }
230         ss << p->first << ": " << p->second;
231       }
232       check->detail.push_back(ss.str());
233     }
234   }
235   pending_fsmap.get_health_checks(&new_checks);
236   for (auto& p : new_checks.checks) {
237     p.second.summary = boost::regex_replace(
238       p.second.summary,
239       boost::regex("%num%"),
240       stringify(p.second.detail.size()));
241     p.second.summary = boost::regex_replace(
242       p.second.summary,
243       boost::regex("%plurals%"),
244       p.second.detail.size() > 1 ? "s" : "");
245     p.second.summary = boost::regex_replace(
246       p.second.summary,
247       boost::regex("%isorare%"),
248       p.second.detail.size() > 1 ? "are" : "is");
249     p.second.summary = boost::regex_replace(
250       p.second.summary,
251       boost::regex("%hasorhave%"),
252       p.second.detail.size() > 1 ? "have" : "has");
253   }
254   encode_health(new_checks, t);
255 }
256
257 version_t MDSMonitor::get_trim_to()
258 {
259   version_t floor = 0;
260   if (g_conf->mon_mds_force_trim_to > 0 &&
261       g_conf->mon_mds_force_trim_to < (int)get_last_committed()) {
262     floor = g_conf->mon_mds_force_trim_to;
263     dout(10) << __func__ << " explicit mon_mds_force_trim_to = "
264              << floor << dendl;
265   }
266
267   unsigned max = g_conf->mon_max_mdsmap_epochs;
268   version_t last = get_last_committed();
269
270   if (last - get_first_committed() > max && floor < last - max)
271     return last - max;
272   return floor;
273 }
274
275 void MDSMonitor::update_logger()
276 {
277   dout(10) << "update_logger" << dendl;
278
279   uint64_t up = 0;
280   uint64_t in = 0;
281   uint64_t failed = 0;
282   for (const auto &i : fsmap.filesystems) {
283     const MDSMap &mds_map = i.second->mds_map;
284
285     up += mds_map.get_num_up_mds();
286     in += mds_map.get_num_in_mds();
287     failed += mds_map.get_num_failed_mds();
288   }
289   mon->cluster_logger->set(l_cluster_num_mds_up, up);
290   mon->cluster_logger->set(l_cluster_num_mds_in, in);
291   mon->cluster_logger->set(l_cluster_num_mds_failed, failed);
292   mon->cluster_logger->set(l_cluster_mds_epoch, fsmap.get_epoch());
293 }
294
295 bool MDSMonitor::preprocess_query(MonOpRequestRef op)
296 {
297   op->mark_mdsmon_event(__func__);
298   PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
299   dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
300
301   switch (m->get_type()) {
302     
303   case MSG_MDS_BEACON:
304     return preprocess_beacon(op);
305     
306   case MSG_MON_COMMAND:
307     return preprocess_command(op);
308
309   case MSG_MDS_OFFLOAD_TARGETS:
310     return preprocess_offload_targets(op);
311
312   default:
313     ceph_abort();
314     return true;
315   }
316 }
317
318 void MDSMonitor::_note_beacon(MMDSBeacon *m)
319 {
320   mds_gid_t gid = mds_gid_t(m->get_global_id());
321   version_t seq = m->get_seq();
322
323   dout(15) << "_note_beacon " << *m << " noting time" << dendl;
324   last_beacon[gid].stamp = ceph_clock_now();
325   last_beacon[gid].seq = seq;
326 }
327
328 bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
329 {
330   op->mark_mdsmon_event(__func__);
331   MMDSBeacon *m = static_cast<MMDSBeacon*>(op->get_req());
332   MDSMap::DaemonState state = m->get_state();
333   mds_gid_t gid = m->get_global_id();
334   version_t seq = m->get_seq();
335   MDSMap::mds_info_t info;
336   epoch_t effective_epoch = 0;
337
338   // check privileges, ignore if fails
339   MonSession *session = m->get_session();
340   assert(session);
341   if (!session->is_capable("mds", MON_CAP_X)) {
342     dout(0) << "preprocess_beacon got MMDSBeacon from entity with insufficient privileges "
343             << session->caps << dendl;
344     goto ignore;
345   }
346
347   if (m->get_fsid() != mon->monmap->fsid) {
348     dout(0) << "preprocess_beacon on fsid " << m->get_fsid() << " != " << mon->monmap->fsid << dendl;
349     goto ignore;
350   }
351
352   dout(12) << "preprocess_beacon " << *m
353            << " from " << m->get_orig_source_inst()
354            << " " << m->get_compat()
355            << dendl;
356
357   // make sure the address has a port
358   if (m->get_orig_source_addr().get_port() == 0) {
359     dout(1) << " ignoring boot message without a port" << dendl;
360     goto ignore;
361   }
362
363   // check compat
364   if (!m->get_compat().writeable(fsmap.compat)) {
365     dout(1) << " mds " << m->get_source_inst() << " can't write to fsmap " << fsmap.compat << dendl;
366     goto ignore;
367   }
368
369   // fw to leader?
370   if (!mon->is_leader())
371     return false;
372
373   // booted, but not in map?
374   if (!pending_fsmap.gid_exists(gid)) {
375     if (state != MDSMap::STATE_BOOT) {
376       dout(7) << "mds_beacon " << *m << " is not in fsmap (state "
377               << ceph_mds_state_name(state) << ")" << dendl;
378
379       MDSMap null_map;
380       null_map.epoch = fsmap.epoch;
381       null_map.compat = fsmap.compat;
382       mon->send_reply(op, new MMDSMap(mon->monmap->fsid, &null_map));
383       return true;
384     } else {
385       return false;  // not booted yet.
386     }
387   }
388   dout(10) << __func__ << ": GID exists in map: " << gid << dendl;
389   info = pending_fsmap.get_info_gid(gid);
390
391   // old seq?
392   if (info.state_seq > seq) {
393     dout(7) << "mds_beacon " << *m << " has old seq, ignoring" << dendl;
394     goto ignore;
395   }
396
397   // Work out the latest epoch that this daemon should have seen
398   {
399     fs_cluster_id_t fscid = pending_fsmap.mds_roles.at(gid);
400     if (fscid == FS_CLUSTER_ID_NONE) {
401       effective_epoch = pending_fsmap.standby_epochs.at(gid);
402     } else {
403       effective_epoch = pending_fsmap.get_filesystem(fscid)->mds_map.epoch;
404     }
405     if (effective_epoch != m->get_last_epoch_seen()) {
406       dout(10) << "mds_beacon " << *m
407                << " ignoring requested state, because mds hasn't seen latest map" << dendl;
408       goto reply;
409     }
410   }
411
412   if (info.laggy()) {
413     _note_beacon(m);
414     return false;  // no longer laggy, need to update map.
415   }
416   if (state == MDSMap::STATE_BOOT) {
417     // ignore, already booted.
418     goto ignore;
419   }
420   // is there a state change here?
421   if (info.state != state) {
422     // legal state change?
423     if ((info.state == MDSMap::STATE_STANDBY ||
424          info.state == MDSMap::STATE_STANDBY_REPLAY) && state > 0) {
425       dout(10) << "mds_beacon mds can't activate itself (" << ceph_mds_state_name(info.state)
426                << " -> " << ceph_mds_state_name(state) << ")" << dendl;
427       goto reply;
428     }
429
430     if ((state == MDSMap::STATE_STANDBY || state == MDSMap::STATE_STANDBY_REPLAY)
431         && info.rank != MDS_RANK_NONE)
432     {
433       dout(4) << "mds_beacon MDS can't go back into standby after taking rank: "
434                  "held rank " << info.rank << " while requesting state "
435               << ceph_mds_state_name(state) << dendl;
436       goto reply;
437     }
438     
439     _note_beacon(m);
440     return false;
441   }
442
443   // Comparing known daemon health with m->get_health()
444   // and return false (i.e. require proposal) if they
445   // do not match, to update our stored
446   if (!(pending_daemon_health[gid] == m->get_health())) {
447     dout(20) << __func__ << " health metrics for gid " << gid << " were updated" << dendl;
448     _note_beacon(m);
449     return false;
450   }
451
452  reply:
453   // note time and reply
454   assert(effective_epoch > 0);
455   _note_beacon(m);
456   mon->send_reply(op,
457                   new MMDSBeacon(mon->monmap->fsid, m->get_global_id(), m->get_name(),
458                                  effective_epoch, state, seq,
459                                  CEPH_FEATURES_SUPPORTED_DEFAULT));
460   return true;
461
462  ignore:
463   // I won't reply this beacon, drop it.
464   mon->no_reply(op);
465   return true;
466 }
467
468 bool MDSMonitor::preprocess_offload_targets(MonOpRequestRef op)
469 {
470   op->mark_mdsmon_event(__func__);
471   MMDSLoadTargets *m = static_cast<MMDSLoadTargets*>(op->get_req());
472   dout(10) << "preprocess_offload_targets " << *m << " from " << m->get_orig_source() << dendl;
473   
474   // check privileges, ignore message if fails
475   MonSession *session = m->get_session();
476   if (!session)
477     goto done;
478   if (!session->is_capable("mds", MON_CAP_X)) {
479     dout(0) << "preprocess_offload_targets got MMDSLoadTargets from entity with insufficient caps "
480             << session->caps << dendl;
481     goto done;
482   }
483
484   if (fsmap.gid_exists(m->global_id) &&
485       m->targets == fsmap.get_info_gid(m->global_id).export_targets)
486     goto done;
487
488   return false;
489
490  done:
491   return true;
492 }
493
494
495 bool MDSMonitor::prepare_update(MonOpRequestRef op)
496 {
497   op->mark_mdsmon_event(__func__);
498   PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
499   dout(7) << "prepare_update " << *m << dendl;
500
501   switch (m->get_type()) {
502     
503   case MSG_MDS_BEACON:
504     return prepare_beacon(op);
505
506   case MSG_MON_COMMAND:
507     return prepare_command(op);
508
509   case MSG_MDS_OFFLOAD_TARGETS:
510     return prepare_offload_targets(op);
511   
512   default:
513     ceph_abort();
514   }
515
516   return true;
517 }
518
519 bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
520 {
521   op->mark_mdsmon_event(__func__);
522   MMDSBeacon *m = static_cast<MMDSBeacon*>(op->get_req());
523   // -- this is an update --
524   dout(12) << "prepare_beacon " << *m << " from " << m->get_orig_source_inst() << dendl;
525   entity_addr_t addr = m->get_orig_source_inst().addr;
526   mds_gid_t gid = m->get_global_id();
527   MDSMap::DaemonState state = m->get_state();
528   version_t seq = m->get_seq();
529
530   dout(20) << __func__ << " got health from gid " << gid << " with " << m->get_health().metrics.size() << " metrics." << dendl;
531
532   // Calculate deltas of health metrics created and removed
533   // Do this by type rather than MDSHealthMetric equality, because messages can
534   // change a lot when they include e.g. a number of items.
535   const auto &old_health = pending_daemon_health[gid].metrics;
536   const auto &new_health = m->get_health().metrics;
537
538   std::set<mds_metric_t> old_types;
539   for (const auto &i : old_health) {
540     old_types.insert(i.type);
541   }
542
543   std::set<mds_metric_t> new_types;
544   for (const auto &i : new_health) {
545     new_types.insert(i.type);
546   }
547
548   for (const auto &new_metric: new_health) {
549     if (old_types.count(new_metric.type) == 0) {
550       std::stringstream msg;
551       msg << "MDS health message (" << m->get_orig_source_inst().name << "): "
552           << new_metric.message;
553       if (new_metric.sev == HEALTH_ERR) {
554         mon->clog->error() << msg.str();
555       } else if (new_metric.sev == HEALTH_WARN) {
556         mon->clog->warn() << msg.str();
557       } else {
558         mon->clog->info() << msg.str();
559       }
560     }
561   }
562
563   // Log the disappearance of health messages at INFO
564   for (const auto &old_metric : old_health) {
565     if (new_types.count(old_metric.type) == 0) {
566       mon->clog->info() << "MDS health message cleared ("
567         << m->get_orig_source_inst().name << "): " << old_metric.message;
568     }
569   }
570
571   // Store health
572   pending_daemon_health[gid] = m->get_health();
573
574   // boot?
575   if (state == MDSMap::STATE_BOOT) {
576     // zap previous instance of this name?
577     if (g_conf->mds_enforce_unique_name) {
578       bool failed_mds = false;
579       while (mds_gid_t existing = pending_fsmap.find_mds_gid_by_name(m->get_name())) {
580         if (!mon->osdmon()->is_writeable()) {
581           mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
582           return false;
583         }
584         const MDSMap::mds_info_t &existing_info =
585           pending_fsmap.get_info_gid(existing);
586         mon->clog->info() << existing_info.human_name() << " restarted";
587         fail_mds_gid(existing);
588         failed_mds = true;
589       }
590       if (failed_mds) {
591         assert(mon->osdmon()->is_writeable());
592         request_proposal(mon->osdmon());
593       }
594     }
595
596     // Add this daemon to the map
597     if (pending_fsmap.mds_roles.count(gid) == 0) {
598       MDSMap::mds_info_t new_info;
599       new_info.global_id = gid;
600       new_info.name = m->get_name();
601       new_info.addr = addr;
602       new_info.mds_features = m->get_mds_features();
603       new_info.state = MDSMap::STATE_STANDBY;
604       new_info.state_seq = seq;
605       new_info.standby_for_rank = m->get_standby_for_rank();
606       new_info.standby_for_name = m->get_standby_for_name();
607       new_info.standby_for_fscid = m->get_standby_for_fscid();
608       new_info.standby_replay = m->get_standby_replay();
609       pending_fsmap.insert(new_info);
610     }
611
612     // Resolve standby_for_name to a rank
613     const MDSMap::mds_info_t &info = pending_fsmap.get_info_gid(gid);
614     if (!info.standby_for_name.empty()) {
615       const MDSMap::mds_info_t *leaderinfo = fsmap.find_by_name(
616           info.standby_for_name);
617       if (leaderinfo && (leaderinfo->rank >= 0)) {
618         auto fscid = pending_fsmap.mds_roles.at(leaderinfo->global_id);
619         auto fs = pending_fsmap.get_filesystem(fscid);
620
621         pending_fsmap.modify_daemon(gid, [fscid, leaderinfo](
622               MDSMap::mds_info_t *info) {
623             info->standby_for_rank = leaderinfo->rank;
624             info->standby_for_fscid = fscid;
625         });
626       }
627     }
628
629     // initialize the beacon timer
630     last_beacon[gid].stamp = ceph_clock_now();
631     last_beacon[gid].seq = seq;
632
633     // new incompat?
634     if (!pending_fsmap.compat.writeable(m->get_compat())) {
635       dout(10) << " fsmap " << pending_fsmap.compat
636                << " can't write to new mds' " << m->get_compat()
637                << ", updating fsmap and killing old mds's"
638                << dendl;
639       pending_fsmap.update_compat(m->get_compat());
640     }
641
642     update_metadata(m->get_global_id(), m->get_sys_info());
643   } else {
644     // state update
645     const MDSMap::mds_info_t &info = pending_fsmap.get_info_gid(gid);
646     // Old MDS daemons don't mention that they're standby replay until
647     // after they've sent their boot beacon, so update this field.
648     if (info.standby_replay != m->get_standby_replay()) {
649       pending_fsmap.modify_daemon(info.global_id, [&m](
650             MDSMap::mds_info_t *i)
651         {
652           i->standby_replay = m->get_standby_replay();
653         });
654     }
655
656     if (info.state == MDSMap::STATE_STOPPING && state != MDSMap::STATE_STOPPED ) {
657       // we can't transition to any other states from STOPPING
658       dout(0) << "got beacon for MDS in STATE_STOPPING, ignoring requested state change"
659                << dendl;
660       _note_beacon(m);
661       return true;
662     }
663
664     if (info.laggy()) {
665       dout(10) << "prepare_beacon clearing laggy flag on " << addr << dendl;
666       pending_fsmap.modify_daemon(info.global_id, [](MDSMap::mds_info_t *info)
667         {
668           info->clear_laggy();
669         }
670       );
671     }
672   
673     dout(10) << "prepare_beacon mds." << info.rank
674              << " " << ceph_mds_state_name(info.state)
675              << " -> " << ceph_mds_state_name(state)
676              << "  standby_for_rank=" << m->get_standby_for_rank()
677              << dendl;
678     if (state == MDSMap::STATE_STOPPED) {
679       const auto fscid = pending_fsmap.mds_roles.at(gid);
680       auto fs = pending_fsmap.get_filesystem(fscid);
681
682       mon->clog->info() << info.human_name() << " finished "
683                         << "deactivating rank " << info.rank << " in filesystem "
684                         << fs->mds_map.fs_name << " (now has "
685                         << fs->mds_map.get_num_in_mds() - 1 << " ranks)";
686
687       auto erased = pending_fsmap.stop(gid);
688       erased.push_back(gid);
689
690       for (const auto &erased_gid : erased) {
691         last_beacon.erase(erased_gid);
692         if (pending_daemon_health.count(erased_gid)) {
693           pending_daemon_health.erase(erased_gid);
694           pending_daemon_health_rm.insert(erased_gid);
695         }
696       }
697
698
699     } else if (state == MDSMap::STATE_DAMAGED) {
700       if (!mon->osdmon()->is_writeable()) {
701         dout(4) << __func__ << ": DAMAGED from rank " << info.rank
702                 << " waiting for osdmon writeable to blacklist it" << dendl;
703         mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
704         return false;
705       }
706
707       // Record this MDS rank as damaged, so that other daemons
708       // won't try to run it.
709       dout(4) << __func__ << ": marking rank "
710               << info.rank << " damaged" << dendl;
711
712       utime_t until = ceph_clock_now();
713       until += g_conf->mds_blacklist_interval;
714       const auto blacklist_epoch = mon->osdmon()->blacklist(info.addr, until);
715       request_proposal(mon->osdmon());
716       pending_fsmap.damaged(gid, blacklist_epoch);
717       last_beacon.erase(gid);
718
719       // Respond to MDS, so that it knows it can continue to shut down
720       mon->send_reply(op,
721                       new MMDSBeacon(
722                         mon->monmap->fsid, m->get_global_id(),
723                         m->get_name(), fsmap.get_epoch(), state, seq,
724                         CEPH_FEATURES_SUPPORTED_DEFAULT));
725     } else if (state == MDSMap::STATE_DNE) {
726       if (!mon->osdmon()->is_writeable()) {
727         dout(4) << __func__ << ": DNE from rank " << info.rank
728                 << " waiting for osdmon writeable to blacklist it" << dendl;
729         mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
730         return false;
731       }
732
733       fail_mds_gid(gid);
734       assert(mon->osdmon()->is_writeable());
735       request_proposal(mon->osdmon());
736
737       // Respond to MDS, so that it knows it can continue to shut down
738       mon->send_reply(op,
739                       new MMDSBeacon(
740                         mon->monmap->fsid, m->get_global_id(),
741                         m->get_name(), fsmap.get_epoch(), state, seq,
742                         CEPH_FEATURES_SUPPORTED_DEFAULT));
743     } else if (info.state == MDSMap::STATE_STANDBY && state != info.state) {
744       // Standby daemons should never modify their own
745       // state.  Reject any attempts to do so.
746       derr << "standby " << gid << " attempted to change state to "
747            << ceph_mds_state_name(state) << ", rejecting" << dendl;
748       return true;
749     } else if (info.state != MDSMap::STATE_STANDBY && state != info.state &&
750                !MDSMap::state_transition_valid(info.state, state)) {
751       // Validate state transitions for daemons that hold a rank
752       derr << "daemon " << gid << " (rank " << info.rank << ") "
753            << "reported invalid state transition "
754            << ceph_mds_state_name(info.state) << " -> "
755            << ceph_mds_state_name(state) << dendl;
756       return true;
757     } else {
758       // Made it through special cases and validations, record the
759       // daemon's reported state to the FSMap.
760       pending_fsmap.modify_daemon(gid, [state, seq](MDSMap::mds_info_t *info) {
761         info->state = state;
762         info->state_seq = seq;
763       });
764
765       if (state == MDSMap::STATE_ACTIVE) {
766         auto fscid = pending_fsmap.mds_roles.at(gid);
767         auto fs = pending_fsmap.get_filesystem(fscid);
768         mon->clog->info() << info.human_name() << " is now active in "
769                           << "filesystem " << fs->mds_map.fs_name << " as rank "
770                           << info.rank;
771       }
772     }
773   }
774
775   dout(7) << "prepare_beacon pending map now:" << dendl;
776   print_map(pending_fsmap);
777   
778   wait_for_finished_proposal(op, new FunctionContext([op, this](int r){
779     if (r >= 0)
780       _updated(op);   // success
781     else if (r == -ECANCELED) {
782       mon->no_reply(op);
783     } else {
784       dispatch(op);        // try again
785     }
786   }));
787
788   return true;
789 }
790
791 bool MDSMonitor::prepare_offload_targets(MonOpRequestRef op)
792 {
793   op->mark_mdsmon_event(__func__);
794   MMDSLoadTargets *m = static_cast<MMDSLoadTargets*>(op->get_req());
795   mds_gid_t gid = m->global_id;
796   if (pending_fsmap.gid_has_rank(gid)) {
797     dout(10) << "prepare_offload_targets " << gid << " " << m->targets << dendl;
798     pending_fsmap.update_export_targets(gid, m->targets);
799   } else {
800     dout(10) << "prepare_offload_targets " << gid << " not in map" << dendl;
801   }
802   return true;
803 }
804
805 bool MDSMonitor::should_propose(double& delay)
806 {
807   // delegate to PaxosService to assess whether we should propose
808   return PaxosService::should_propose(delay);
809 }
810
811 void MDSMonitor::_updated(MonOpRequestRef op)
812 {
813   op->mark_mdsmon_event(__func__);
814   MMDSBeacon *m = static_cast<MMDSBeacon*>(op->get_req());
815   dout(10) << "_updated " << m->get_orig_source() << " " << *m << dendl;
816   mon->clog->debug() << m->get_orig_source_inst() << " "
817           << ceph_mds_state_name(m->get_state());
818
819   if (m->get_state() == MDSMap::STATE_STOPPED) {
820     // send the map manually (they're out of the map, so they won't get it automatic)
821     MDSMap null_map;
822     null_map.epoch = fsmap.epoch;
823     null_map.compat = fsmap.compat;
824     mon->send_reply(op, new MMDSMap(mon->monmap->fsid, &null_map));
825   } else {
826     mon->send_reply(op, new MMDSBeacon(mon->monmap->fsid,
827                                        m->get_global_id(),
828                                        m->get_name(),
829                                        fsmap.get_epoch(),
830                                        m->get_state(),
831                                        m->get_seq(),
832                                        CEPH_FEATURES_SUPPORTED_DEFAULT));
833   }
834 }
835
836 void MDSMonitor::on_active()
837 {
838   tick();
839   update_logger();
840
841   if (mon->is_leader()) {
842     mon->clog->debug() << "fsmap " << fsmap;
843   }
844 }
845
846 void MDSMonitor::get_health(list<pair<health_status_t, string> >& summary,
847                             list<pair<health_status_t, string> > *detail,
848                             CephContext* cct) const
849 {
850   fsmap.get_health(summary, detail);
851
852   // For each MDS GID...
853   const auto info_map = fsmap.get_mds_info();
854   for (const auto &i : info_map) {
855     const auto &gid = i.first;
856     const auto &info = i.second;
857
858     // Decode MDSHealth
859     bufferlist bl;
860     mon->store->get(MDS_HEALTH_PREFIX, stringify(gid), bl);
861     if (!bl.length()) {
862       derr << "Missing health data for MDS " << gid << dendl;
863       continue;
864     }
865     MDSHealth health;
866     bufferlist::iterator bl_i = bl.begin();
867     health.decode(bl_i);
868
869     for (const auto &metric : health.metrics) {
870       const int rank = info.rank;
871       std::ostringstream message;
872       message << "mds" << rank << ": " << metric.message;
873       summary.push_back(std::make_pair(metric.sev, message.str()));
874
875       if (detail) {
876         // There is no way for us to clealy associate detail entries with summary entries (#7192), so
877         // we duplicate the summary message in the detail string and tag the metadata on.
878         std::ostringstream detail_message;
879         detail_message << message.str();
880         if (metric.metadata.size()) {
881           detail_message << "(";
882           auto k = metric.metadata.begin();
883           while (k != metric.metadata.end()) {
884             detail_message << k->first << ": " << k->second;
885             if (boost::next(k) != metric.metadata.end()) {
886               detail_message << ", ";
887             }
888             ++k;
889           }
890           detail_message << ")";
891         }
892         detail->push_back(std::make_pair(metric.sev, detail_message.str()));
893       }
894     }
895   }
896 }
897
898 void MDSMonitor::dump_info(Formatter *f)
899 {
900   f->open_object_section("fsmap");
901   fsmap.dump(f);
902   f->close_section();
903
904   f->dump_unsigned("mdsmap_first_committed", get_first_committed());
905   f->dump_unsigned("mdsmap_last_committed", get_last_committed());
906 }
907
908 bool MDSMonitor::preprocess_command(MonOpRequestRef op)
909 {
910   op->mark_mdsmon_event(__func__);
911   MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
912   int r = -1;
913   bufferlist rdata;
914   stringstream ss, ds;
915
916   map<string, cmd_vartype> cmdmap;
917   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
918     // ss has reason for failure
919     string rs = ss.str();
920     mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
921     return true;
922   }
923
924   string prefix;
925   cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
926   string format;
927   cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
928   boost::scoped_ptr<Formatter> f(Formatter::create(format));
929
930   MonSession *session = m->get_session();
931   if (!session) {
932     mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
933     return true;
934   }
935
936   if (prefix == "mds stat") {
937     if (f) {
938       f->open_object_section("mds_stat");
939       dump_info(f.get());
940       f->close_section();
941       f->flush(ds);
942     } else {
943       ds << fsmap;
944     }
945     r = 0;
946   } else if (prefix == "mds dump") {
947     int64_t epocharg;
948     epoch_t epoch;
949
950     FSMap *p = &fsmap;
951     if (cmd_getval(g_ceph_context, cmdmap, "epoch", epocharg)) {
952       epoch = epocharg;
953       bufferlist b;
954       int err = get_version(epoch, b);
955       if (err == -ENOENT) {
956         p = 0;
957         r = -ENOENT;
958       } else {
959         assert(err == 0);
960         assert(b.length());
961         p = new FSMap;
962         p->decode(b);
963       }
964     }
965     if (p) {
966       stringstream ds;
967       const MDSMap *mdsmap = nullptr;
968       MDSMap blank;
969       blank.epoch = fsmap.epoch;
970       if (fsmap.legacy_client_fscid != FS_CLUSTER_ID_NONE) {
971         mdsmap = &(fsmap.filesystems[fsmap.legacy_client_fscid]->mds_map);
972       } else {
973         mdsmap = &blank;
974       }
975       if (f != NULL) {
976         f->open_object_section("mdsmap");
977         mdsmap->dump(f.get());
978         f->close_section();
979         f->flush(ds);
980         r = 0;
981       } else {
982         mdsmap->print(ds);
983         r = 0;
984       }
985
986       rdata.append(ds);
987       ss << "dumped fsmap epoch " << p->get_epoch();
988
989       if (p != &fsmap) {
990         delete p;
991       }
992     }
993   } else if (prefix == "fs dump") {
994     int64_t epocharg;
995     epoch_t epoch;
996
997     FSMap *p = &fsmap;
998     if (cmd_getval(g_ceph_context, cmdmap, "epoch", epocharg)) {
999       epoch = epocharg;
1000       bufferlist b;
1001       int err = get_version(epoch, b);
1002       if (err == -ENOENT) {
1003         p = 0;
1004         r = -ENOENT;
1005       } else {
1006         assert(err == 0);
1007         assert(b.length());
1008         p = new FSMap;
1009         p->decode(b);
1010       }
1011     }
1012     if (p) {
1013       stringstream ds;
1014       if (f != NULL) {
1015         f->open_object_section("fsmap");
1016         p->dump(f.get());
1017         f->close_section();
1018         f->flush(ds);
1019         r = 0;
1020       } else {
1021         p->print(ds);
1022         r = 0;
1023       }
1024
1025       rdata.append(ds);
1026       ss << "dumped fsmap epoch " << p->get_epoch();
1027
1028       if (p != &fsmap)
1029         delete p;
1030     }
1031   } else if (prefix == "mds metadata") {
1032     if (!f)
1033       f.reset(Formatter::create("json-pretty"));
1034
1035     string who;
1036     bool all = !cmd_getval(g_ceph_context, cmdmap, "who", who);
1037     dout(1) << "all = " << all << dendl;
1038     if (all) {
1039       r = 0;
1040       // Dump all MDSs' metadata
1041       const auto all_info = fsmap.get_mds_info();
1042
1043       f->open_array_section("mds_metadata");
1044       for(const auto &i : all_info) {
1045         const auto &info = i.second;
1046
1047         f->open_object_section("mds");
1048         f->dump_string("name", info.name);
1049         std::ostringstream get_err;
1050         r = dump_metadata(info.name, f.get(), get_err);
1051         if (r == -EINVAL || r == -ENOENT) {
1052           // Drop error, list what metadata we do have
1053           dout(1) << get_err.str() << dendl;
1054           r = 0;
1055         } else if (r != 0) {
1056           derr << "Unexpected error reading metadata: " << cpp_strerror(r)
1057                << dendl;
1058           ss << get_err.str();
1059           f->close_section();
1060           break;
1061         }
1062         f->close_section();
1063       }
1064       f->close_section();
1065     } else {
1066       // Dump a single daemon's metadata
1067       f->open_object_section("mds_metadata");
1068       r = dump_metadata(who, f.get(), ss);
1069       f->close_section();
1070     }
1071     f->flush(ds);
1072   } else if (prefix == "mds versions") {
1073     if (!f)
1074       f.reset(Formatter::create("json-pretty"));
1075     count_metadata("ceph_version", f.get());
1076     f->flush(ds);
1077     r = 0;
1078   } else if (prefix == "mds count-metadata") {
1079     if (!f)
1080       f.reset(Formatter::create("json-pretty"));
1081     string field;
1082     cmd_getval(g_ceph_context, cmdmap, "property", field);
1083     count_metadata(field, f.get());
1084     f->flush(ds);
1085     r = 0;
1086   } else if (prefix == "mds getmap") {
1087     epoch_t e;
1088     int64_t epocharg;
1089     bufferlist b;
1090     if (cmd_getval(g_ceph_context, cmdmap, "epoch", epocharg)) {
1091       e = epocharg;
1092       int err = get_version(e, b);
1093       if (err == -ENOENT) {
1094         r = -ENOENT;
1095       } else {
1096         assert(err == 0);
1097         assert(b.length());
1098         FSMap mm;
1099         mm.decode(b);
1100         mm.encode(rdata, m->get_connection()->get_features());
1101         ss << "got fsmap epoch " << mm.get_epoch();
1102         r = 0;
1103       }
1104     } else {
1105       fsmap.encode(rdata, m->get_connection()->get_features());
1106       ss << "got fsmap epoch " << fsmap.get_epoch();
1107       r = 0;
1108     }
1109   } else if (prefix == "mds compat show") {
1110       if (f) {
1111         f->open_object_section("mds_compat");
1112         fsmap.compat.dump(f.get());
1113         f->close_section();
1114         f->flush(ds);
1115       } else {
1116         ds << fsmap.compat;
1117       }
1118       r = 0;
1119   } else if (prefix == "fs get") {
1120     string fs_name;
1121     cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name);
1122     auto fs = fsmap.get_filesystem(fs_name);
1123     if (fs == nullptr) {
1124       ss << "filesystem '" << fs_name << "' not found";
1125       r = -ENOENT;
1126     } else {
1127       if (f != nullptr) {
1128         f->open_object_section("filesystem");
1129         fs->dump(f.get());
1130         f->close_section();
1131         f->flush(ds);
1132         r = 0;
1133       } else {
1134         fs->print(ds);
1135         r = 0;
1136       }
1137     }
1138   } else if (prefix == "fs ls") {
1139     if (f) {
1140       f->open_array_section("filesystems");
1141       {
1142         for (const auto i : fsmap.filesystems) {
1143           const auto fs = i.second;
1144           f->open_object_section("filesystem");
1145           {
1146             const MDSMap &mds_map = fs->mds_map;
1147             f->dump_string("name", mds_map.fs_name);
1148             /* Output both the names and IDs of pools, for use by
1149              * humans and machines respectively */
1150             f->dump_string("metadata_pool", mon->osdmon()->osdmap.get_pool_name(
1151                   mds_map.metadata_pool));
1152             f->dump_int("metadata_pool_id", mds_map.metadata_pool);
1153             f->open_array_section("data_pool_ids");
1154             {
1155               for (auto dpi = mds_map.data_pools.begin();
1156                    dpi != mds_map.data_pools.end(); ++dpi) {
1157                 f->dump_int("data_pool_id", *dpi);
1158               }
1159             }
1160             f->close_section();
1161
1162             f->open_array_section("data_pools");
1163             {
1164                 for (auto dpi = mds_map.data_pools.begin();
1165                    dpi != mds_map.data_pools.end(); ++dpi) {
1166                   const auto &name = mon->osdmon()->osdmap.get_pool_name(
1167                       *dpi);
1168                   f->dump_string("data_pool", name);
1169                 }
1170             }
1171
1172             f->close_section();
1173           }
1174           f->close_section();
1175         }
1176       }
1177       f->close_section();
1178       f->flush(ds);
1179     } else {
1180       for (const auto i : fsmap.filesystems) {
1181         const auto fs = i.second;
1182         const MDSMap &mds_map = fs->mds_map;
1183         const string &md_pool_name = mon->osdmon()->osdmap.get_pool_name(
1184             mds_map.metadata_pool);
1185         
1186         ds << "name: " << mds_map.fs_name << ", metadata pool: "
1187            << md_pool_name << ", data pools: [";
1188         for (auto dpi : mds_map.data_pools) {
1189           const string &pool_name = mon->osdmon()->osdmap.get_pool_name(dpi);
1190           ds << pool_name << " ";
1191         }
1192         ds << "]" << std::endl;
1193       }
1194
1195       if (fsmap.filesystems.empty()) {
1196         ds << "No filesystems enabled" << std::endl;
1197       }
1198     }
1199     r = 0;
1200   }
1201
1202   if (r != -1) {
1203     rdata.append(ds);
1204     string rs;
1205     getline(ss, rs);
1206     mon->reply_command(op, r, rs, rdata, get_last_committed());
1207     return true;
1208   } else
1209     return false;
1210 }
1211
1212 bool MDSMonitor::fail_mds_gid(mds_gid_t gid)
1213 {
1214   const MDSMap::mds_info_t info = pending_fsmap.get_info_gid(gid);
1215   dout(10) << "fail_mds_gid " << gid << " mds." << info.name << " role " << info.rank << dendl;
1216
1217   epoch_t blacklist_epoch = 0;
1218   if (info.rank >= 0 && info.state != MDSMap::STATE_STANDBY_REPLAY) {
1219     utime_t until = ceph_clock_now();
1220     until += g_conf->mds_blacklist_interval;
1221     blacklist_epoch = mon->osdmon()->blacklist(info.addr, until);
1222   }
1223
1224   pending_fsmap.erase(gid, blacklist_epoch);
1225   last_beacon.erase(gid);
1226   if (pending_daemon_health.count(gid)) {
1227     pending_daemon_health.erase(gid);
1228     pending_daemon_health_rm.insert(gid);
1229   }
1230
1231   return blacklist_epoch != 0;
1232 }
1233
1234 mds_gid_t MDSMonitor::gid_from_arg(const std::string& arg, std::ostream &ss)
1235 {
1236   const FSMap *relevant_fsmap = mon->is_leader() ? &pending_fsmap : &fsmap;
1237
1238   // Try parsing as a role
1239   mds_role_t role;
1240   std::ostringstream ignore_err;  // Don't spam 'ss' with parse_role errors
1241   int r = parse_role(arg, &role, ignore_err);
1242   if (r == 0) {
1243     // See if a GID is assigned to this role
1244     auto fs = relevant_fsmap->get_filesystem(role.fscid);
1245     assert(fs != nullptr);  // parse_role ensures it exists
1246     if (fs->mds_map.is_up(role.rank)) {
1247       dout(10) << __func__ << ": validated rank/GID " << role
1248                << " as a rank" << dendl;
1249       return fs->mds_map.get_mds_info(role.rank).global_id;
1250     }
1251   }
1252
1253   // Try parsing as a gid
1254   std::string err;
1255   unsigned long long maybe_gid = strict_strtoll(arg.c_str(), 10, &err);
1256   if (!err.empty()) {
1257     // Not a role or a GID, try as a daemon name
1258     const MDSMap::mds_info_t *mds_info = relevant_fsmap->find_by_name(arg);
1259     if (!mds_info) {
1260       ss << "MDS named '" << arg
1261          << "' does not exist, or is not up";
1262       return MDS_GID_NONE;
1263     }
1264     dout(10) << __func__ << ": resolved MDS name '" << arg
1265              << "' to GID " << mds_info->global_id << dendl;
1266     return mds_info->global_id;
1267   } else {
1268     // Not a role, but parses as a an integer, might be a GID
1269     dout(10) << __func__ << ": treating MDS reference '" << arg
1270              << "' as an integer " << maybe_gid << dendl;
1271
1272     if (relevant_fsmap->gid_exists(mds_gid_t(maybe_gid))) {
1273       return mds_gid_t(maybe_gid);
1274     }
1275   }
1276
1277   dout(1) << __func__ << ": rank/GID " << arg
1278           << " not a existent rank or GID" << dendl;
1279   return MDS_GID_NONE;
1280 }
1281
1282 int MDSMonitor::fail_mds(std::ostream &ss, const std::string &arg,
1283     MDSMap::mds_info_t *failed_info)
1284 {
1285   assert(failed_info != nullptr);
1286
1287   mds_gid_t gid = gid_from_arg(arg, ss);
1288   if (gid == MDS_GID_NONE) {
1289     return 0;
1290   }
1291   if (!mon->osdmon()->is_writeable()) {
1292     return -EAGAIN;
1293   }
1294
1295   // Take a copy of the info before removing the MDS from the map,
1296   // so that the caller knows which mds (if any) they ended up removing.
1297   *failed_info = pending_fsmap.get_info_gid(gid);
1298
1299   fail_mds_gid(gid);
1300   ss << "failed mds gid " << gid;
1301   assert(mon->osdmon()->is_writeable());
1302   request_proposal(mon->osdmon());
1303   return 0;
1304 }
1305
1306 bool MDSMonitor::prepare_command(MonOpRequestRef op)
1307 {
1308   op->mark_mdsmon_event(__func__);
1309   MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
1310   int r = -EINVAL;
1311   stringstream ss;
1312   bufferlist rdata;
1313
1314   map<string, cmd_vartype> cmdmap;
1315   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
1316     string rs = ss.str();
1317     mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
1318     return true;
1319   }
1320
1321   string prefix;
1322   cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
1323
1324   /* Refuse access if message not associated with a valid session */
1325   MonSession *session = m->get_session();
1326   if (!session) {
1327     mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
1328     return true;
1329   }
1330
1331   bool batched_propose = false;
1332   for (auto h : handlers) {
1333     if (h->can_handle(prefix)) {
1334       batched_propose = h->batched_propose();
1335       if (batched_propose) {
1336         paxos->plug();
1337       }
1338       r = h->handle(mon, pending_fsmap, op, cmdmap, ss);
1339       if (batched_propose) {
1340         paxos->unplug();
1341       }
1342
1343       if (r == -EAGAIN) {
1344         // message has been enqueued for retry; return.
1345         dout(4) << __func__ << " enqueue for retry by prepare_command" << dendl;
1346         return false;
1347       } else {
1348         if (r == 0) {
1349           // On successful updates, print the updated map
1350           print_map(pending_fsmap);
1351         }
1352         // Successful or not, we're done: respond.
1353         goto out;
1354       }
1355     }
1356   }
1357
1358   r = filesystem_command(op, prefix, cmdmap, ss);
1359   if (r >= 0) {
1360     goto out;
1361   } else if (r == -EAGAIN) {
1362     // Do not reply, the message has been enqueued for retry
1363     dout(4) << __func__ << " enqueue for retry by filesystem_command" << dendl;
1364     return false;
1365   } else if (r != -ENOSYS) {
1366     goto out;
1367   }
1368
1369   // Only handle legacy commands if there is a filesystem configured
1370   if (pending_fsmap.legacy_client_fscid == FS_CLUSTER_ID_NONE) {
1371     if (pending_fsmap.filesystems.size() == 0) {
1372       ss << "No filesystem configured: use `ceph fs new` to create a filesystem";
1373     } else {
1374       ss << "No filesystem set for use with legacy commands";
1375     }
1376     r = -EINVAL;
1377     goto out;
1378   }
1379
1380   r = legacy_filesystem_command(op, prefix, cmdmap, ss);
1381
1382   if (r == -ENOSYS && ss.str().empty()) {
1383     ss << "unrecognized command";
1384   }
1385
1386 out:
1387   dout(4) << __func__ << " done, r=" << r << dendl;
1388   /* Compose response */
1389   string rs;
1390   getline(ss, rs);
1391
1392   if (r >= 0) {
1393     // success.. delay reply
1394     wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, r, rs,
1395                                               get_last_committed() + 1));
1396     if (batched_propose) {
1397       force_immediate_propose();
1398     }
1399     return true;
1400   } else {
1401     // reply immediately
1402     mon->reply_command(op, r, rs, rdata, get_last_committed());
1403     return false;
1404   }
1405 }
1406
1407
1408 /**
1409  * Given one of the following forms:
1410  *   <fs name>:<rank>
1411  *   <fs id>:<rank>
1412  *   <rank>
1413  *
1414  * Parse into a mds_role_t.  The rank-only form is only valid
1415  * if legacy_client_ns is set.
1416  */
1417 int MDSMonitor::parse_role(
1418     const std::string &role_str,
1419     mds_role_t *role,
1420     std::ostream &ss)
1421 {
1422   const FSMap *relevant_fsmap = &fsmap;
1423   if (mon->is_leader()) {
1424     relevant_fsmap = &pending_fsmap;
1425   }
1426   return relevant_fsmap->parse_role(role_str, role, ss);
1427 }
1428
1429 int MDSMonitor::filesystem_command(
1430     MonOpRequestRef op,
1431     std::string const &prefix,
1432     map<string, cmd_vartype> &cmdmap,
1433     std::stringstream &ss)
1434 {
1435   dout(4) << __func__ << " prefix='" << prefix << "'" << dendl;
1436   op->mark_mdsmon_event(__func__);
1437   int r = 0;
1438   string whostr;
1439   cmd_getval(g_ceph_context, cmdmap, "who", whostr);
1440
1441   if (prefix == "mds stop" ||
1442       prefix == "mds deactivate") {
1443
1444     mds_role_t role;
1445     r = parse_role(whostr, &role, ss);
1446     if (r < 0 ) {
1447       return r;
1448     }
1449     auto fs = pending_fsmap.get_filesystem(role.fscid);
1450
1451     if (!fs->mds_map.is_active(role.rank)) {
1452       r = -EEXIST;
1453       ss << "mds." << role << " not active (" 
1454          << ceph_mds_state_name(fs->mds_map.get_state(role.rank)) << ")";
1455     } else if (fs->mds_map.get_root() == role.rank ||
1456                 fs->mds_map.get_tableserver() == role.rank) {
1457       r = -EINVAL;
1458       ss << "can't tell the root (" << fs->mds_map.get_root()
1459          << ") or tableserver (" << fs->mds_map.get_tableserver()
1460          << ") to deactivate";
1461     } else if (role.rank != fs->mds_map.get_last_in_mds()) {
1462       r = -EINVAL;
1463       ss << "mds." << role << " doesn't have the max rank ("
1464          << fs->mds_map.get_last_in_mds() << ")";
1465     } else if (fs->mds_map.get_num_in_mds() <= size_t(fs->mds_map.get_max_mds())) {
1466       r = -EBUSY;
1467       ss << "must decrease max_mds or else MDS will immediately reactivate";
1468     } else {
1469       r = 0;
1470       mds_gid_t gid = fs->mds_map.up.at(role.rank);
1471       ss << "telling mds." << role << " "
1472          << pending_fsmap.get_info_gid(gid).addr << " to deactivate";
1473
1474       pending_fsmap.modify_daemon(gid, [](MDSMap::mds_info_t *info) {
1475         info->state = MDSMap::STATE_STOPPING;
1476       });
1477     }
1478   } else if (prefix == "mds set_state") {
1479     mds_gid_t gid;
1480     if (!cmd_getval(g_ceph_context, cmdmap, "gid", gid)) {
1481       ss << "error parsing 'gid' value '"
1482          << cmd_vartype_stringify(cmdmap["gid"]) << "'";
1483       return -EINVAL;
1484     }
1485     MDSMap::DaemonState state;
1486     if (!cmd_getval(g_ceph_context, cmdmap, "state", state)) {
1487       ss << "error parsing 'state' string value '"
1488          << cmd_vartype_stringify(cmdmap["state"]) << "'";
1489       return -EINVAL;
1490     }
1491     if (pending_fsmap.gid_exists(gid)) {
1492       pending_fsmap.modify_daemon(gid, [state](MDSMap::mds_info_t *info) {
1493         info->state = state;
1494       });
1495       ss << "set mds gid " << gid << " to state " << state << " "
1496          << ceph_mds_state_name(state);
1497       return 0;
1498     }
1499   } else if (prefix == "mds fail") {
1500     string who;
1501     cmd_getval(g_ceph_context, cmdmap, "who", who);
1502
1503     MDSMap::mds_info_t failed_info;
1504     r = fail_mds(ss, who, &failed_info);
1505     if (r < 0 && r == -EAGAIN) {
1506       mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
1507       return -EAGAIN; // don't propose yet; wait for message to be retried
1508     } else if (r == 0) {
1509       // Only log if we really did something (not when was already gone)
1510       if (failed_info.global_id != MDS_GID_NONE) {
1511         mon->clog->info() << failed_info.human_name() << " marked failed by "
1512                           << op->get_session()->entity_name;
1513       }
1514     }
1515   } else if (prefix == "mds rm") {
1516     mds_gid_t gid;
1517     if (!cmd_getval(g_ceph_context, cmdmap, "gid", gid)) {
1518       ss << "error parsing 'gid' value '"
1519          << cmd_vartype_stringify(cmdmap["gid"]) << "'";
1520       return -EINVAL;
1521     }
1522     if (!pending_fsmap.gid_exists(gid)) {
1523       ss << "mds gid " << gid << " dne";
1524       r = 0;
1525     } else {
1526       MDSMap::DaemonState state = pending_fsmap.get_info_gid(gid).state;
1527       if (state > 0) {
1528         ss << "cannot remove active mds." << pending_fsmap.get_info_gid(gid).name
1529            << " rank " << pending_fsmap.get_info_gid(gid).rank;
1530         return -EBUSY;
1531       } else {
1532         pending_fsmap.erase(gid, {});
1533         ss << "removed mds gid " << gid;
1534         return 0;
1535       }
1536     }
1537   } else if (prefix == "mds rmfailed") {
1538     string confirm;
1539     if (!cmd_getval(g_ceph_context, cmdmap, "confirm", confirm) ||
1540        confirm != "--yes-i-really-mean-it") {
1541          ss << "WARNING: this can make your filesystem inaccessible! "
1542                "Add --yes-i-really-mean-it if you are sure you wish to continue.";
1543          return -EPERM;
1544     }
1545     
1546     std::string role_str;
1547     cmd_getval(g_ceph_context, cmdmap, "who", role_str);
1548     mds_role_t role;
1549     int r = parse_role(role_str, &role, ss);
1550     if (r < 0) {
1551       ss << "invalid role '" << role_str << "'";
1552       return -EINVAL;
1553     }
1554
1555     pending_fsmap.modify_filesystem(
1556         role.fscid,
1557         [role](std::shared_ptr<Filesystem> fs)
1558     {
1559       fs->mds_map.failed.erase(role.rank);
1560     });
1561
1562     ss << "removed failed mds." << role;
1563     return 0;
1564   } else if (prefix == "mds compat rm_compat") {
1565     int64_t f;
1566     if (!cmd_getval(g_ceph_context, cmdmap, "feature", f)) {
1567       ss << "error parsing feature value '"
1568          << cmd_vartype_stringify(cmdmap["feature"]) << "'";
1569       return -EINVAL;
1570     }
1571     if (pending_fsmap.compat.compat.contains(f)) {
1572       ss << "removing compat feature " << f;
1573       CompatSet modified = pending_fsmap.compat;
1574       modified.compat.remove(f);
1575       pending_fsmap.update_compat(modified);
1576     } else {
1577       ss << "compat feature " << f << " not present in " << pending_fsmap.compat;
1578     }
1579     r = 0;
1580   } else if (prefix == "mds compat rm_incompat") {
1581     int64_t f;
1582     if (!cmd_getval(g_ceph_context, cmdmap, "feature", f)) {
1583       ss << "error parsing feature value '"
1584          << cmd_vartype_stringify(cmdmap["feature"]) << "'";
1585       return -EINVAL;
1586     }
1587     if (pending_fsmap.compat.incompat.contains(f)) {
1588       ss << "removing incompat feature " << f;
1589       CompatSet modified = pending_fsmap.compat;
1590       modified.incompat.remove(f);
1591       pending_fsmap.update_compat(modified);
1592     } else {
1593       ss << "incompat feature " << f << " not present in " << pending_fsmap.compat;
1594     }
1595     r = 0;
1596   } else if (prefix == "mds repaired") {
1597     std::string role_str;
1598     cmd_getval(g_ceph_context, cmdmap, "rank", role_str);
1599     mds_role_t role;
1600     r = parse_role(role_str, &role, ss);
1601     if (r < 0) {
1602       return r;
1603     }
1604
1605     bool modified = pending_fsmap.undamaged(role.fscid, role.rank);
1606     if (modified) {
1607       dout(4) << "repaired: restoring rank " << role << dendl;
1608     } else {
1609       dout(4) << "repaired: no-op on rank " << role << dendl;
1610     }
1611
1612     r = 0;
1613   } else {
1614     return -ENOSYS;
1615   }
1616
1617   return r;
1618 }
1619
1620 /**
1621  * Helper to legacy_filesystem_command
1622  */
1623 void MDSMonitor::modify_legacy_filesystem(
1624     std::function<void(std::shared_ptr<Filesystem> )> fn)
1625 {
1626   pending_fsmap.modify_filesystem(
1627     pending_fsmap.legacy_client_fscid,
1628     fn
1629   );
1630 }
1631
1632
1633
1634 /**
1635  * Handle a command that affects the filesystem (i.e. a filesystem
1636  * must exist for the command to act upon).
1637  *
1638  * @retval 0        Command was successfully handled and has side effects
1639  * @retval -EAGAIN  Messages has been requeued for retry
1640  * @retval -ENOSYS  Unknown command
1641  * @retval < 0      An error has occurred; **ss** may have been set.
1642  */
1643 int MDSMonitor::legacy_filesystem_command(
1644     MonOpRequestRef op,
1645     std::string const &prefix,
1646     map<string, cmd_vartype> &cmdmap,
1647     std::stringstream &ss)
1648 {
1649   dout(4) << __func__ << " prefix='" << prefix << "'" << dendl;
1650   op->mark_mdsmon_event(__func__);
1651   int r = 0;
1652   string whostr;
1653   cmd_getval(g_ceph_context, cmdmap, "who", whostr);
1654
1655   assert (pending_fsmap.legacy_client_fscid != FS_CLUSTER_ID_NONE);
1656
1657   if (prefix == "mds set_max_mds") {
1658     // NOTE: deprecated by "fs set max_mds"
1659     int64_t maxmds;
1660     if (!cmd_getval(g_ceph_context, cmdmap, "maxmds", maxmds) || maxmds <= 0) {
1661       return -EINVAL;
1662     }
1663
1664     const MDSMap& mdsmap =
1665       pending_fsmap.filesystems.at(pending_fsmap.legacy_client_fscid)->mds_map;
1666       
1667     if (!mdsmap.allows_multimds() &&
1668         maxmds > mdsmap.get_max_mds() &&
1669         maxmds > 1) {
1670       ss << "multi-MDS clusters are not enabled; set 'allow_multimds' to enable";
1671       return -EINVAL;
1672     }
1673
1674     if (maxmds > MAX_MDS) {
1675       ss << "may not have more than " << MAX_MDS << " MDS ranks";
1676       return -EINVAL;
1677     }
1678
1679     modify_legacy_filesystem(
1680         [maxmds](std::shared_ptr<Filesystem> fs)
1681     {
1682       fs->mds_map.set_max_mds(maxmds);
1683     });
1684
1685     r = 0;
1686     ss << "max_mds = " << maxmds;
1687   } else if (prefix == "mds cluster_down") {
1688     // NOTE: deprecated by "fs set cluster_down"
1689     modify_legacy_filesystem(
1690         [](std::shared_ptr<Filesystem> fs)
1691     {
1692       fs->mds_map.set_flag(CEPH_MDSMAP_DOWN);
1693     });
1694     ss << "marked fsmap DOWN";
1695     r = 0;
1696   } else if (prefix == "mds cluster_up") {
1697     // NOTE: deprecated by "fs set cluster_up"
1698     modify_legacy_filesystem(
1699         [](std::shared_ptr<Filesystem> fs)
1700     {
1701       fs->mds_map.clear_flag(CEPH_MDSMAP_DOWN);
1702     });
1703     ss << "unmarked fsmap DOWN";
1704     r = 0;
1705   } else {
1706     return -ENOSYS;
1707   }
1708
1709   return r;
1710 }
1711
1712
1713 void MDSMonitor::check_subs()
1714 {
1715   std::list<std::string> types;
1716
1717   // Subscriptions may be to "mdsmap" (MDS and legacy clients),
1718   // "mdsmap.<namespace>", or to "fsmap" for the full state of all
1719   // filesystems.  Build a list of all the types we service
1720   // subscriptions for.
1721   types.push_back("fsmap");
1722   types.push_back("fsmap.user");
1723   types.push_back("mdsmap");
1724   for (const auto &i : fsmap.filesystems) {
1725     auto fscid = i.first;
1726     std::ostringstream oss;
1727     oss << "mdsmap." << fscid;
1728     types.push_back(oss.str());
1729   }
1730
1731   for (const auto &type : types) {
1732     if (mon->session_map.subs.count(type) == 0)
1733       continue;
1734     xlist<Subscription*>::iterator p = mon->session_map.subs[type]->begin();
1735     while (!p.end()) {
1736       Subscription *sub = *p;
1737       ++p;
1738       check_sub(sub);
1739     }
1740   }
1741 }
1742
1743
1744 void MDSMonitor::check_sub(Subscription *sub)
1745 {
1746   dout(20) << __func__ << ": " << sub->type << dendl;
1747
1748   if (sub->type == "fsmap") {
1749     if (sub->next <= fsmap.get_epoch()) {
1750       sub->session->con->send_message(new MFSMap(mon->monmap->fsid, fsmap));
1751       if (sub->onetime) {
1752         mon->session_map.remove_sub(sub);
1753       } else {
1754         sub->next = fsmap.get_epoch() + 1;
1755       }
1756     }
1757   } else if (sub->type == "fsmap.user") {
1758     if (sub->next <= fsmap.get_epoch()) {
1759       FSMapUser fsmap_u;
1760       fsmap_u.epoch = fsmap.get_epoch();
1761       fsmap_u.legacy_client_fscid = fsmap.legacy_client_fscid;
1762       for (auto p = fsmap.filesystems.begin();
1763            p != fsmap.filesystems.end();
1764            ++p) {
1765         FSMapUser::fs_info_t& fs_info = fsmap_u.filesystems[p->first];
1766         fs_info.cid = p->first;
1767         fs_info.name= p->second->mds_map.fs_name;
1768       }
1769       sub->session->con->send_message(new MFSMapUser(mon->monmap->fsid, fsmap_u));
1770       if (sub->onetime) {
1771         mon->session_map.remove_sub(sub);
1772       } else {
1773         sub->next = fsmap.get_epoch() + 1;
1774       }
1775     }
1776   } else if (sub->type.compare(0, 6, "mdsmap") == 0) {
1777     if (sub->next > fsmap.get_epoch()) {
1778       return;
1779     }
1780
1781     const bool is_mds = sub->session->inst.name.is_mds();
1782     mds_gid_t mds_gid = MDS_GID_NONE;
1783     fs_cluster_id_t fscid = FS_CLUSTER_ID_NONE;
1784     if (is_mds) {
1785       // What (if any) namespace are you assigned to?
1786       auto mds_info = fsmap.get_mds_info();
1787       for (const auto &i : mds_info) {
1788         if (i.second.addr == sub->session->inst.addr) {
1789           mds_gid = i.first;
1790           fscid = fsmap.mds_roles.at(mds_gid);
1791         }
1792       }
1793     } else {
1794       // You're a client.  Did you request a particular
1795       // namespace?
1796       if (sub->type.find("mdsmap.") == 0) {
1797         auto namespace_id_str = sub->type.substr(std::string("mdsmap.").size());
1798         dout(10) << __func__ << ": namespace_id " << namespace_id_str << dendl;
1799         std::string err;
1800         fscid = strict_strtoll(namespace_id_str.c_str(), 10, &err);
1801         if (!err.empty()) {
1802           // Client asked for a non-existent namespace, send them nothing
1803           dout(1) << "Invalid client subscription '" << sub->type
1804                   << "'" << dendl;
1805           return;
1806         }
1807         if (fsmap.filesystems.count(fscid) == 0) {
1808           // Client asked for a non-existent namespace, send them nothing
1809           // TODO: something more graceful for when a client has a filesystem
1810           // mounted, and the fileysstem is deleted.  Add a "shut down you fool"
1811           // flag to MMDSMap?
1812           dout(1) << "Client subscribed to non-existent namespace '" <<
1813                   fscid << "'" << dendl;
1814           return;
1815         }
1816       } else {
1817         // Unqualified request for "mdsmap": give it the one marked
1818         // for use by legacy clients.
1819         if (fsmap.legacy_client_fscid != FS_CLUSTER_ID_NONE) {
1820           fscid = fsmap.legacy_client_fscid;
1821         } else {
1822           dout(1) << "Client subscribed for legacy filesystem but "
1823                      "none is configured" << dendl;
1824           return;
1825         }
1826       }
1827     }
1828     dout(10) << __func__ << ": is_mds=" << is_mds << ", fscid= " << fscid << dendl;
1829
1830     // Work out the effective latest epoch
1831     MDSMap *mds_map = nullptr;
1832     MDSMap null_map;
1833     null_map.compat = fsmap.compat;
1834     if (fscid == FS_CLUSTER_ID_NONE) {
1835       // For a client, we should have already dropped out
1836       assert(is_mds);
1837
1838       if (fsmap.standby_daemons.count(mds_gid)) {
1839         // For an MDS, we need to feed it an MDSMap with its own state in
1840         null_map.mds_info[mds_gid] = fsmap.standby_daemons[mds_gid];
1841         null_map.epoch = fsmap.standby_epochs[mds_gid];
1842       } else {
1843         null_map.epoch = fsmap.epoch;
1844       }
1845       mds_map = &null_map;
1846     } else {
1847       // Check the effective epoch 
1848       mds_map = &(fsmap.filesystems.at(fscid)->mds_map);
1849     }
1850
1851     assert(mds_map != nullptr);
1852     dout(10) << __func__ << " selected MDS map epoch " <<
1853       mds_map->epoch << " for namespace " << fscid << " for subscriber "
1854       << sub->session->inst.name << " who wants epoch " << sub->next << dendl;
1855
1856     if (sub->next > mds_map->epoch) {
1857       return;
1858     }
1859     auto msg = new MMDSMap(mon->monmap->fsid, mds_map);
1860
1861     sub->session->con->send_message(msg);
1862     if (sub->onetime) {
1863       mon->session_map.remove_sub(sub);
1864     } else {
1865       sub->next = mds_map->get_epoch() + 1;
1866     }
1867   }
1868 }
1869
1870
1871 void MDSMonitor::update_metadata(mds_gid_t gid,
1872                                  const map<string, string>& metadata)
1873 {
1874   if (metadata.empty()) {
1875     return;
1876   }
1877   pending_metadata[gid] = metadata;
1878
1879   MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
1880   bufferlist bl;
1881   ::encode(pending_metadata, bl);
1882   t->put(MDS_METADATA_PREFIX, "last_metadata", bl);
1883   paxos->trigger_propose();
1884 }
1885
1886 void MDSMonitor::remove_from_metadata(MonitorDBStore::TransactionRef t)
1887 {
1888   bool update = false;
1889   for (map<mds_gid_t, Metadata>::iterator i = pending_metadata.begin();
1890        i != pending_metadata.end(); ) {
1891     if (!pending_fsmap.gid_exists(i->first)) {
1892       pending_metadata.erase(i++);
1893       update = true;
1894     } else {
1895       ++i;
1896     }
1897   }
1898   if (!update)
1899     return;
1900   bufferlist bl;
1901   ::encode(pending_metadata, bl);
1902   t->put(MDS_METADATA_PREFIX, "last_metadata", bl);
1903 }
1904
1905 int MDSMonitor::load_metadata(map<mds_gid_t, Metadata>& m)
1906 {
1907   bufferlist bl;
1908   int r = mon->store->get(MDS_METADATA_PREFIX, "last_metadata", bl);
1909   if (r) {
1910     dout(1) << "Unable to load 'last_metadata'" << dendl;
1911     return r;
1912   }
1913
1914   bufferlist::iterator it = bl.begin();
1915   ::decode(m, it);
1916   return 0;
1917 }
1918
1919 void MDSMonitor::count_metadata(const string& field, map<string,int> *out)
1920 {
1921   map<mds_gid_t,Metadata> meta;
1922   load_metadata(meta);
1923   for (auto& p : meta) {
1924     auto q = p.second.find(field);
1925     if (q == p.second.end()) {
1926       (*out)["unknown"]++;
1927     } else {
1928       (*out)[q->second]++;
1929     }
1930   }
1931 }
1932
1933 void MDSMonitor::count_metadata(const string& field, Formatter *f)
1934 {
1935   map<string,int> by_val;
1936   count_metadata(field, &by_val);
1937   f->open_object_section(field.c_str());
1938   for (auto& p : by_val) {
1939     f->dump_int(p.first.c_str(), p.second);
1940   }
1941   f->close_section();
1942 }
1943
1944 int MDSMonitor::dump_metadata(const std::string &who, Formatter *f, ostream& err)
1945 {
1946   assert(f);
1947
1948   mds_gid_t gid = gid_from_arg(who, err);
1949   if (gid == MDS_GID_NONE) {
1950     return -EINVAL;
1951   }
1952
1953   map<mds_gid_t, Metadata> metadata;
1954   if (int r = load_metadata(metadata)) {
1955     err << "Unable to load 'last_metadata'";
1956     return r;
1957   }
1958
1959   if (!metadata.count(gid)) {
1960     return -ENOENT;
1961   }
1962   const Metadata& m = metadata[gid];
1963   for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
1964     f->dump_string(p->first.c_str(), p->second);
1965   }
1966   return 0;
1967 }
1968
1969 int MDSMonitor::print_nodes(Formatter *f)
1970 {
1971   assert(f);
1972
1973   map<mds_gid_t, Metadata> metadata;
1974   if (int r = load_metadata(metadata)) {
1975     return r;
1976   }
1977
1978   map<string, list<int> > mdses; // hostname => rank
1979   for (map<mds_gid_t, Metadata>::iterator it = metadata.begin();
1980        it != metadata.end(); ++it) {
1981     const Metadata& m = it->second;
1982     Metadata::const_iterator hostname = m.find("hostname");
1983     if (hostname == m.end()) {
1984       // not likely though
1985       continue;
1986     }
1987     const mds_gid_t gid = it->first;
1988     if (!fsmap.gid_exists(gid)) {
1989       dout(5) << __func__ << ": GID " << gid << " not existent" << dendl;
1990       continue;
1991     }
1992     const MDSMap::mds_info_t& mds_info = fsmap.get_info_gid(gid);
1993     // FIXME: include filesystem name with rank here
1994     mdses[hostname->second].push_back(mds_info.rank);
1995   }
1996
1997   dump_services(f, mdses, "mds");
1998   return 0;
1999 }
2000
2001 /**
2002  * If a cluster is undersized (with respect to max_mds), then
2003  * attempt to find daemons to grow it.
2004  */
2005 bool MDSMonitor::maybe_expand_cluster(std::shared_ptr<Filesystem> fs)
2006 {
2007   bool do_propose = false;
2008
2009   if (fs->mds_map.test_flag(CEPH_MDSMAP_DOWN)) {
2010     return do_propose;
2011   }
2012
2013   while (fs->mds_map.get_num_in_mds() < size_t(fs->mds_map.get_max_mds()) &&
2014          !fs->mds_map.is_degraded()) {
2015     mds_rank_t mds = mds_rank_t(0);
2016     string name;
2017     while (fs->mds_map.is_in(mds)) {
2018       mds++;
2019     }
2020     mds_gid_t newgid = pending_fsmap.find_replacement_for({fs->fscid, mds},
2021                          name, g_conf->mon_force_standby_active);
2022     if (newgid == MDS_GID_NONE) {
2023       break;
2024     }
2025
2026     const auto &new_info = pending_fsmap.get_info_gid(newgid);
2027     dout(1) << "assigned standby " << new_info.addr
2028             << " as mds." << mds << dendl;
2029
2030     mon->clog->info() << new_info.human_name() << " assigned to "
2031                          "filesystem " << fs->mds_map.fs_name << " as rank "
2032                       << mds << " (now has " << fs->mds_map.get_num_in_mds() + 1
2033                       << " ranks)";
2034     pending_fsmap.promote(newgid, fs, mds);
2035     do_propose = true;
2036   }
2037
2038   return do_propose;
2039 }
2040
2041
2042 /**
2043  * If a daemon is laggy, and a suitable replacement
2044  * is available, fail this daemon (remove from map) and pass its
2045  * role to another daemon.
2046  */
2047 void MDSMonitor::maybe_replace_gid(mds_gid_t gid, const MDSMap::mds_info_t& info,
2048     bool *mds_propose, bool *osd_propose)
2049 {
2050   assert(mds_propose != nullptr);
2051   assert(osd_propose != nullptr);
2052
2053   const auto fscid = pending_fsmap.mds_roles.at(gid);
2054
2055   // We will only take decisive action (replacing/removing a daemon)
2056   // if we have some indicating that some other daemon(s) are successfully
2057   // getting beacons through recently.
2058   utime_t latest_beacon;
2059   for (const auto & i : last_beacon) {
2060     latest_beacon = MAX(i.second.stamp, latest_beacon);
2061   }
2062   const bool may_replace = latest_beacon >
2063     (ceph_clock_now() -
2064      MAX(g_conf->mds_beacon_interval, g_conf->mds_beacon_grace * 0.5));
2065
2066   // are we in?
2067   // and is there a non-laggy standby that can take over for us?
2068   mds_gid_t sgid;
2069   if (info.rank >= 0 &&
2070       info.state != MDSMap::STATE_STANDBY &&
2071       info.state != MDSMap::STATE_STANDBY_REPLAY &&
2072       may_replace &&
2073       !pending_fsmap.get_filesystem(fscid)->mds_map.test_flag(CEPH_MDSMAP_DOWN) &&
2074       (sgid = pending_fsmap.find_replacement_for({fscid, info.rank}, info.name,
2075                 g_conf->mon_force_standby_active)) != MDS_GID_NONE)
2076   {
2077     
2078     MDSMap::mds_info_t si = pending_fsmap.get_info_gid(sgid);
2079     dout(10) << " replacing " << gid << " " << info.addr << " mds."
2080       << info.rank << "." << info.inc
2081       << " " << ceph_mds_state_name(info.state)
2082       << " with " << sgid << "/" << si.name << " " << si.addr << dendl;
2083
2084     mon->clog->warn() << info.human_name() 
2085                       << " is not responding, replacing it "
2086                       << "as rank " << info.rank
2087                       << " with standby " << si.human_name();
2088
2089     // Remember what NS the old one was in
2090     const fs_cluster_id_t fscid = pending_fsmap.mds_roles.at(gid);
2091
2092     // Remove the old one
2093     *osd_propose |= fail_mds_gid(gid);
2094
2095     // Promote the replacement
2096     auto fs = pending_fsmap.filesystems.at(fscid);
2097     pending_fsmap.promote(sgid, fs, info.rank);
2098
2099     *mds_propose = true;
2100   } else if ((info.state == MDSMap::STATE_STANDBY_REPLAY ||
2101              info.state == MDSMap::STATE_STANDBY) && may_replace) {
2102     dout(10) << " failing and removing " << gid << " " << info.addr << " mds." << info.rank 
2103       << "." << info.inc << " " << ceph_mds_state_name(info.state)
2104       << dendl;
2105     mon->clog->info() << "Standby " << info.human_name() << " is not "
2106                          "responding, dropping it";
2107     fail_mds_gid(gid);
2108     *mds_propose = true;
2109   } else if (!info.laggy()) {
2110       dout(10) << " marking " << gid << " " << info.addr << " mds." << info.rank << "." << info.inc
2111         << " " << ceph_mds_state_name(info.state)
2112         << " laggy" << dendl;
2113       pending_fsmap.modify_daemon(info.global_id, [](MDSMap::mds_info_t *info) {
2114           info->laggy_since = ceph_clock_now();
2115       });
2116       *mds_propose = true;
2117   }
2118 }
2119
2120 bool MDSMonitor::maybe_promote_standby(std::shared_ptr<Filesystem> fs)
2121 {
2122   assert(!fs->mds_map.test_flag(CEPH_MDSMAP_DOWN));
2123
2124   bool do_propose = false;
2125
2126   // have a standby take over?
2127   set<mds_rank_t> failed;
2128   fs->mds_map.get_failed_mds_set(failed);
2129   if (!failed.empty()) {
2130     set<mds_rank_t>::iterator p = failed.begin();
2131     while (p != failed.end()) {
2132       mds_rank_t f = *p++;
2133       mds_gid_t sgid = pending_fsmap.find_replacement_for({fs->fscid, f}, {},
2134           g_conf->mon_force_standby_active);
2135       if (sgid) {
2136         const MDSMap::mds_info_t si = pending_fsmap.get_info_gid(sgid);
2137         dout(0) << " taking over failed mds." << f << " with " << sgid
2138                 << "/" << si.name << " " << si.addr << dendl;
2139         mon->clog->info() << "Standby " << si.human_name()
2140                           << " assigned to filesystem " << fs->mds_map.fs_name
2141                           << " as rank " << f;
2142
2143         pending_fsmap.promote(sgid, fs, f);
2144         do_propose = true;
2145       }
2146     }
2147   } else {
2148     // There were no failures to replace, so try using any available standbys
2149     // as standby-replay daemons.
2150
2151     // Take a copy of the standby GIDs so that we can iterate over
2152     // them while perhaps-modifying standby_daemons during the loop
2153     // (if we promote anyone they are removed from standby_daemons)
2154     std::vector<mds_gid_t> standby_gids;
2155     for (const auto &j : pending_fsmap.standby_daemons) {
2156       standby_gids.push_back(j.first);
2157     }
2158
2159     for (const auto &gid : standby_gids) {
2160       const auto &info = pending_fsmap.standby_daemons.at(gid);
2161       assert(info.state == MDSMap::STATE_STANDBY);
2162
2163       if (!info.standby_replay) {
2164         continue;
2165       }
2166
2167       /*
2168        * This mds is standby but has no rank assigned.
2169        * See if we can find it somebody to shadow
2170        */
2171       dout(20) << "gid " << gid << " is standby and following nobody" << dendl;
2172       
2173       // standby for someone specific?
2174       if (info.standby_for_rank >= 0) {
2175         // The mds_info_t may or may not tell us exactly which filesystem
2176         // the standby_for_rank refers to: lookup via legacy_client_fscid
2177         mds_role_t target_role = {
2178           info.standby_for_fscid == FS_CLUSTER_ID_NONE ?
2179             pending_fsmap.legacy_client_fscid : info.standby_for_fscid,
2180           info.standby_for_rank};
2181
2182         // It is possible that the map contains a standby_for_fscid
2183         // that doesn't correspond to an existing filesystem, especially
2184         // if we loaded from a version with a bug (#17466)
2185         if (info.standby_for_fscid != FS_CLUSTER_ID_NONE
2186             && !pending_fsmap.filesystem_exists(info.standby_for_fscid)) {
2187           derr << "gid " << gid << " has invalid standby_for_fscid "
2188                << info.standby_for_fscid << dendl;
2189           continue;
2190         }
2191
2192         // If we managed to resolve a full target role
2193         if (target_role.fscid != FS_CLUSTER_ID_NONE) {
2194           auto fs = pending_fsmap.get_filesystem(target_role.fscid);
2195           if (fs->mds_map.is_followable(target_role.rank)) {
2196             do_propose |= try_standby_replay(
2197                 info,
2198                 *fs,
2199                 fs->mds_map.get_info(target_role.rank));
2200           }
2201         }
2202
2203         continue;
2204       }
2205
2206       // check everyone
2207       for (auto fs_i : pending_fsmap.filesystems) {
2208         const MDSMap &mds_map = fs_i.second->mds_map;
2209         for (auto mds_i : mds_map.mds_info) {
2210           MDSMap::mds_info_t &cand_info = mds_i.second;
2211           if (cand_info.rank >= 0 && mds_map.is_followable(cand_info.rank)) {
2212             if ((info.standby_for_name.length() && info.standby_for_name != cand_info.name) ||
2213                 info.standby_for_rank != MDS_RANK_NONE) {
2214               continue;   // we're supposed to follow someone else
2215             }
2216
2217             if (try_standby_replay(info, *(fs_i.second), cand_info)) {
2218               do_propose = true;
2219               break;
2220             }
2221             continue;
2222           }
2223         }
2224       }
2225     }
2226   }
2227
2228   return do_propose;
2229 }
2230
2231 void MDSMonitor::tick()
2232 {
2233   // make sure mds's are still alive
2234   // ...if i am an active leader
2235   if (!is_active()) return;
2236
2237   dout(10) << fsmap << dendl;
2238
2239   bool do_propose = false;
2240
2241   if (!mon->is_leader()) return;
2242
2243   do_propose |= pending_fsmap.check_health();
2244
2245   // expand mds cluster (add new nodes to @in)?
2246   for (auto i : pending_fsmap.filesystems) {
2247     do_propose |= maybe_expand_cluster(i.second);
2248   }
2249
2250   const auto now = ceph_clock_now();
2251   if (last_tick.is_zero()) {
2252     last_tick = now;
2253   }
2254
2255   if (now - last_tick > (g_conf->mds_beacon_grace - g_conf->mds_beacon_interval)) {
2256     // This case handles either local slowness (calls being delayed
2257     // for whatever reason) or cluster election slowness (a long gap
2258     // between calls while an election happened)
2259     dout(4) << __func__ << ": resetting beacon timeouts due to mon delay "
2260             "(slow election?) of " << now - last_tick << " seconds" << dendl;
2261     for (auto &i : last_beacon) {
2262       i.second.stamp = now;
2263     }
2264   }
2265
2266   last_tick = now;
2267
2268   // check beacon timestamps
2269   utime_t cutoff = now;
2270   cutoff -= g_conf->mds_beacon_grace;
2271
2272   // make sure last_beacon is fully populated
2273   for (const auto &p : pending_fsmap.mds_roles) {
2274     auto &gid = p.first;
2275     if (last_beacon.count(gid) == 0) {
2276       last_beacon[gid].stamp = now;
2277       last_beacon[gid].seq = 0;
2278     }
2279   }
2280
2281   bool propose_osdmap = false;
2282   bool osdmap_writeable = mon->osdmon()->is_writeable();
2283   auto p = last_beacon.begin();
2284   while (p != last_beacon.end()) {
2285     mds_gid_t gid = p->first;
2286     auto beacon_info = p->second;
2287     ++p;
2288
2289     if (!pending_fsmap.gid_exists(gid)) {
2290       // clean it out
2291       last_beacon.erase(gid);
2292       continue;
2293     }
2294
2295     if (beacon_info.stamp < cutoff) {
2296       auto &info = pending_fsmap.get_info_gid(gid);
2297       dout(1) << "no beacon from mds." << info.rank << "." << info.inc
2298               << " (gid: " << gid << " addr: " << info.addr
2299               << " state: " << ceph_mds_state_name(info.state) << ")"
2300               << " since " << beacon_info.stamp << dendl;
2301       // If the OSDMap is writeable, we can blacklist things, so we can
2302       // try failing any laggy MDS daemons.  Consider each one for failure.
2303       if (osdmap_writeable) {
2304         maybe_replace_gid(gid, info, &do_propose, &propose_osdmap);
2305       }
2306     }
2307   }
2308   if (propose_osdmap) {
2309     request_proposal(mon->osdmon());
2310   }
2311
2312   for (auto i : pending_fsmap.filesystems) {
2313     auto fs = i.second;
2314     if (!fs->mds_map.test_flag(CEPH_MDSMAP_DOWN)) {
2315       do_propose |= maybe_promote_standby(fs);
2316     }
2317   }
2318
2319   if (do_propose) {
2320     propose_pending();
2321   }
2322 }
2323
2324 /**
2325  * finfo: the would-be follower
2326  * leader_fs: the Filesystem containing the would-be leader
2327  * ainfo: the would-be leader
2328  */
2329 bool MDSMonitor::try_standby_replay(
2330     const MDSMap::mds_info_t& finfo,
2331     const Filesystem &leader_fs,
2332     const MDSMap::mds_info_t& ainfo)
2333 {
2334   // someone else already following?
2335   if (leader_fs.has_standby_replay(ainfo.global_id)) {
2336     dout(20) << " mds." << ainfo.rank << " already has a follower" << dendl;
2337     return false;
2338   } else {
2339     // Assign the new role to the standby
2340     dout(10) << "  setting to follow mds rank " << ainfo.rank << dendl;
2341     pending_fsmap.assign_standby_replay(finfo.global_id, leader_fs.fscid, ainfo.rank);
2342     return true;
2343   }
2344 }
2345
2346 MDSMonitor::MDSMonitor(Monitor *mn, Paxos *p, string service_name)
2347   : PaxosService(mn, p, service_name)
2348 {
2349   handlers = FileSystemCommandHandler::load(p);
2350 }
2351
2352 void MDSMonitor::on_restart()
2353 {
2354   // Clear out the leader-specific state.
2355   last_tick = utime_t();
2356   last_beacon.clear();
2357 }
2358