Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mon / FSCommands.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2017 Red Hat Ltd
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15
16 #include "OSDMonitor.h"
17 #include "PGMonitor.h"
18
19 #include "FSCommands.h"
20 #include "MDSMonitor.h"
21
22
23
24 static const string EXPERIMENTAL_WARNING("Warning! This feature is experimental."
25 "It may cause problems up to and including data loss."
26 "Consult the documentation at ceph.com, and if unsure, do not proceed."
27 "Add --yes-i-really-mean-it if you are certain.");
28
29
30
31 class FlagSetHandler : public FileSystemCommandHandler
32 {
33   public:
34   FlagSetHandler()
35     : FileSystemCommandHandler("fs flag set")
36   {
37   }
38
39   int handle(
40       Monitor *mon,
41       FSMap &fsmap,
42       MonOpRequestRef op,
43       map<string, cmd_vartype> &cmdmap,
44       std::stringstream &ss) override
45   {
46     string flag_name;
47     cmd_getval(g_ceph_context, cmdmap, "flag_name", flag_name);
48
49     string flag_val;
50     cmd_getval(g_ceph_context, cmdmap, "val", flag_val);
51
52     string confirm;
53     cmd_getval(g_ceph_context, cmdmap, "confirm", confirm);
54
55     if (flag_name == "enable_multiple") {
56       bool flag_bool = false;
57       int r = parse_bool(flag_val, &flag_bool, ss);
58       if (r != 0) {
59         ss << "Invalid boolean value '" << flag_val << "'";
60         return r;
61       }
62
63       bool jewel = mon->get_quorum_con_features() & CEPH_FEATURE_SERVER_JEWEL;
64       if (flag_bool && !jewel) {
65         ss << "Multiple-filesystems are forbidden until all mons are updated";
66         return -EINVAL;
67       }
68       if (confirm != "--yes-i-really-mean-it") {
69         ss << EXPERIMENTAL_WARNING;
70       }
71       fsmap.set_enable_multiple(flag_bool);
72       return 0;
73     } else {
74       ss << "Unknown flag '" << flag_name << "'";
75       return -EINVAL;
76     }
77   }
78 };
79
80 class FsNewHandler : public FileSystemCommandHandler
81 {
82   public:
83   FsNewHandler(Paxos *paxos)
84     : FileSystemCommandHandler("fs new"), m_paxos(paxos)
85   {
86   }
87
88   bool batched_propose() override {
89     return true;
90   }
91
92   int handle(
93       Monitor *mon,
94       FSMap &fsmap,
95       MonOpRequestRef op,
96       map<string, cmd_vartype> &cmdmap,
97       std::stringstream &ss) override
98   {
99     assert(m_paxos->is_plugged());
100
101     string metadata_name;
102     cmd_getval(g_ceph_context, cmdmap, "metadata", metadata_name);
103     int64_t metadata = mon->osdmon()->osdmap.lookup_pg_pool_name(metadata_name);
104     if (metadata < 0) {
105       ss << "pool '" << metadata_name << "' does not exist";
106       return -ENOENT;
107     }
108
109     string force_str;
110     cmd_getval(g_ceph_context,cmdmap, "force", force_str);
111     bool force = (force_str == "--force");
112     const pool_stat_t *stat = mon->pgservice->get_pool_stat(metadata);
113     if (stat) {
114       int64_t metadata_num_objects = stat->stats.sum.num_objects;
115       if (!force && metadata_num_objects > 0) {
116         ss << "pool '" << metadata_name
117            << "' already contains some objects. Use an empty pool instead.";
118         return -EINVAL;
119       }
120     }
121
122     string data_name;
123     cmd_getval(g_ceph_context, cmdmap, "data", data_name);
124     int64_t data = mon->osdmon()->osdmap.lookup_pg_pool_name(data_name);
125     if (data < 0) {
126       ss << "pool '" << data_name << "' does not exist";
127       return -ENOENT;
128     }
129     if (data == 0) {
130       ss << "pool '" << data_name << "' has id 0, which CephFS does not allow. Use another pool or recreate it to get a non-zero pool id.";
131       return -EINVAL;
132     }
133
134     string fs_name;
135     cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name);
136     if (fs_name.empty()) {
137         // Ensure fs name is not empty so that we can implement
138         // commmands that refer to FS by name in future.
139         ss << "Filesystem name may not be empty";
140         return -EINVAL;
141     }
142
143     if (fsmap.get_filesystem(fs_name)) {
144       auto fs = fsmap.get_filesystem(fs_name);
145       if (*(fs->mds_map.get_data_pools().begin()) == data
146           && fs->mds_map.get_metadata_pool() == metadata) {
147         // Identical FS created already, this is a no-op
148         ss << "filesystem '" << fs_name << "' already exists";
149         return 0;
150       } else {
151         ss << "filesystem already exists with name '" << fs_name << "'";
152         return -EINVAL;
153       }
154     }
155
156     if (fsmap.filesystem_count() > 0
157         && !fsmap.get_enable_multiple()) {
158       ss << "Creation of multiple filesystems is disabled.  To enable "
159             "this experimental feature, use 'ceph fs flag set enable_multiple "
160             "true'";
161       return -EINVAL;
162     }
163
164     for (auto fs : fsmap.get_filesystems()) {
165       const std::vector<int64_t> &data_pools = fs->mds_map.get_data_pools();
166       string sure;
167       if ((std::find(data_pools.begin(), data_pools.end(), data) != data_pools.end()
168            || fs->mds_map.get_metadata_pool() == metadata)
169           && ((!cmd_getval(g_ceph_context, cmdmap, "sure", sure)
170                || sure != "--allow-dangerous-metadata-overlay"))) {
171         ss << "Filesystem '" << fs_name
172            << "' is already using one of the specified RADOS pools. This should ONLY be done in emergencies and after careful reading of the documentation. Pass --allow-dangerous-metadata-overlay to permit this.";
173         return -EEXIST;
174       }
175     }
176
177     pg_pool_t const *data_pool = mon->osdmon()->osdmap.get_pg_pool(data);
178     assert(data_pool != NULL);  // Checked it existed above
179     pg_pool_t const *metadata_pool = mon->osdmon()->osdmap.get_pg_pool(metadata);
180     assert(metadata_pool != NULL);  // Checked it existed above
181
182     int r = _check_pool(mon->osdmon()->osdmap, data, false, force, &ss);
183     if (r < 0) {
184       return r;
185     }
186
187     r = _check_pool(mon->osdmon()->osdmap, metadata, true, force, &ss);
188     if (r < 0) {
189       return r;
190     }
191     
192     // if we're running as luminous, we have to set the pool application metadata
193     if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS ||
194         mon->osdmon()->pending_inc.new_require_osd_release >= CEPH_RELEASE_LUMINOUS) {
195       if (!mon->osdmon()->is_writeable()) {
196         // not allowed to write yet, so retry when we can
197         mon->osdmon()->wait_for_writeable(op, new PaxosService::C_RetryMessage(mon->mdsmon(), op));
198         return -EAGAIN;
199       }
200       mon->osdmon()->do_application_enable(data,
201                                            pg_pool_t::APPLICATION_NAME_CEPHFS);
202       mon->osdmon()->do_application_enable(metadata,
203                                            pg_pool_t::APPLICATION_NAME_CEPHFS);
204       mon->osdmon()->propose_pending();
205     }
206
207     // All checks passed, go ahead and create.
208     fsmap.create_filesystem(fs_name, metadata, data,
209                             mon->get_quorum_con_features());
210     ss << "new fs with metadata pool " << metadata << " and data pool " << data;
211     return 0;
212   }
213
214 private:
215   Paxos *m_paxos;
216 };
217
218 class SetHandler : public FileSystemCommandHandler
219 {
220 public:
221   SetHandler()
222     : FileSystemCommandHandler("fs set")
223   {}
224
225   int handle(
226       Monitor *mon,
227       FSMap &fsmap,
228       MonOpRequestRef op,
229       map<string, cmd_vartype> &cmdmap,
230       std::stringstream &ss) override
231   {
232     std::string fs_name;
233     if (!cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name) || fs_name.empty()) {
234       ss << "Missing filesystem name";
235       return -EINVAL;
236     }
237
238     auto fs = fsmap.get_filesystem(fs_name);
239     if (fs == nullptr) {
240       ss << "Not found: '" << fs_name << "'";
241       return -ENOENT;
242     }
243
244     string var;
245     if (!cmd_getval(g_ceph_context, cmdmap, "var", var) || var.empty()) {
246       ss << "Invalid variable";
247       return -EINVAL;
248     }
249     string val;
250     string interr;
251     int64_t n = 0;
252     if (!cmd_getval(g_ceph_context, cmdmap, "val", val)) {
253       return -EINVAL;
254     }
255     // we got a string.  see if it contains an int.
256     n = strict_strtoll(val.c_str(), 10, &interr);
257     if (var == "max_mds") {
258       // NOTE: see also "mds set_max_mds", which can modify the same field.
259       if (interr.length()) {
260         ss << interr;
261         return -EINVAL;
262       }
263
264       if (n <= 0) {
265         ss << "You must specify at least one MDS";
266         return -EINVAL;
267       }
268
269       if (!fs->mds_map.allows_multimds() && n > fs->mds_map.get_max_mds() &&
270           n > 1) {
271         ss << "multi-MDS clusters are not enabled; set 'allow_multimds' to enable";
272         return -EINVAL;
273       }
274       if (n > MAX_MDS) {
275         ss << "may not have more than " << MAX_MDS << " MDS ranks";
276         return -EINVAL;
277       }
278       fsmap.modify_filesystem(
279           fs->fscid,
280           [n](std::shared_ptr<Filesystem> fs)
281       {
282         fs->mds_map.set_max_mds(n);
283       });
284     } else if (var == "inline_data") {
285       bool enable_inline = false;
286       int r = parse_bool(val, &enable_inline, ss);
287       if (r != 0) {
288         return r;
289       }
290
291       if (enable_inline) {
292         string confirm;
293         if (!cmd_getval(g_ceph_context, cmdmap, "confirm", confirm) ||
294             confirm != "--yes-i-really-mean-it") {
295           ss << EXPERIMENTAL_WARNING;
296           return -EPERM;
297         }
298         ss << "inline data enabled";
299
300         fsmap.modify_filesystem(
301             fs->fscid,
302             [](std::shared_ptr<Filesystem> fs)
303         {
304           fs->mds_map.set_inline_data_enabled(true);
305         });
306
307         // Update `compat`
308         CompatSet c = fsmap.get_compat();
309         c.incompat.insert(MDS_FEATURE_INCOMPAT_INLINE);
310         fsmap.update_compat(c);
311       } else {
312         ss << "inline data disabled";
313         fsmap.modify_filesystem(
314             fs->fscid,
315             [](std::shared_ptr<Filesystem> fs)
316         {
317           fs->mds_map.set_inline_data_enabled(false);
318         });
319       }
320     } else if (var == "balancer") {
321       if (val.empty()) {
322         ss << "unsetting the metadata load balancer";
323       } else {
324         ss << "setting the metadata load balancer to " << val;
325       }
326       fsmap.modify_filesystem(
327         fs->fscid,
328         [val](std::shared_ptr<Filesystem> fs)
329         {
330           fs->mds_map.set_balancer(val);
331         });
332       return true;
333     } else if (var == "max_file_size") {
334       if (interr.length()) {
335         ss << var << " requires an integer value";
336         return -EINVAL;
337       }
338       if (n < CEPH_MIN_STRIPE_UNIT) {
339         ss << var << " must at least " << CEPH_MIN_STRIPE_UNIT;
340         return -ERANGE;
341       }
342       fsmap.modify_filesystem(
343           fs->fscid,
344           [n](std::shared_ptr<Filesystem> fs)
345       {
346         fs->mds_map.set_max_filesize(n);
347       });
348     } else if (var == "allow_new_snaps") {
349       bool enable_snaps = false;
350       int r = parse_bool(val, &enable_snaps, ss);
351       if (r != 0) {
352         return r;
353       }
354
355       if (!enable_snaps) {
356         fsmap.modify_filesystem(
357             fs->fscid,
358             [](std::shared_ptr<Filesystem> fs)
359         {
360           fs->mds_map.clear_snaps_allowed();
361         });
362         ss << "disabled new snapshots";
363       } else {
364         string confirm;
365         if (!cmd_getval(g_ceph_context, cmdmap, "confirm", confirm) ||
366             confirm != "--yes-i-really-mean-it") {
367           ss << EXPERIMENTAL_WARNING;
368           return -EPERM;
369         }
370         fsmap.modify_filesystem(
371             fs->fscid,
372             [](std::shared_ptr<Filesystem> fs)
373         {
374           fs->mds_map.set_snaps_allowed();
375         });
376         ss << "enabled new snapshots";
377       }
378     } else if (var == "allow_multimds") {
379       bool enable_multimds = false;
380       int r = parse_bool(val, &enable_multimds, ss);
381       if (r != 0) {
382         return r;
383       }
384
385       if (!enable_multimds) {
386         fsmap.modify_filesystem(fs->fscid,
387              [](std::shared_ptr<Filesystem> fs)
388                 {
389                   fs->mds_map.clear_multimds_allowed();
390                 });
391         ss << "disallowed increasing the cluster size past 1";
392       } else {
393         fsmap.modify_filesystem(
394             fs->fscid,
395             [](std::shared_ptr<Filesystem> fs)
396         {
397           fs->mds_map.set_multimds_allowed();
398         });
399         ss << "enabled creation of more than 1 active MDS";
400       }
401     } else if (var == "allow_dirfrags") {
402       bool enable_dirfrags = false;
403       int r = parse_bool(val, &enable_dirfrags, ss);
404       if (r != 0) {
405         return r;
406       }
407
408       if (!enable_dirfrags) {
409         fsmap.modify_filesystem(fs->fscid,
410              [](std::shared_ptr<Filesystem> fs)
411                 {
412                   fs->mds_map.clear_dirfrags_allowed();
413                 });
414         ss << "disallowed new directory fragmentation";
415       } else {
416         fsmap.modify_filesystem(
417             fs->fscid,
418             [](std::shared_ptr<Filesystem> fs)
419         {
420           fs->mds_map.set_dirfrags_allowed();
421         });
422         ss << "enabled directory fragmentation";
423       }
424     } else if (var == "cluster_down") {
425       bool is_down = false;
426       int r = parse_bool(val, &is_down, ss);
427       if (r != 0) {
428         return r;
429       }
430
431       fsmap.modify_filesystem(
432           fs->fscid,
433           [is_down](std::shared_ptr<Filesystem> fs)
434       {
435         if (is_down) {
436           fs->mds_map.set_flag(CEPH_MDSMAP_DOWN);
437         } else {
438           fs->mds_map.clear_flag(CEPH_MDSMAP_DOWN);
439         }
440       });
441
442       ss << "marked " << (is_down ? "down" : "up");
443     } else if (var == "standby_count_wanted") {
444       if (interr.length()) {
445        ss << var << " requires an integer value";
446        return -EINVAL;
447       }
448       if (n < 0) {
449        ss << var << " must be non-negative";
450        return -ERANGE;
451       }
452       fsmap.modify_filesystem(
453           fs->fscid,
454           [n](std::shared_ptr<Filesystem> fs)
455       {
456         fs->mds_map.set_standby_count_wanted(n);
457       });
458     } else {
459       ss << "unknown variable " << var;
460       return -EINVAL;
461     }
462
463     return 0;
464   }
465 };
466
467 class AddDataPoolHandler : public FileSystemCommandHandler
468 {
469   public:
470   AddDataPoolHandler(Paxos *paxos)
471     : FileSystemCommandHandler("fs add_data_pool"), m_paxos(paxos)
472   {}
473
474   bool batched_propose() override {
475     return true;
476   }
477
478   int handle(
479       Monitor *mon,
480       FSMap &fsmap,
481       MonOpRequestRef op,
482       map<string, cmd_vartype> &cmdmap,
483       std::stringstream &ss) override
484   {
485     assert(m_paxos->is_plugged());
486
487     string poolname;
488     cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
489
490     std::string fs_name;
491     if (!cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name)
492         || fs_name.empty()) {
493       ss << "Missing filesystem name";
494       return -EINVAL;
495     }
496
497     auto fs = fsmap.get_filesystem(fs_name);
498     if (fs == nullptr) {
499       ss << "Not found: '" << fs_name << "'";
500       return -ENOENT;
501     }
502
503     int64_t poolid = mon->osdmon()->osdmap.lookup_pg_pool_name(poolname);
504     if (poolid < 0) {
505       string err;
506       poolid = strict_strtol(poolname.c_str(), 10, &err);
507       if (err.length()) {
508         ss << "pool '" << poolname << "' does not exist";
509         return -ENOENT;
510       }
511     }
512
513     int r = _check_pool(mon->osdmon()->osdmap, poolid, false, false, &ss);
514     if (r != 0) {
515       return r;
516     }
517
518     // no-op when the data_pool already on fs
519     if (fs->mds_map.is_data_pool(poolid)) {
520       ss << "data pool " << poolid << " is already on fs " << fs_name;
521       return 0;
522     }
523
524     // if we're running as luminous, we have to set the pool application metadata
525     if (mon->osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS ||
526         mon->osdmon()->pending_inc.new_require_osd_release >= CEPH_RELEASE_LUMINOUS) {
527       if (!mon->osdmon()->is_writeable()) {
528         // not allowed to write yet, so retry when we can
529         mon->osdmon()->wait_for_writeable(op, new PaxosService::C_RetryMessage(mon->mdsmon(), op));
530         return -EAGAIN;
531       }
532       mon->osdmon()->do_application_enable(poolid, pg_pool_t::APPLICATION_NAME_CEPHFS);
533       mon->osdmon()->propose_pending();
534     }
535
536     fsmap.modify_filesystem(
537         fs->fscid,
538         [poolid](std::shared_ptr<Filesystem> fs)
539     {
540       fs->mds_map.add_data_pool(poolid);
541     });
542
543     ss << "added data pool " << poolid << " to fsmap";
544
545     return 0;
546   }
547
548 private:
549   Paxos *m_paxos;
550 };
551
552 class SetDefaultHandler : public FileSystemCommandHandler
553 {
554   public:
555   SetDefaultHandler()
556     : FileSystemCommandHandler("fs set-default")
557   {}
558
559   int handle(
560       Monitor *mon,
561       FSMap &fsmap,
562       MonOpRequestRef op,
563       map<string, cmd_vartype> &cmdmap,
564       std::stringstream &ss) override
565   {
566     std::string fs_name;
567     cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name);
568     auto fs = fsmap.get_filesystem(fs_name);
569     if (fs == nullptr) {
570         ss << "filesystem '" << fs_name << "' does not exist";
571         return -ENOENT;
572     }
573
574     fsmap.set_legacy_client_fscid(fs->fscid);
575     return 0;
576   }
577 };
578
579 class RemoveFilesystemHandler : public FileSystemCommandHandler
580 {
581   public:
582   RemoveFilesystemHandler()
583     : FileSystemCommandHandler("fs rm")
584   {}
585
586   int handle(
587       Monitor *mon,
588       FSMap &fsmap,
589       MonOpRequestRef op,
590       map<string, cmd_vartype> &cmdmap,
591       std::stringstream &ss) override
592   {
593     // Check caller has correctly named the FS to delete
594     // (redundant while there is only one FS, but command
595     //  syntax should apply to multi-FS future)
596     string fs_name;
597     cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name);
598     auto fs = fsmap.get_filesystem(fs_name);
599     if (fs == nullptr) {
600         // Consider absence success to make deletes idempotent
601         ss << "filesystem '" << fs_name << "' does not exist";
602         return 0;
603     }
604
605     // Check that no MDS daemons are active
606     if (fs->mds_map.get_num_up_mds() > 0) {
607       ss << "all MDS daemons must be inactive before removing filesystem";
608       return -EINVAL;
609     }
610
611     // Check for confirmation flag
612     string sure;
613     cmd_getval(g_ceph_context, cmdmap, "sure", sure);
614     if (sure != "--yes-i-really-mean-it") {
615       ss << "this is a DESTRUCTIVE operation and will make data in your filesystem permanently" \
616             " inaccessible.  Add --yes-i-really-mean-it if you are sure you wish to continue.";
617       return -EPERM;
618     }
619
620     if (fsmap.get_legacy_client_fscid() == fs->fscid) {
621       fsmap.set_legacy_client_fscid(FS_CLUSTER_ID_NONE);
622     }
623
624     std::vector<mds_gid_t> to_fail;
625     // There may be standby_replay daemons left here
626     for (const auto &i : fs->mds_map.get_mds_info()) {
627       assert(i.second.state == MDSMap::STATE_STANDBY_REPLAY);
628       to_fail.push_back(i.first);
629     }
630
631     for (const auto &gid : to_fail) {
632       // Standby replays don't write, so it isn't important to
633       // wait for an osdmap propose here: ignore return value.
634       mon->mdsmon()->fail_mds_gid(gid);
635     }
636
637     fsmap.erase_filesystem(fs->fscid);
638
639     return 0;
640   }
641 };
642
643 class ResetFilesystemHandler : public FileSystemCommandHandler
644 {
645   public:
646   ResetFilesystemHandler()
647     : FileSystemCommandHandler("fs reset")
648   {}
649
650   int handle(
651       Monitor *mon,
652       FSMap &fsmap,
653       MonOpRequestRef op,
654       map<string, cmd_vartype> &cmdmap,
655       std::stringstream &ss) override
656   {
657     string fs_name;
658     cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name);
659     auto fs = fsmap.get_filesystem(fs_name);
660     if (fs == nullptr) {
661         ss << "filesystem '" << fs_name << "' does not exist";
662         // Unlike fs rm, we consider this case an error
663         return -ENOENT;
664     }
665
666     // Check that no MDS daemons are active
667     if (fs->mds_map.get_num_up_mds() > 0) {
668       ss << "all MDS daemons must be inactive before resetting filesystem: set the cluster_down flag"
669             " and use `ceph mds fail` to make this so";
670       return -EINVAL;
671     }
672
673     // Check for confirmation flag
674     string sure;
675     cmd_getval(g_ceph_context, cmdmap, "sure", sure);
676     if (sure != "--yes-i-really-mean-it") {
677       ss << "this is a potentially destructive operation, only for use by experts in disaster recovery.  "
678         "Add --yes-i-really-mean-it if you are sure you wish to continue.";
679       return -EPERM;
680     }
681
682     fsmap.reset_filesystem(fs->fscid);
683
684     return 0;
685   }
686 };
687
688 class RemoveDataPoolHandler : public FileSystemCommandHandler
689 {
690   public:
691   RemoveDataPoolHandler()
692     : FileSystemCommandHandler("fs rm_data_pool")
693   {}
694
695   int handle(
696       Monitor *mon,
697       FSMap &fsmap,
698       MonOpRequestRef op,
699       map<string, cmd_vartype> &cmdmap,
700       std::stringstream &ss) override
701   {
702     string poolname;
703     cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
704
705     std::string fs_name;
706     if (!cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name)
707         || fs_name.empty()) {
708       ss << "Missing filesystem name";
709       return -EINVAL;
710     }
711
712     auto fs = fsmap.get_filesystem(fs_name);
713     if (fs == nullptr) {
714       ss << "Not found: '" << fs_name << "'";
715       return -ENOENT;
716     }
717
718     int64_t poolid = mon->osdmon()->osdmap.lookup_pg_pool_name(poolname);
719     if (poolid < 0) {
720       string err;
721       poolid = strict_strtol(poolname.c_str(), 10, &err);
722       if (err.length()) {
723         ss << "pool '" << poolname << "' does not exist";
724         return -ENOENT;
725       } else if (poolid < 0) {
726         ss << "invalid pool id '" << poolid << "'";
727         return -EINVAL;
728       }
729     }
730
731     assert(poolid >= 0);  // Checked by parsing code above
732
733     if (fs->mds_map.get_first_data_pool() == poolid) {
734       ss << "cannot remove default data pool";
735       return -EINVAL;
736     }
737
738
739     int r = 0;
740     fsmap.modify_filesystem(fs->fscid,
741         [&r, poolid](std::shared_ptr<Filesystem> fs)
742     {
743       r = fs->mds_map.remove_data_pool(poolid);
744     });
745     if (r == -ENOENT) {
746       // It was already removed, succeed in silence
747       return 0;
748     } else if (r == 0) {
749       // We removed it, succeed
750       ss << "removed data pool " << poolid << " from fsmap";
751       return 0;
752     } else {
753       // Unexpected error, bubble up
754       return r;
755     }
756   }
757 };
758
759
760 /**
761  * For commands that refer to a particular filesystem,
762  * enable wrapping to implement the legacy version of
763  * the command (like "mds add_data_pool" vs "fs add_data_pool")
764  *
765  * The wrapped handler must expect a fs_name argument in
766  * its command map.
767  */
768 template<typename T>
769 class LegacyHandler : public T
770 {
771   std::string legacy_prefix;
772
773   public:
774   template <typename... Args>
775   LegacyHandler(const std::string &new_prefix, Args&&... args)
776     : T(std::forward<Args>(args)...)
777   {
778     legacy_prefix = new_prefix;
779   }
780
781   std::string const &get_prefix() override {return legacy_prefix;}
782
783   int handle(
784       Monitor *mon,
785       FSMap &fsmap,
786       MonOpRequestRef op,
787       map<string, cmd_vartype> &cmdmap,
788       std::stringstream &ss) override
789   {
790     auto fs = fsmap.get_legacy_filesystem();
791     if (fs == nullptr) {
792       ss << "No filesystem configured";
793       return -ENOENT;
794     }
795     std::map<string, cmd_vartype> modified = cmdmap;
796     modified["fs_name"] = fs->mds_map.get_fs_name();
797     return T::handle(mon, fsmap, op, modified, ss);
798   }
799 };
800
801 /**
802  * For commands with an alternative prefix
803  */
804 template<typename T>
805 class AliasHandler : public T
806 {
807   std::string alias_prefix;
808
809   public:
810   AliasHandler(const std::string &new_prefix)
811     : T()
812   {
813     alias_prefix = new_prefix;
814   }
815
816   std::string const &get_prefix() override {return alias_prefix;}
817
818   int handle(
819       Monitor *mon,
820       FSMap &fsmap,
821       MonOpRequestRef op,
822       map<string, cmd_vartype> &cmdmap,
823       std::stringstream &ss) override
824   {
825     return T::handle(mon, fsmap, op, cmdmap, ss);
826   }
827 };
828
829
830 std::list<std::shared_ptr<FileSystemCommandHandler> >
831 FileSystemCommandHandler::load(Paxos *paxos)
832 {
833   std::list<std::shared_ptr<FileSystemCommandHandler> > handlers;
834
835   handlers.push_back(std::make_shared<SetHandler>());
836   handlers.push_back(std::make_shared<LegacyHandler<SetHandler> >("mds set"));
837   handlers.push_back(std::make_shared<FlagSetHandler>());
838   handlers.push_back(std::make_shared<AddDataPoolHandler>(paxos));
839   handlers.push_back(std::make_shared<LegacyHandler<AddDataPoolHandler> >(
840         "mds add_data_pool", paxos));
841   handlers.push_back(std::make_shared<RemoveDataPoolHandler>());
842   handlers.push_back(std::make_shared<LegacyHandler<RemoveDataPoolHandler> >(
843         "mds remove_data_pool"));
844   handlers.push_back(std::make_shared<LegacyHandler<RemoveDataPoolHandler> >(
845         "mds rm_data_pool"));
846   handlers.push_back(std::make_shared<FsNewHandler>(paxos));
847   handlers.push_back(std::make_shared<RemoveFilesystemHandler>());
848   handlers.push_back(std::make_shared<ResetFilesystemHandler>());
849
850   handlers.push_back(std::make_shared<SetDefaultHandler>());
851   handlers.push_back(std::make_shared<AliasHandler<SetDefaultHandler> >(
852         "fs set_default"));
853
854   return handlers;
855 }
856
857 int FileSystemCommandHandler::parse_bool(
858       const std::string &bool_str,
859       bool *result,
860       std::ostream &ss)
861 {
862   assert(result != nullptr);
863
864   string interr;
865   int64_t n = strict_strtoll(bool_str.c_str(), 10, &interr);
866
867   if (bool_str == "false" || bool_str == "no"
868       || (interr.length() == 0 && n == 0)) {
869     *result = false;
870     return 0;
871   } else if (bool_str == "true" || bool_str == "yes"
872       || (interr.length() == 0 && n == 1)) {
873     *result = true;
874     return 0;
875   } else {
876     ss << "value must be false|no|0 or true|yes|1";
877     return -EINVAL;
878   }
879 }
880
881 int FileSystemCommandHandler::_check_pool(
882     OSDMap &osd_map,
883     const int64_t pool_id,
884     bool metadata,
885     bool force,
886     std::stringstream *ss) const
887 {
888   assert(ss != NULL);
889
890   const pg_pool_t *pool = osd_map.get_pg_pool(pool_id);
891   if (!pool) {
892     *ss << "pool id '" << pool_id << "' does not exist";
893     return -ENOENT;
894   }
895
896   const string& pool_name = osd_map.get_pool_name(pool_id);
897
898   if (pool->is_erasure() && metadata) {
899       *ss << "pool '" << pool_name << "' (id '" << pool_id << "')"
900          << " is an erasure-coded pool.  Use of erasure-coded pools"
901          << " for CephFS metadata is not permitted";
902     return -EINVAL;
903   } else if (pool->is_erasure() && !pool->allows_ecoverwrites()) {
904     // non-overwriteable EC pools are only acceptable with a cache tier overlay
905     if (!pool->has_tiers() || !pool->has_read_tier() || !pool->has_write_tier()) {
906       *ss << "pool '" << pool_name << "' (id '" << pool_id << "')"
907          << " is an erasure-coded pool, with no overwrite support";
908       return -EINVAL;
909     }
910
911     // That cache tier overlay must be writeback, not readonly (it's the
912     // write operations like modify+truncate we care about support for)
913     const pg_pool_t *write_tier = osd_map.get_pg_pool(
914         pool->write_tier);
915     assert(write_tier != NULL);  // OSDMonitor shouldn't allow DNE tier
916     if (write_tier->cache_mode == pg_pool_t::CACHEMODE_FORWARD
917         || write_tier->cache_mode == pg_pool_t::CACHEMODE_READONLY) {
918       *ss << "EC pool '" << pool_name << "' has a write tier ("
919           << osd_map.get_pool_name(pool->write_tier)
920           << ") that is configured "
921              "to forward writes.  Use a cache mode such as 'writeback' for "
922              "CephFS";
923       return -EINVAL;
924     }
925   }
926
927   if (pool->is_tier()) {
928     *ss << " pool '" << pool_name << "' (id '" << pool_id
929       << "') is already in use as a cache tier.";
930     return -EINVAL;
931   }
932
933   if (!force && !pool->application_metadata.empty() &&
934       pool->application_metadata.count(
935         pg_pool_t::APPLICATION_NAME_CEPHFS) == 0) {
936     *ss << " pool '" << pool_name << "' (id '" << pool_id
937         << "') has a non-CephFS application enabled.";
938     return -EINVAL;
939   }
940
941   // Nothing special about this pool, so it is permissible
942   return 0;
943 }
944