Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mgr / MgrClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  */
13
14
15 #include "MgrClient.h"
16
17 #include "mgr/MgrContext.h"
18
19 #include "msg/Messenger.h"
20 #include "messages/MMgrMap.h"
21 #include "messages/MMgrReport.h"
22 #include "messages/MMgrOpen.h"
23 #include "messages/MMgrConfigure.h"
24 #include "messages/MCommand.h"
25 #include "messages/MCommandReply.h"
26 #include "messages/MPGStats.h"
27
28 #define dout_subsys ceph_subsys_mgrc
29 #undef dout_prefix
30 #define dout_prefix *_dout << "mgrc " << __func__ << " "
31
32 MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_)
33     : Dispatcher(cct_), cct(cct_), msgr(msgr_),
34       timer(cct_, lock)
35 {
36   assert(cct != nullptr);
37 }
38
39 void MgrClient::init()
40 {
41   Mutex::Locker l(lock);
42
43   assert(msgr != nullptr);
44
45   timer.init();
46 }
47
48 void MgrClient::shutdown()
49 {
50   Mutex::Locker l(lock);
51
52   if (connect_retry_callback) {
53     timer.cancel_event(connect_retry_callback);
54     connect_retry_callback = nullptr;
55   }
56
57   // forget about in-flight commands if we are prematurely shut down
58   // (e.g., by control-C)
59   command_table.clear();
60
61   timer.shutdown();
62   if (session) {
63     session->con->mark_down();
64     session.reset();
65   }
66 }
67
68 bool MgrClient::ms_dispatch(Message *m)
69 {
70   Mutex::Locker l(lock);
71
72   switch(m->get_type()) {
73   case MSG_MGR_MAP:
74     return handle_mgr_map(static_cast<MMgrMap*>(m));
75   case MSG_MGR_CONFIGURE:
76     return handle_mgr_configure(static_cast<MMgrConfigure*>(m));
77   case MSG_COMMAND_REPLY:
78     if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
79       handle_command_reply(static_cast<MCommandReply*>(m));
80       return true;
81     } else {
82       return false;
83     }
84   default:
85     ldout(cct, 30) << "Not handling " << *m << dendl; 
86     return false;
87   }
88 }
89
90 void MgrClient::reconnect()
91 {
92   assert(lock.is_locked_by_me());
93
94   if (session) {
95     ldout(cct, 4) << "Terminating session with "
96                   << session->con->get_peer_addr() << dendl;
97     session->con->mark_down();
98     session.reset();
99     stats_period = 0;
100     if (report_callback != nullptr) {
101       timer.cancel_event(report_callback);
102       report_callback = nullptr;
103     }
104   }
105
106   if (!map.get_available()) {
107     ldout(cct, 4) << "No active mgr available yet" << dendl;
108     return;
109   }
110
111   if (last_connect_attempt != utime_t()) {
112     utime_t now = ceph_clock_now();
113     utime_t when = last_connect_attempt;
114     when += cct->_conf->get_val<double>("mgr_connect_retry_interval");
115     if (now < when) {
116       if (!connect_retry_callback) {
117         connect_retry_callback = timer.add_event_at(
118           when,
119           new FunctionContext([this](int r){
120               connect_retry_callback = nullptr;
121               reconnect();
122             }));
123       }
124       ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
125       return;
126     }
127   }
128
129   if (connect_retry_callback) {
130     timer.cancel_event(connect_retry_callback);
131     connect_retry_callback = nullptr;
132   }
133
134   ldout(cct, 4) << "Starting new session with " << map.get_active_addr()
135                 << dendl;
136   entity_inst_t inst;
137   inst.addr = map.get_active_addr();
138   inst.name = entity_name_t::MGR(map.get_active_gid());
139   last_connect_attempt = ceph_clock_now();
140
141   session.reset(new MgrSessionState());
142   session->con = msgr->get_connection(inst);
143
144   if (service_daemon) {
145     daemon_dirty_status = true;
146   }
147
148   // Don't send an open if we're just a client (i.e. doing
149   // command-sending, not stats etc)
150   if (!cct->_conf->name.is_client() || service_daemon) {
151     _send_open();
152   }
153
154   // resend any pending commands
155   for (const auto &p : command_table.get_commands()) {
156     MCommand *m = p.second.get_message({});
157     assert(session);
158     assert(session->con);
159     session->con->send_message(m);
160   }
161 }
162
163 void MgrClient::_send_open()
164 {
165   if (session && session->con) {
166     auto open = new MMgrOpen();
167     if (!service_name.empty()) {
168       open->service_name = service_name;
169       open->daemon_name = daemon_name;
170     } else {
171       open->daemon_name = cct->_conf->name.get_id();
172     }
173     if (service_daemon) {
174       open->service_daemon = service_daemon;
175       open->daemon_metadata = daemon_metadata;
176     }
177     session->con->send_message(open);
178   }
179 }
180
181 bool MgrClient::handle_mgr_map(MMgrMap *m)
182 {
183   assert(lock.is_locked_by_me());
184
185   ldout(cct, 20) << *m << dendl;
186
187   map = m->get_map();
188   ldout(cct, 4) << "Got map version " << map.epoch << dendl;
189   m->put();
190
191   ldout(cct, 4) << "Active mgr is now " << map.get_active_addr() << dendl;
192
193   // Reset session?
194   if (!session ||
195       session->con->get_peer_addr() != map.get_active_addr()) {
196     reconnect();
197   }
198
199   return true;
200 }
201
202 bool MgrClient::ms_handle_reset(Connection *con)
203 {
204   Mutex::Locker l(lock);
205   if (session && con == session->con) {
206     ldout(cct, 4) << __func__ << " con " << con << dendl;
207     reconnect();
208     return true;
209   }
210   return false;
211 }
212
213 bool MgrClient::ms_handle_refused(Connection *con)
214 {
215   // do nothing for now
216   return false;
217 }
218
219 void MgrClient::send_report()
220 {
221   assert(lock.is_locked_by_me());
222   assert(session);
223   report_callback = nullptr;
224
225   auto report = new MMgrReport();
226   auto pcc = cct->get_perfcounters_collection();
227
228   pcc->with_counters([this, report](
229         const PerfCountersCollection::CounterMap &by_path)
230   {
231     // Helper for checking whether a counter should be included
232     auto include_counter = [this](
233         const PerfCounters::perf_counter_data_any_d &ctr,
234         const PerfCounters &perf_counters)
235     {
236       return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
237     };
238
239     // Helper for cases where we want to forget a counter
240     auto undeclare = [report, this](const std::string &path)
241     {
242       report->undeclare_types.push_back(path);
243       ldout(cct,20) << " undeclare " << path << dendl;
244       session->declared.erase(path);
245     };
246
247     ENCODE_START(1, 1, report->packed);
248
249     // Find counters that no longer exist, and undeclare them
250     for (auto p = session->declared.begin(); p != session->declared.end(); ) {
251       const auto &path = *(p++);
252       if (by_path.count(path) == 0) {
253         undeclare(path);
254       }
255     }
256
257     for (const auto &i : by_path) {
258       auto& path = i.first;
259       auto& data = *(i.second.data);
260       auto& perf_counters = *(i.second.perf_counters);
261
262       // Find counters that still exist, but are no longer permitted by
263       // stats_threshold
264       if (!include_counter(data, perf_counters)) {
265         if (session->declared.count(path)) {
266           undeclare(path);
267         }
268         continue;
269       }
270
271       if (session->declared.count(path) == 0) {
272         ldout(cct,20) << " declare " << path << dendl;
273         PerfCounterType type;
274         type.path = path;
275         if (data.description) {
276           type.description = data.description;
277         }
278         if (data.nick) {
279           type.nick = data.nick;
280         }
281         type.type = data.type;
282         type.priority = perf_counters.get_adjusted_priority(data.prio);
283         report->declare_types.push_back(std::move(type));
284         session->declared.insert(path);
285       }
286
287       ::encode(static_cast<uint64_t>(data.u64), report->packed);
288       if (data.type & PERFCOUNTER_LONGRUNAVG) {
289         ::encode(static_cast<uint64_t>(data.avgcount), report->packed);
290         ::encode(static_cast<uint64_t>(data.avgcount2), report->packed);
291       }
292     }
293     ENCODE_FINISH(report->packed);
294
295     ldout(cct, 20) << "sending " << session->declared.size() << " counters ("
296                       "of possible " << by_path.size() << "), "
297                    << report->declare_types.size() << " new, "
298                    << report->undeclare_types.size() << " removed"
299                    << dendl;
300   });
301
302   ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
303
304   if (daemon_name.size()) {
305     report->daemon_name = daemon_name;
306   } else {
307     report->daemon_name = cct->_conf->name.get_id();
308   }
309   report->service_name = service_name;
310
311   if (daemon_dirty_status) {
312     report->daemon_status = daemon_status;
313     daemon_dirty_status = false;
314   }
315
316   session->con->send_message(report);
317
318   if (stats_period != 0) {
319     report_callback = new FunctionContext([this](int r){send_report();});
320     timer.add_event_after(stats_period, report_callback);
321   }
322
323   send_pgstats();
324 }
325
326 void MgrClient::send_pgstats()
327 {
328   if (pgstats_cb && session) {
329     session->con->send_message(pgstats_cb());
330   }
331 }
332
333 bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
334 {
335   assert(lock.is_locked_by_me());
336
337   ldout(cct, 20) << *m << dendl;
338
339   if (!session) {
340     lderr(cct) << "dropping unexpected configure message" << dendl;
341     m->put();
342     return true;
343   }
344
345   ldout(cct, 4) << "stats_period=" << m->stats_period << dendl;
346
347   if (stats_threshold != m->stats_threshold) {
348     ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl;
349     stats_threshold = m->stats_threshold;
350   }
351
352   bool starting = (stats_period == 0) && (m->stats_period != 0);
353   stats_period = m->stats_period;
354   if (starting) {
355     send_report();
356   }
357
358   m->put();
359   return true;
360 }
361
362 int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
363                   bufferlist *outbl, string *outs,
364                   Context *onfinish)
365 {
366   Mutex::Locker l(lock);
367
368   ldout(cct, 20) << "cmd: " << cmd << dendl;
369
370   if (map.epoch == 0) {
371     ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
372     return -EACCES;
373   }
374
375   auto &op = command_table.start_command();
376   op.cmd = cmd;
377   op.inbl = inbl;
378   op.outbl = outbl;
379   op.outs = outs;
380   op.on_finish = onfinish;
381
382   if (session && session->con) {
383     // Leaving fsid argument null because it isn't used.
384     MCommand *m = op.get_message({});
385     session->con->send_message(m);
386   }
387   return 0;
388 }
389
390 bool MgrClient::handle_command_reply(MCommandReply *m)
391 {
392   assert(lock.is_locked_by_me());
393
394   ldout(cct, 20) << *m << dendl;
395
396   const auto tid = m->get_tid();
397   if (!command_table.exists(tid)) {
398     ldout(cct, 4) << "handle_command_reply tid " << m->get_tid()
399             << " not found" << dendl;
400     m->put();
401     return true;
402   }
403
404   auto &op = command_table.get_command(tid);
405   if (op.outbl) {
406     op.outbl->claim(m->get_data());
407   }
408
409   if (op.outs) {
410     *(op.outs) = m->rs;
411   }
412
413   if (op.on_finish) {
414     op.on_finish->complete(m->r);
415   }
416
417   command_table.erase(tid);
418
419   m->put();
420   return true;
421 }
422
423 int MgrClient::service_daemon_register(
424   const std::string& service,
425   const std::string& name,
426   const std::map<std::string,std::string>& metadata)
427 {
428   Mutex::Locker l(lock);
429   if (name == "osd" ||
430       name == "mds" ||
431       name == "client" ||
432       name == "mon" ||
433       name == "mgr") {
434     // normal ceph entity types are not allowed!
435     return -EINVAL;
436   }
437   if (service_daemon) {
438     return -EEXIST;
439   }
440   ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
441   service_daemon = true;
442   service_name = service;
443   daemon_name = name;
444   daemon_metadata = metadata;
445   daemon_dirty_status = true;
446
447   // late register?
448   if (cct->_conf->name.is_client() && session && session->con) {
449     _send_open();
450   }
451
452   return 0;
453 }
454
455 int MgrClient::service_daemon_update_status(
456   const std::map<std::string,std::string>& status)
457 {
458   Mutex::Locker l(lock);
459   ldout(cct,10) << status << dendl;
460   daemon_status = status;
461   daemon_dirty_status = true;
462   return 0;
463 }