Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / ceph_mon.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <fcntl.h>
18
19 #include <iostream>
20 #include <string>
21 using namespace std;
22
23 #include "common/config.h"
24 #include "include/ceph_features.h"
25
26 #include "mon/MonMap.h"
27 #include "mon/Monitor.h"
28 #include "mon/MonitorDBStore.h"
29 #include "mon/MonClient.h"
30
31 #include "msg/Messenger.h"
32
33 #include "include/CompatSet.h"
34
35 #include "common/ceph_argparse.h"
36 #include "common/pick_address.h"
37 #include "common/Timer.h"
38 #include "common/errno.h"
39 #include "common/Preforker.h"
40
41 #include "global/global_init.h"
42 #include "global/signal_handler.h"
43
44 #include "perfglue/heap_profiler.h"
45
46 #include "include/assert.h"
47
48 #define dout_subsys ceph_subsys_mon
49
50 Monitor *mon = NULL;
51
52 void handle_mon_signal(int signum)
53 {
54   if (mon)
55     mon->handle_signal(signum);
56 }
57
58
59 int obtain_monmap(MonitorDBStore &store, bufferlist &bl)
60 {
61   dout(10) << __func__ << dendl;
62   /*
63    * the monmap may be in one of three places:
64    *  'monmap:<latest_version_no>' - the monmap we'd really like to have
65    *  'mon_sync:latest_monmap'     - last monmap backed up for the last sync
66    *  'mkfs:monmap'                - a monmap resulting from mkfs
67    */
68
69   if (store.exists("monmap", "last_committed")) {
70     version_t latest_ver = store.get("monmap", "last_committed");
71     if (store.exists("monmap", latest_ver)) {
72       int err = store.get("monmap", latest_ver, bl);
73       assert(err == 0);
74       assert(bl.length() > 0);
75       dout(10) << __func__ << " read last committed monmap ver "
76                << latest_ver << dendl;
77       return 0;
78     }
79   }
80
81   if (store.exists("mon_sync", "in_sync")
82       || store.exists("mon_sync", "force_sync")) {
83     dout(10) << __func__ << " detected aborted sync" << dendl;
84     if (store.exists("mon_sync", "latest_monmap")) {
85       int err = store.get("mon_sync", "latest_monmap", bl);
86       assert(err == 0);
87       assert(bl.length() > 0);
88       dout(10) << __func__ << " read backup monmap" << dendl;
89       return 0;
90     }
91   }
92
93   if (store.exists("mkfs", "monmap")) {
94     dout(10) << __func__ << " found mkfs monmap" << dendl;
95     int err = store.get("mkfs", "monmap", bl);
96     assert(err == 0);
97     assert(bl.length() > 0);
98     return 0;
99   }
100
101   derr << __func__ << " unable to find a monmap" << dendl;
102   return -ENOENT;
103 }
104
105 int check_mon_data_exists()
106 {
107   string mon_data = g_conf->mon_data;
108   struct stat buf;
109   if (::stat(mon_data.c_str(), &buf)) {
110     if (errno != ENOENT) {
111       derr << "stat(" << mon_data << ") " << cpp_strerror(errno) << dendl;
112     }
113     return -errno;
114   }
115   return 0;
116 }
117
118 /** Check whether **mon data** is empty.
119  *
120  * Being empty means mkfs has not been run and there's no monitor setup
121  * at **g_conf->mon_data**.
122  *
123  * If the directory g_conf->mon_data is not empty we will return -ENOTEMPTY.
124  * Otherwise we will return 0.  Any other negative returns will represent
125  * a failure to be handled by the caller.
126  *
127  * @return **0** on success, -ENOTEMPTY if not empty or **-errno** otherwise.
128  */
129 int check_mon_data_empty()
130 {
131   string mon_data = g_conf->mon_data;
132
133   DIR *dir = ::opendir(mon_data.c_str());
134   if (!dir) {
135     derr << "opendir(" << mon_data << ") " << cpp_strerror(errno) << dendl;
136     return -errno;
137   }
138   int code = 0;
139   struct dirent *de = nullptr;
140   errno = 0;
141   while ((de = ::readdir(dir))) {
142     if (string(".") != de->d_name &&
143         string("..") != de->d_name &&
144         string("kv_backend") != de->d_name) {
145       code = -ENOTEMPTY;
146       break;
147     }
148   }
149   if (!de && errno) {
150     derr << "readdir(" << mon_data << ") " << cpp_strerror(errno) << dendl;
151     code = -errno;
152   }
153
154   ::closedir(dir);
155
156   return code;
157 }
158
159 static void usage()
160 {
161   cout << "usage: ceph-mon -i <ID> [flags]\n"
162        << "  --debug_mon n\n"
163        << "        debug monitor level (e.g. 10)\n"
164        << "  --mkfs\n"
165        << "        build fresh monitor fs\n"
166        << "  --force-sync\n"
167        << "        force a sync from another mon by wiping local data (BE CAREFUL)\n"
168        << "  --yes-i-really-mean-it\n"
169        << "        mandatory safeguard for --force-sync\n"
170        << "  --compact\n"
171        << "        compact the monitor store\n"
172        << "  --osdmap <filename>\n"
173        << "        only used when --mkfs is provided: load the osdmap from <filename>\n"
174        << "  --inject-monmap <filename>\n"
175        << "        write the <filename> monmap to the local monitor store and exit\n"
176        << "  --extract-monmap <filename>\n"
177        << "        extract the monmap from the local monitor store and exit\n"
178        << "  --mon-data <directory>\n"
179        << "        where the mon store and keyring are located\n"
180        << std::endl;
181   generic_server_usage();
182 }
183
184 #ifdef BUILDING_FOR_EMBEDDED
185 void cephd_preload_embedded_plugins();
186 extern "C" int cephd_mon(int argc, const char **argv)
187 #else
188 int main(int argc, const char **argv)
189 #endif
190 {
191   int err;
192
193   bool mkfs = false;
194   bool compact = false;
195   bool force_sync = false;
196   bool yes_really = false;
197   std::string osdmapfn, inject_monmap, extract_monmap;
198
199   vector<const char*> args;
200   argv_to_vec(argc, argv, args);
201   env_to_vec(args);
202
203   // We need to specify some default values that may be overridden by the
204   // user, that are specific to the monitor.  The options we are overriding
205   // are also used on the OSD (or in any other component that uses leveldb),
206   // so changing the global defaults is not an option.
207   // This is not the prettiest way of doing this, especially since it has us
208   // having a different place defining default values, but it's not horribly
209   // wrong enough to prevent us from doing it :)
210   //
211   // NOTE: user-defined options will take precedence over ours.
212   //
213   //  leveldb_write_buffer_size = 32*1024*1024  = 33554432  // 32MB
214   //  leveldb_cache_size        = 512*1024*1204 = 536870912 // 512MB
215   //  leveldb_block_size        = 64*1024       = 65536     // 64KB
216   //  leveldb_compression       = false
217   //  leveldb_log               = ""
218   vector<const char*> def_args;
219   def_args.push_back("--leveldb-write-buffer-size=33554432");
220   def_args.push_back("--leveldb-cache-size=536870912");
221   def_args.push_back("--leveldb-block-size=65536");
222   def_args.push_back("--leveldb-compression=false");
223   def_args.push_back("--leveldb-log=");
224
225   int flags = 0;
226   {
227     vector<const char*> args_copy = args;
228     std::string val;
229     for (std::vector<const char*>::iterator i = args_copy.begin();
230          i != args_copy.end(); ) {
231       if (ceph_argparse_double_dash(args_copy, i)) {
232         break;
233       } else if (ceph_argparse_flag(args_copy, i, "--mkfs", (char*)NULL)) {
234         flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
235       } else if (ceph_argparse_witharg(args_copy, i, &val, "--inject_monmap", (char*)NULL)) {
236         flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
237       } else if (ceph_argparse_witharg(args_copy, i, &val, "--extract-monmap", (char*)NULL)) {
238         flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
239       } else {
240         ++i;
241       }
242     }
243   }
244
245   auto cct = global_init(&def_args, args,
246                          CEPH_ENTITY_TYPE_MON, CODE_ENVIRONMENT_DAEMON,
247                          flags, "mon_data");
248   ceph_heap_profiler_init();
249
250   std::string val;
251   for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
252     if (ceph_argparse_double_dash(args, i)) {
253       break;
254     } else if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
255       usage();
256     } else if (ceph_argparse_flag(args, i, "--mkfs", (char*)NULL)) {
257       mkfs = true;
258     } else if (ceph_argparse_flag(args, i, "--compact", (char*)NULL)) {
259       compact = true;
260     } else if (ceph_argparse_flag(args, i, "--force-sync", (char*)NULL)) {
261       force_sync = true;
262     } else if (ceph_argparse_flag(args, i, "--yes-i-really-mean-it", (char*)NULL)) {
263       yes_really = true;
264     } else if (ceph_argparse_witharg(args, i, &val, "--osdmap", (char*)NULL)) {
265       osdmapfn = val;
266     } else if (ceph_argparse_witharg(args, i, &val, "--inject_monmap", (char*)NULL)) {
267       inject_monmap = val;
268     } else if (ceph_argparse_witharg(args, i, &val, "--extract-monmap", (char*)NULL)) {
269       extract_monmap = val;
270     } else {
271       ++i;
272     }
273   }
274   if (!args.empty()) {
275     derr << "too many arguments: " << args << dendl;
276     usage();
277   }
278
279   if (force_sync && !yes_really) {
280     derr << "are you SURE you want to force a sync?  this will erase local data and may\n"
281          << "break your mon cluster.  pass --yes-i-really-mean-it if you do." << dendl;
282     exit(1);
283   }
284
285   if (g_conf->mon_data.empty()) {
286     derr << "must specify '--mon-data=foo' data path" << dendl;
287     usage();
288   }
289
290   if (g_conf->name.get_id().empty()) {
291     derr << "must specify id (--id <id> or --name mon.<id>)" << dendl;
292     usage();
293   }
294
295   // -- mkfs --
296   if (mkfs) {
297
298     int err = check_mon_data_exists();
299     if (err == -ENOENT) {
300       if (::mkdir(g_conf->mon_data.c_str(), 0755)) {
301         derr << "mkdir(" << g_conf->mon_data << ") : "
302              << cpp_strerror(errno) << dendl;
303         exit(1);
304       }
305     } else if (err < 0) {
306       derr << "error opening '" << g_conf->mon_data << "': "
307            << cpp_strerror(-err) << dendl;
308       exit(-err);
309     }
310
311     err = check_mon_data_empty();
312     if (err == -ENOTEMPTY) {
313       // Mon may exist.  Let the user know and exit gracefully.
314       derr << "'" << g_conf->mon_data << "' already exists and is not empty"
315            << ": monitor may already exist" << dendl;
316       exit(0);
317     } else if (err < 0) {
318       derr << "error checking if '" << g_conf->mon_data << "' is empty: "
319            << cpp_strerror(-err) << dendl;
320       exit(-err);
321     }
322
323     // resolve public_network -> public_addr
324     pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
325
326     common_init_finish(g_ceph_context);
327
328     bufferlist monmapbl, osdmapbl;
329     std::string error;
330     MonMap monmap;
331
332     // load or generate monmap
333     const auto monmap_fn = g_conf->get_val<string>("monmap");
334     if (monmap_fn.length()) {
335       int err = monmapbl.read_file(monmap_fn.c_str(), &error);
336       if (err < 0) {
337         derr << argv[0] << ": error reading " << monmap_fn << ": " << error << dendl;
338         exit(1);
339       }
340       try {
341         monmap.decode(monmapbl);
342
343         // always mark seed/mkfs monmap as epoch 0
344         monmap.set_epoch(0);
345       } catch (const buffer::error& e) {
346         derr << argv[0] << ": error decoding monmap " << monmap_fn << ": " << e.what() << dendl;
347         exit(1);
348       }      
349     } else {
350       ostringstream oss;
351       int err = monmap.build_initial(g_ceph_context, oss);
352       if (oss.tellp())
353         derr << oss.str() << dendl;
354       if (err < 0) {
355         derr << argv[0] << ": warning: no initial monitors; must use admin socket to feed hints" << dendl;
356       }
357
358       // am i part of the initial quorum?
359       if (monmap.contains(g_conf->name.get_id())) {
360         // hmm, make sure the ip listed exists on the current host?
361         // maybe later.
362       } else if (!g_conf->public_addr.is_blank_ip()) {
363         entity_addr_t a = g_conf->public_addr;
364         if (a.get_port() == 0)
365           a.set_port(CEPH_MON_PORT);
366         if (monmap.contains(a)) {
367           string name;
368           monmap.get_addr_name(a, name);
369           monmap.rename(name, g_conf->name.get_id());
370           dout(0) << argv[0] << ": renaming mon." << name << " " << a
371                << " to mon." << g_conf->name.get_id() << dendl;
372         }
373       } else {
374         // is a local address listed without a name?  if so, name myself.
375         list<entity_addr_t> ls;
376         monmap.list_addrs(ls);
377         entity_addr_t local;
378
379         if (have_local_addr(g_ceph_context, ls, &local)) {
380           string name;
381           monmap.get_addr_name(local, name);
382
383           if (name.compare(0, 7, "noname-") == 0) {
384             dout(0) << argv[0] << ": mon." << name << " " << local
385                  << " is local, renaming to mon." << g_conf->name.get_id() << dendl;
386             monmap.rename(name, g_conf->name.get_id());
387           } else {
388             dout(0) << argv[0] << ": mon." << name << " " << local
389                  << " is local, but not 'noname-' + something; not assuming it's me" << dendl;
390           }
391         }
392       }
393     }
394
395     const auto fsid = g_conf->get_val<uuid_d>("fsid");
396     if (!fsid.is_zero()) {
397       monmap.fsid = fsid;
398       dout(0) << argv[0] << ": set fsid to " << fsid << dendl;
399     }
400     
401     if (monmap.fsid.is_zero()) {
402       derr << argv[0] << ": generated monmap has no fsid; use '--fsid <uuid>'" << dendl;
403       exit(10);
404     }
405
406     //monmap.print(cout);
407
408     // osdmap
409     if (osdmapfn.length()) {
410       err = osdmapbl.read_file(osdmapfn.c_str(), &error);
411       if (err < 0) {
412         derr << argv[0] << ": error reading " << osdmapfn << ": "
413              << error << dendl;
414         exit(1);
415       }
416     }
417
418     // go
419     MonitorDBStore store(g_conf->mon_data);
420     ostringstream oss;
421     int r = store.create_and_open(oss);
422     if (oss.tellp())
423       derr << oss.str() << dendl;
424     if (r < 0) {
425       derr << argv[0] << ": error opening mon data directory at '"
426            << g_conf->mon_data << "': " << cpp_strerror(r) << dendl;
427       exit(1);
428     }
429     assert(r == 0);
430
431     Monitor mon(g_ceph_context, g_conf->name.get_id(), &store, 0, 0, &monmap);
432     r = mon.mkfs(osdmapbl);
433     if (r < 0) {
434       derr << argv[0] << ": error creating monfs: " << cpp_strerror(r) << dendl;
435       exit(1);
436     }
437     store.close();
438     dout(0) << argv[0] << ": created monfs at " << g_conf->mon_data 
439          << " for " << g_conf->name << dendl;
440     return 0;
441   }
442
443   err = check_mon_data_exists();
444   if (err < 0 && err == -ENOENT) {
445     derr << "monitor data directory at '" << g_conf->mon_data << "'"
446          << " does not exist: have you run 'mkfs'?" << dendl;
447     exit(1);
448   } else if (err < 0) {
449     derr << "error accessing monitor data directory at '"
450          << g_conf->mon_data << "': " << cpp_strerror(-err) << dendl;
451     exit(1);
452   }
453
454   err = check_mon_data_empty();
455   if (err == 0) {
456     derr << "monitor data directory at '" << g_conf->mon_data
457       << "' is empty: have you run 'mkfs'?" << dendl;
458     exit(1);
459   } else if (err < 0 && err != -ENOTEMPTY) {
460     // we don't want an empty data dir by now
461     derr << "error accessing '" << g_conf->mon_data << "': "
462          << cpp_strerror(-err) << dendl;
463     exit(1);
464   }
465
466   {
467     // check fs stats. don't start if it's critically close to full.
468     ceph_data_stats_t stats;
469     int err = get_fs_stats(stats, g_conf->mon_data.c_str());
470     if (err < 0) {
471       derr << "error checking monitor data's fs stats: " << cpp_strerror(err)
472            << dendl;
473       exit(-err);
474     }
475     if (stats.avail_percent <= g_conf->mon_data_avail_crit) {
476       derr << "error: monitor data filesystem reached concerning levels of"
477            << " available storage space (available: "
478            << stats.avail_percent << "% " << prettybyte_t(stats.byte_avail)
479            << ")\nyou may adjust 'mon data avail crit' to a lower value"
480            << " to make this go away (default: " << g_conf->mon_data_avail_crit
481            << "%)\n" << dendl;
482       exit(ENOSPC);
483     }
484   }
485
486   // we fork early to prevent leveldb's environment static state from
487   // screwing us over
488   Preforker prefork;
489   if (!(flags & CINIT_FLAG_NO_DAEMON_ACTIONS)) {
490     if (global_init_prefork(g_ceph_context) >= 0) {
491       string err_msg;
492       err = prefork.prefork(err_msg);
493       if (err < 0) {
494         derr << err_msg << dendl;
495         prefork.exit(err);
496       }
497       if (prefork.is_parent()) {
498         err = prefork.parent_wait(err_msg);
499         if (err < 0)
500           derr << err_msg << dendl;
501         prefork.exit(err);
502       }
503       setsid();
504       global_init_postfork_start(g_ceph_context);
505     }
506     common_init_finish(g_ceph_context);
507     global_init_chdir(g_ceph_context);
508 #ifndef BUILDING_FOR_EMBEDDED
509     if (global_init_preload_erasure_code(g_ceph_context) < 0)
510       prefork.exit(1);
511 #else
512     cephd_preload_embedded_plugins();
513 #endif
514   }
515
516   MonitorDBStore *store = new MonitorDBStore(g_conf->mon_data);
517   {
518     ostringstream oss;
519     err = store->open(oss);
520     if (oss.tellp())
521       derr << oss.str() << dendl;
522     if (err < 0) {
523       derr << "error opening mon data directory at '"
524            << g_conf->mon_data << "': " << cpp_strerror(err) << dendl;
525       prefork.exit(1);
526     }
527   }
528
529   bufferlist magicbl;
530   err = store->get(Monitor::MONITOR_NAME, "magic", magicbl);
531   if (err || !magicbl.length()) {
532     derr << "unable to read magic from mon data" << dendl;
533     prefork.exit(1);
534   }
535   string magic(magicbl.c_str(), magicbl.length()-1);  // ignore trailing \n
536   if (strcmp(magic.c_str(), CEPH_MON_ONDISK_MAGIC)) {
537     derr << "mon fs magic '" << magic << "' != current '" << CEPH_MON_ONDISK_MAGIC << "'" << dendl;
538     prefork.exit(1);
539   }
540
541   err = Monitor::check_features(store);
542   if (err < 0) {
543     derr << "error checking features: " << cpp_strerror(err) << dendl;
544     prefork.exit(1);
545   }
546
547   // inject new monmap?
548   if (!inject_monmap.empty()) {
549     bufferlist bl;
550     std::string error;
551     int r = bl.read_file(inject_monmap.c_str(), &error);
552     if (r) {
553       derr << "unable to read monmap from " << inject_monmap << ": "
554            << error << dendl;
555       prefork.exit(1);
556     }
557
558     // get next version
559     version_t v = store->get("monmap", "last_committed");
560     dout(0) << "last committed monmap epoch is " << v << ", injected map will be " << (v+1)
561             << dendl;
562     v++;
563
564     // set the version
565     MonMap tmp;
566     tmp.decode(bl);
567     if (tmp.get_epoch() != v) {
568       dout(0) << "changing monmap epoch from " << tmp.get_epoch()
569            << " to " << v << dendl;
570       tmp.set_epoch(v);
571     }
572     bufferlist mapbl;
573     tmp.encode(mapbl, CEPH_FEATURES_ALL);
574     bufferlist final;
575     ::encode(v, final);
576     ::encode(mapbl, final);
577
578     auto t(std::make_shared<MonitorDBStore::Transaction>());
579     // save it
580     t->put("monmap", v, mapbl);
581     t->put("monmap", "latest", final);
582     t->put("monmap", "last_committed", v);
583     store->apply_transaction(t);
584
585     dout(0) << "done." << dendl;
586     prefork.exit(0);
587   }
588
589   // monmap?
590   MonMap monmap;
591   {
592     // note that even if we don't find a viable monmap, we should go ahead
593     // and try to build it up in the next if-else block.
594     bufferlist mapbl;
595     int err = obtain_monmap(*store, mapbl);
596     if (err >= 0) {
597       try {
598         monmap.decode(mapbl);
599       } catch (const buffer::error& e) {
600         derr << "can't decode monmap: " << e.what() << dendl;
601       }
602     } else {
603       derr << "unable to obtain a monmap: " << cpp_strerror(err) << dendl;
604     }
605     if (!extract_monmap.empty()) {
606       int r = mapbl.write_file(extract_monmap.c_str());
607       if (r < 0) {
608         r = -errno;
609         derr << "error writing monmap to " << extract_monmap << ": " << cpp_strerror(r) << dendl;
610         prefork.exit(1);
611       }
612       derr << "wrote monmap to " << extract_monmap << dendl;
613       prefork.exit(0);
614     }
615   }
616
617   // this is what i will bind to
618   entity_addr_t ipaddr;
619
620   if (monmap.contains(g_conf->name.get_id())) {
621     ipaddr = monmap.get_addr(g_conf->name.get_id());
622
623     // print helpful warning if the conf file doesn't match
624     entity_addr_t conf_addr;
625     std::vector <std::string> my_sections;
626     g_conf->get_my_sections(my_sections);
627     std::string mon_addr_str;
628     if (g_conf->get_val_from_conf_file(my_sections, "mon addr",
629                                        mon_addr_str, true) == 0) {
630       if (conf_addr.parse(mon_addr_str.c_str()) && (ipaddr != conf_addr)) {
631         derr << "WARNING: 'mon addr' config option " << conf_addr
632              << " does not match monmap file" << std::endl
633              << "         continuing with monmap configuration" << dendl;
634       }
635     }
636   } else {
637     dout(0) << g_conf->name << " does not exist in monmap, will attempt to join an existing cluster" << dendl;
638
639     pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
640     if (!g_conf->public_addr.is_blank_ip()) {
641       ipaddr = g_conf->public_addr;
642       if (ipaddr.get_port() == 0)
643         ipaddr.set_port(CEPH_MON_PORT);
644       dout(0) << "using public_addr " << g_conf->public_addr << " -> "
645               << ipaddr << dendl;
646     } else {
647       MonMap tmpmap;
648       ostringstream oss;
649       int err = tmpmap.build_initial(g_ceph_context, oss);
650       if (oss.tellp())
651         derr << oss.str() << dendl;
652       if (err < 0) {
653         derr << argv[0] << ": error generating initial monmap: "
654              << cpp_strerror(err) << dendl;
655         usage();
656         prefork.exit(1);
657       }
658       if (tmpmap.contains(g_conf->name.get_id())) {
659         ipaddr = tmpmap.get_addr(g_conf->name.get_id());
660       } else {
661         derr << "no public_addr or public_network specified, and " << g_conf->name
662              << " not present in monmap or ceph.conf" << dendl;
663         prefork.exit(1);
664       }
665     }
666   }
667
668   // bind
669   int rank = monmap.get_rank(g_conf->name.get_id());
670   std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
671   Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,
672                                       entity_name_t::MON(rank), "mon",
673                                       0, Messenger::HAS_MANY_CONNECTIONS);
674   if (!msgr)
675     exit(1);
676   msgr->set_cluster_protocol(CEPH_MON_PROTOCOL);
677   msgr->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
678
679   msgr->set_default_policy(Messenger::Policy::stateless_server(0));
680   msgr->set_policy(entity_name_t::TYPE_MON,
681                    Messenger::Policy::lossless_peer_reuse(
682                      CEPH_FEATURE_UID |
683                      CEPH_FEATURE_PGID64 |
684                      CEPH_FEATURE_MON_SINGLE_PAXOS));
685   msgr->set_policy(entity_name_t::TYPE_OSD,
686                    Messenger::Policy::stateless_server(
687                      CEPH_FEATURE_PGID64 |
688                      CEPH_FEATURE_OSDENC));
689   msgr->set_policy(entity_name_t::TYPE_CLIENT,
690                    Messenger::Policy::stateless_server(0));
691   msgr->set_policy(entity_name_t::TYPE_MDS,
692                    Messenger::Policy::stateless_server(0));
693
694   // throttle client traffic
695   Throttle *client_throttler = new Throttle(g_ceph_context, "mon_client_bytes",
696                                             g_conf->mon_client_bytes);
697   msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
698                                      client_throttler, NULL);
699
700   // throttle daemon traffic
701   // NOTE: actual usage on the leader may multiply by the number of
702   // monitors if they forward large update messages from daemons.
703   Throttle *daemon_throttler = new Throttle(g_ceph_context, "mon_daemon_bytes",
704                                             g_conf->mon_daemon_bytes);
705   msgr->set_policy_throttlers(entity_name_t::TYPE_OSD, daemon_throttler,
706                                      NULL);
707   msgr->set_policy_throttlers(entity_name_t::TYPE_MDS, daemon_throttler,
708                                      NULL);
709
710   entity_addr_t bind_addr = ipaddr;
711   entity_addr_t public_addr = ipaddr;
712
713   // check if the public_bind_addr option is set
714   if (!g_conf->public_bind_addr.is_blank_ip()) {
715     bind_addr = g_conf->public_bind_addr;
716
717     // set the default port if not already set
718     if (bind_addr.get_port() == 0) {
719       bind_addr.set_port(CEPH_MON_PORT);
720     }
721   }
722
723   dout(0) << "starting " << g_conf->name << " rank " << rank
724        << " at public addr " << public_addr
725        << " at bind addr " << bind_addr
726        << " mon_data " << g_conf->mon_data
727        << " fsid " << monmap.get_fsid()
728        << dendl;
729
730   err = msgr->bind(bind_addr);
731   if (err < 0) {
732     derr << "unable to bind monitor to " << bind_addr << dendl;
733     prefork.exit(1);
734   }
735
736   // if the public and bind addr are different set the msgr addr
737   // to the public one, now that the bind is complete.
738   if (public_addr != bind_addr) {
739     msgr->set_addr(public_addr);
740   }
741
742   Messenger *mgr_msgr = Messenger::create(g_ceph_context, public_msgr_type,
743                                           entity_name_t::MON(rank), "mon-mgrc",
744                                           getpid(), 0);
745   if (!mgr_msgr) {
746     derr << "unable to create mgr_msgr" << dendl;
747     prefork.exit(1);
748   }
749
750   dout(0) << "starting " << g_conf->name << " rank " << rank
751        << " at " << ipaddr
752        << " mon_data " << g_conf->mon_data
753        << " fsid " << monmap.get_fsid()
754        << dendl;
755
756   // start monitor
757   mon = new Monitor(g_ceph_context, g_conf->name.get_id(), store,
758                     msgr, mgr_msgr, &monmap);
759
760   if (force_sync) {
761     derr << "flagging a forced sync ..." << dendl;
762     ostringstream oss;
763     mon->sync_force(NULL, oss);
764     if (oss.tellp())
765       derr << oss.str() << dendl;
766   }
767
768   err = mon->preinit();
769   if (err < 0) {
770     derr << "failed to initialize" << dendl;
771     prefork.exit(1);
772   }
773
774   if (compact || g_conf->mon_compact_on_start) {
775     derr << "compacting monitor store ..." << dendl;
776     mon->store->compact();
777     derr << "done compacting" << dendl;
778   }
779
780   if (g_conf->daemonize) {
781     global_init_postfork_finish(g_ceph_context);
782     prefork.daemonize();
783   }
784
785   msgr->start();
786   mgr_msgr->start();
787
788   mon->init();
789
790   // set up signal handlers, now that we've daemonized/forked.
791   init_async_signal_handler();
792   register_async_signal_handler(SIGHUP, sighup_handler);
793   register_async_signal_handler_oneshot(SIGINT, handle_mon_signal);
794   register_async_signal_handler_oneshot(SIGTERM, handle_mon_signal);
795
796   if (g_conf->inject_early_sigterm)
797     kill(getpid(), SIGTERM);
798
799   msgr->wait();
800   mgr_msgr->wait();
801
802   store->close();
803
804   unregister_async_signal_handler(SIGHUP, sighup_handler);
805   unregister_async_signal_handler(SIGINT, handle_mon_signal);
806   unregister_async_signal_handler(SIGTERM, handle_mon_signal);
807   shutdown_async_signal_handler();
808
809   delete mon;
810   delete store;
811   delete msgr;
812   delete mgr_msgr;
813   delete client_throttler;
814   delete daemon_throttler;
815
816   // cd on exit, so that gmon.out (if any) goes into a separate directory for each node.
817   char s[20];
818   snprintf(s, sizeof(s), "gmon/%d", getpid());
819   if ((mkdir(s, 0755) == 0) && (chdir(s) == 0)) {
820     dout(0) << "ceph-mon: gmon.out should be in " << s << dendl;
821   }
822
823   prefork.signal_exit(0);
824   return 0;
825 }