Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mgr / Mgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  */
13
14 #include <Python.h>
15
16 #include "osdc/Objecter.h"
17 #include "client/Client.h"
18 #include "common/errno.h"
19 #include "mon/MonClient.h"
20 #include "include/stringify.h"
21 #include "global/global_context.h"
22 #include "global/signal_handler.h"
23
24 #include "mgr/MgrContext.h"
25 #include "mgr/mgr_commands.h"
26
27 //#include "MgrPyModule.h"
28 #include "DaemonServer.h"
29 #include "messages/MMgrBeacon.h"
30 #include "messages/MMgrDigest.h"
31 #include "messages/MCommand.h"
32 #include "messages/MCommandReply.h"
33 #include "messages/MLog.h"
34 #include "messages/MServiceMap.h"
35
36 #include "Mgr.h"
37
38 #define dout_context g_ceph_context
39 #define dout_subsys ceph_subsys_mgr
40 #undef dout_prefix
41 #define dout_prefix *_dout << "mgr " << __func__ << " "
42
43
44 Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
45          PyModuleRegistry *py_module_registry_,
46          Messenger *clientm_, Objecter *objecter_,
47          Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
48   monc(monc_),
49   objecter(objecter_),
50   client(client_),
51   client_messenger(clientm_),
52   lock("Mgr::lock"),
53   timer(g_ceph_context, lock),
54   finisher(g_ceph_context, "Mgr", "mgr-fin"),
55   digest_received(false),
56   py_module_registry(py_module_registry_),
57   cluster_state(monc, nullptr, mgrmap),
58   server(monc, finisher, daemon_state, cluster_state, *py_module_registry,
59          clog_, audit_clog_),
60   clog(clog_),
61   audit_clog(audit_clog_),
62   initialized(false),
63   initializing(false)
64 {
65   cluster_state.set_objecter(objecter);
66 }
67
68
69 Mgr::~Mgr()
70 {
71 }
72
73
74 /**
75  * Context for completion of metadata mon commands: take
76  * the result and stash it in DaemonStateIndex
77  */
78 class MetadataUpdate : public Context
79 {
80   DaemonStateIndex &daemon_state;
81   DaemonKey key;
82
83   std::map<std::string, std::string> defaults;
84
85 public:
86   bufferlist outbl;
87   std::string outs;
88
89   MetadataUpdate(DaemonStateIndex &daemon_state_, const DaemonKey &key_)
90     : daemon_state(daemon_state_), key(key_) {}
91
92   void set_default(const std::string &k, const std::string &v)
93   {
94     defaults[k] = v;
95   }
96
97   void finish(int r) override
98   {
99     daemon_state.clear_updating(key);
100     if (r == 0) {
101       if (key.first == "mds") {
102         json_spirit::mValue json_result;
103         bool read_ok = json_spirit::read(
104             outbl.to_str(), json_result);
105         if (!read_ok) {
106           dout(1) << "mon returned invalid JSON for "
107                   << key.first << "." << key.second << dendl;
108           return;
109         }
110
111         json_spirit::mObject daemon_meta = json_result.get_obj();
112
113         // Apply any defaults
114         for (const auto &i : defaults) {
115           if (daemon_meta.find(i.first) == daemon_meta.end()) {
116             daemon_meta[i.first] = i.second;
117           }
118         }
119
120         DaemonStatePtr state;
121         if (daemon_state.exists(key)) {
122           state = daemon_state.get(key);
123           Mutex::Locker l(state->lock);
124           daemon_meta.erase("name");
125           daemon_meta.erase("hostname");
126           state->metadata.clear();
127           for (const auto &i : daemon_meta) {
128             state->metadata[i.first] = i.second.get_str();
129           }
130         } else {
131           state = std::make_shared<DaemonState>(daemon_state.types);
132           state->key = key;
133           state->hostname = daemon_meta.at("hostname").get_str();
134
135           for (const auto &i : daemon_meta) {
136             state->metadata[i.first] = i.second.get_str();
137           }
138
139           daemon_state.insert(state);
140         }
141       } else if (key.first == "osd") {
142       } else {
143         ceph_abort();
144       }
145     } else {
146       dout(1) << "mon failed to return metadata for "
147               << key.first << "." << key.second << ": "
148               << cpp_strerror(r) << dendl;
149     }
150   }
151 };
152
153
154 void Mgr::background_init(Context *completion)
155 {
156   Mutex::Locker l(lock);
157   assert(!initializing);
158   assert(!initialized);
159   initializing = true;
160
161   finisher.start();
162
163   finisher.queue(new FunctionContext([this, completion](int r){
164     init();
165     completion->complete(0);
166   }));
167 }
168
169 void Mgr::init()
170 {
171   Mutex::Locker l(lock);
172   assert(initializing);
173   assert(!initialized);
174
175   // Start communicating with daemons to learn statistics etc
176   int r = server.init(monc->get_global_id(), client_messenger->get_myaddr());
177   if (r < 0) {
178     derr << "Initialize server fail"<< dendl;
179     return;
180   }
181   dout(4) << "Initialized server at " << server.get_myaddr() << dendl;
182
183   // Preload all daemon metadata (will subsequently keep this
184   // up to date by watching maps, so do the initial load before
185   // we subscribe to any maps)
186   dout(4) << "Loading daemon metadata..." << dendl;
187   load_all_metadata();
188
189   // subscribe to all the maps
190   monc->sub_want("log-info", 0, 0);
191   monc->sub_want("mgrdigest", 0, 0);
192   monc->sub_want("fsmap", 0, 0);
193   monc->sub_want("servicemap", 0, 0);
194
195   dout(4) << "waiting for OSDMap..." << dendl;
196   // Subscribe to OSDMap update to pass on to ClusterState
197   objecter->maybe_request_map();
198
199   // reset the mon session.  we get these maps through subscriptions which
200   // are stateful with the connection, so even if *we* don't have them a
201   // previous incarnation sharing the same MonClient may have.
202   monc->reopen_session();
203
204   // Start Objecter and wait for OSD map
205   lock.Unlock();  // Drop lock because OSDMap dispatch calls into my ms_dispatch
206   objecter->wait_for_osd_map();
207   lock.Lock();
208
209   // Populate PGs in ClusterState
210   objecter->with_osdmap([this](const OSDMap &osd_map) {
211     cluster_state.notify_osdmap(osd_map);
212   });
213
214   // Wait for FSMap
215   dout(4) << "waiting for FSMap..." << dendl;
216   while (!cluster_state.have_fsmap()) {
217     fs_map_cond.Wait(lock);
218   }
219
220   dout(4) << "waiting for config-keys..." << dendl;
221
222   // Preload config keys (`get` for plugins is to be a fast local
223   // operation, we we don't have to synchronize these later because
224   // all sets will come via mgr)
225   auto loaded_config = load_config();
226
227   // Wait for MgrDigest...
228   dout(4) << "waiting for MgrDigest..." << dendl;
229   while (!digest_received) {
230     digest_cond.Wait(lock);
231   }
232
233   // assume finisher already initialized in background_init
234   dout(4) << "starting python modules..." << dendl;
235   py_module_registry->active_start(loaded_config, daemon_state, cluster_state, *monc,
236       clog, *objecter, *client, finisher);
237
238   dout(4) << "Complete." << dendl;
239   initializing = false;
240   initialized = true;
241 }
242
243 void Mgr::load_all_metadata()
244 {
245   assert(lock.is_locked_by_me());
246
247   JSONCommand mds_cmd;
248   mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
249   JSONCommand osd_cmd;
250   osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}");
251   JSONCommand mon_cmd;
252   mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}");
253
254   lock.Unlock();
255   mds_cmd.wait();
256   osd_cmd.wait();
257   mon_cmd.wait();
258   lock.Lock();
259
260   assert(mds_cmd.r == 0);
261   assert(mon_cmd.r == 0);
262   assert(osd_cmd.r == 0);
263
264   for (auto &metadata_val : mds_cmd.json_result.get_array()) {
265     json_spirit::mObject daemon_meta = metadata_val.get_obj();
266     if (daemon_meta.count("hostname") == 0) {
267       dout(1) << "Skipping incomplete metadata entry" << dendl;
268       continue;
269     }
270
271     DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
272     dm->key = DaemonKey("mds",
273                         daemon_meta.at("name").get_str());
274     dm->hostname = daemon_meta.at("hostname").get_str();
275
276     daemon_meta.erase("name");
277     daemon_meta.erase("hostname");
278
279     for (const auto &i : daemon_meta) {
280       dm->metadata[i.first] = i.second.get_str();
281     }
282
283     daemon_state.insert(dm);
284   }
285
286   for (auto &metadata_val : mon_cmd.json_result.get_array()) {
287     json_spirit::mObject daemon_meta = metadata_val.get_obj();
288     if (daemon_meta.count("hostname") == 0) {
289       dout(1) << "Skipping incomplete metadata entry" << dendl;
290       continue;
291     }
292
293     DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
294     dm->key = DaemonKey("mon",
295                         daemon_meta.at("name").get_str());
296     dm->hostname = daemon_meta.at("hostname").get_str();
297
298     daemon_meta.erase("name");
299     daemon_meta.erase("hostname");
300
301     for (const auto &i : daemon_meta) {
302       dm->metadata[i.first] = i.second.get_str();
303     }
304
305     daemon_state.insert(dm);
306   }
307
308   for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) {
309     json_spirit::mObject osd_metadata = osd_metadata_val.get_obj();
310     if (osd_metadata.count("hostname") == 0) {
311       dout(1) << "Skipping incomplete metadata entry" << dendl;
312       continue;
313     }
314     dout(4) << osd_metadata.at("hostname").get_str() << dendl;
315
316     DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
317     dm->key = DaemonKey("osd",
318                         stringify(osd_metadata.at("id").get_int()));
319     dm->hostname = osd_metadata.at("hostname").get_str();
320
321     osd_metadata.erase("id");
322     osd_metadata.erase("hostname");
323
324     for (const auto &i : osd_metadata) {
325       dm->metadata[i.first] = i.second.get_str();
326     }
327
328     daemon_state.insert(dm);
329   }
330 }
331
332 std::map<std::string, std::string> Mgr::load_config()
333 {
334   assert(lock.is_locked_by_me());
335
336   dout(10) << "listing keys" << dendl;
337   JSONCommand cmd;
338   cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
339   lock.Unlock();
340   cmd.wait();
341   lock.Lock();
342   assert(cmd.r == 0);
343
344   std::map<std::string, std::string> loaded;
345   
346   for (auto &key_str : cmd.json_result.get_array()) {
347     std::string const key = key_str.get_str();
348     dout(20) << "saw key '" << key << "'" << dendl;
349
350     const std::string config_prefix = PyModuleRegistry::config_prefix;
351
352     if (key.substr(0, config_prefix.size()) == config_prefix) {
353       dout(20) << "fetching '" << key << "'" << dendl;
354       Command get_cmd;
355       std::ostringstream cmd_json;
356       cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
357       get_cmd.run(monc, cmd_json.str());
358       lock.Unlock();
359       get_cmd.wait();
360       lock.Lock();
361       assert(get_cmd.r == 0);
362       loaded[key] = get_cmd.outbl.to_str();
363     }
364   }
365
366   return loaded;
367 }
368
369 void Mgr::shutdown()
370 {
371   finisher.queue(new FunctionContext([&](int) {
372     {
373       Mutex::Locker l(lock);
374       monc->sub_unwant("log-info");
375       monc->sub_unwant("mgrdigest");
376       monc->sub_unwant("fsmap");
377       // First stop the server so that we're not taking any more incoming
378       // requests
379       server.shutdown();
380     }
381     // after the messenger is stopped, signal modules to shutdown via finisher
382     py_module_registry->active_shutdown();
383   }));
384
385   // Then stop the finisher to ensure its enqueued contexts aren't going
386   // to touch references to the things we're about to tear down
387   finisher.wait_for_empty();
388   finisher.stop();
389 }
390
391 void Mgr::handle_osd_map()
392 {
393   assert(lock.is_locked_by_me());
394
395   std::set<std::string> names_exist;
396
397   /**
398    * When we see a new OSD map, inspect the entity addrs to
399    * see if they have changed (service restart), and if so
400    * reload the metadata.
401    */
402   objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) {
403     for (unsigned int osd_id = 0; osd_id < osd_map.get_num_osds(); ++osd_id) {
404       if (!osd_map.exists(osd_id)) {
405         continue;
406       }
407
408       // Remember which OSDs exist so that we can cull any that don't
409       names_exist.insert(stringify(osd_id));
410
411       // Consider whether to update the daemon metadata (new/restarted daemon)
412       bool update_meta = false;
413       const auto k = DaemonKey("osd", stringify(osd_id));
414       if (daemon_state.is_updating(k)) {
415         continue;
416       }
417
418       if (daemon_state.exists(k)) {
419         auto metadata = daemon_state.get(k);
420         Mutex::Locker l(metadata->lock);
421         auto addr_iter = metadata->metadata.find("front_addr");
422         if (addr_iter != metadata->metadata.end()) {
423           const std::string &metadata_addr = addr_iter->second;
424           const auto &map_addr = osd_map.get_addr(osd_id);
425
426           if (metadata_addr != stringify(map_addr)) {
427             dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr
428                     << " != " << stringify(map_addr) << dendl;
429             update_meta = true;
430           } else {
431             dout(20) << "OSD[" << osd_id << "] addr unchanged: "
432                      << metadata_addr << dendl;
433           }
434         } else {
435           // Awkward case where daemon went into DaemonState because it
436           // sent us a report but its metadata didn't get loaded yet
437           update_meta = true;
438         }
439       } else {
440         update_meta = true;
441       }
442
443       if (update_meta) {
444         daemon_state.notify_updating(k);
445         auto c = new MetadataUpdate(daemon_state, k);
446         std::ostringstream cmd;
447         cmd << "{\"prefix\": \"osd metadata\", \"id\": "
448             << osd_id << "}";
449         monc->start_mon_command(
450             {cmd.str()},
451             {}, &c->outbl, &c->outs, c);
452       }
453     }
454
455     cluster_state.notify_osdmap(osd_map);
456   });
457
458   // TODO: same culling for MonMap
459   daemon_state.cull("osd", names_exist);
460 }
461
462 void Mgr::handle_log(MLog *m)
463 {
464   for (const auto &e : m->entries) {
465     py_module_registry->notify_all(e);
466   }
467
468   m->put();
469 }
470
471 void Mgr::handle_service_map(MServiceMap *m)
472 {
473   dout(10) << "e" << m->service_map.epoch << dendl;
474   cluster_state.set_service_map(m->service_map);
475   server.got_service_map();
476 }
477
478 bool Mgr::ms_dispatch(Message *m)
479 {
480   dout(4) << *m << dendl;
481   Mutex::Locker l(lock);
482
483   switch (m->get_type()) {
484     case MSG_MGR_DIGEST:
485       handle_mgr_digest(static_cast<MMgrDigest*>(m));
486       break;
487     case CEPH_MSG_MON_MAP:
488       py_module_registry->notify_all("mon_map", "");
489       m->put();
490       break;
491     case CEPH_MSG_FS_MAP:
492       py_module_registry->notify_all("fs_map", "");
493       handle_fs_map((MFSMap*)m);
494       return false; // I shall let this pass through for Client
495       break;
496     case CEPH_MSG_OSD_MAP:
497       handle_osd_map();
498
499       py_module_registry->notify_all("osd_map", "");
500
501       // Continuous subscribe, so that we can generate notifications
502       // for our MgrPyModules
503       objecter->maybe_request_map();
504       m->put();
505       break;
506     case MSG_SERVICE_MAP:
507       handle_service_map((MServiceMap*)m);
508       py_module_registry->notify_all("service_map", "");
509       m->put();
510       break;
511     case MSG_LOG:
512       handle_log(static_cast<MLog *>(m));
513       break;
514
515     default:
516       return false;
517   }
518   return true;
519 }
520
521
522 void Mgr::handle_fs_map(MFSMap* m)
523 {
524   assert(lock.is_locked_by_me());
525
526   std::set<std::string> names_exist;
527   
528   const FSMap &new_fsmap = m->get_fsmap();
529
530   fs_map_cond.Signal();
531
532   // TODO: callers (e.g. from python land) are potentially going to see
533   // the new fsmap before we've bothered populating all the resulting
534   // daemon_state.  Maybe we should block python land while we're making
535   // this kind of update?
536   
537   cluster_state.set_fsmap(new_fsmap);
538
539   auto mds_info = new_fsmap.get_mds_info();
540   for (const auto &i : mds_info) {
541     const auto &info = i.second;
542
543     if (!new_fsmap.gid_exists(i.first)){
544       continue;
545     }
546
547     // Remember which MDS exists so that we can cull any that don't
548     names_exist.insert(info.name);
549
550     const auto k = DaemonKey("mds", info.name);
551     if (daemon_state.is_updating(k)) {
552       continue;
553     }
554
555     bool update = false;
556     if (daemon_state.exists(k)) {
557       auto metadata = daemon_state.get(k);
558       Mutex::Locker l(metadata->lock);
559       if (metadata->metadata.empty() ||
560           metadata->metadata.count("addr") == 0) {
561         update = true;
562       } else {
563         auto metadata_addr = metadata->metadata.at("addr");
564         const auto map_addr = info.addr;
565         update = metadata_addr != stringify(map_addr);
566         if (update) {
567           dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
568                   << " != " << stringify(map_addr) << dendl;
569         }
570       }
571     } else {
572       update = true;
573     }
574
575     if (update) {
576       daemon_state.notify_updating(k);
577       auto c = new MetadataUpdate(daemon_state, k);
578
579       // Older MDS daemons don't have addr in the metadata, so
580       // fake it if the returned metadata doesn't have the field.
581       c->set_default("addr", stringify(info.addr));
582
583       std::ostringstream cmd;
584       cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
585           << info.name << "\"}";
586       monc->start_mon_command(
587           {cmd.str()},
588           {}, &c->outbl, &c->outs, c);
589     }
590   }
591   daemon_state.cull("mds", names_exist);
592 }
593
594 bool Mgr::got_mgr_map(const MgrMap& m)
595 {
596   Mutex::Locker l(lock);
597   dout(10) << m << dendl;
598
599   set<string> old_modules;
600   cluster_state.with_mgrmap([&](const MgrMap& m) {
601       old_modules = m.modules;
602     });
603   if (m.modules != old_modules) {
604     derr << "mgrmap module list changed to (" << m.modules << "), respawn"
605          << dendl;
606     return true;
607   }
608
609   cluster_state.set_mgr_map(m);
610
611   return false;
612 }
613
614 void Mgr::handle_mgr_digest(MMgrDigest* m)
615 {
616   dout(10) << m->mon_status_json.length() << dendl;
617   dout(10) << m->health_json.length() << dendl;
618   cluster_state.load_digest(m);
619   py_module_registry->notify_all("mon_status", "");
620   py_module_registry->notify_all("health", "");
621
622   // Hack: use this as a tick/opportunity to prompt python-land that
623   // the pgmap might have changed since last time we were here.
624   py_module_registry->notify_all("pg_summary", "");
625   dout(10) << "done." << dendl;
626
627   m->put();
628
629   if (!digest_received) {
630     digest_received = true;
631     digest_cond.Signal();
632   }
633 }
634
635 void Mgr::tick()
636 {
637   dout(10) << dendl;
638   server.send_report();
639 }
640
641 std::vector<MonCommand> Mgr::get_command_set() const
642 {
643   Mutex::Locker l(lock);
644
645   std::vector<MonCommand> commands = mgr_commands;
646   std::vector<MonCommand> py_commands = py_module_registry->get_commands();
647   commands.insert(commands.end(), py_commands.begin(), py_commands.end());
648   return commands;
649 }
650
651 std::map<std::string, std::string> Mgr::get_services() const
652 {
653   Mutex::Locker l(lock);
654
655   return py_module_registry->get_services();
656 }
657