1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "MgrClient.h"
17 #include "mgr/MgrContext.h"
19 #include "msg/Messenger.h"
20 #include "messages/MMgrMap.h"
21 #include "messages/MMgrReport.h"
22 #include "messages/MMgrOpen.h"
23 #include "messages/MMgrConfigure.h"
24 #include "messages/MCommand.h"
25 #include "messages/MCommandReply.h"
26 #include "messages/MPGStats.h"
// Debug-log configuration for this file: messages are logged under the
// mgrc subsystem, and each line is prefixed with "mgrc <function name> ".
28 #define dout_subsys ceph_subsys_mgrc
30 #define dout_prefix *_dout << "mgrc " << __func__ << " "
// Construct a MgrClient for the given context and messenger.
// The context is handed to the Dispatcher base and stored in `cct`; the
// messenger pointer is stored in `msgr`.
32 MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_)
33 : Dispatcher(cct_), cct(cct_), msgr(msgr_),
// A null context is treated as a programming error.
36 assert(cct != nullptr);
// Initialise the client. Holds `lock` for the duration of the call and
// asserts that the messenger pointer stored by the constructor is non-null.
39 void MgrClient::init()
41 Mutex::Locker l(lock);
43 assert(msgr != nullptr);
// Shut the client down while holding `lock`: cancel any pending
// connect-retry timer event, drop the in-flight command table, and mark the
// session's connection down.
48 void MgrClient::shutdown()
50 Mutex::Locker l(lock);
// Cancel the scheduled reconnect attempt, if one is outstanding, and forget
// the handle so it is not cancelled twice.
52 if (connect_retry_callback) {
53 timer.cancel_event(connect_retry_callback);
54 connect_retry_callback = nullptr;
57 // forget about in-flight commands if we are prematurely shut down
58 // (e.g., by control-C)
59 command_table.clear();
// NOTE(review): this dereferences `session`; confirm it is guarded by a
// null check on `session`.
63 session->con->mark_down();
// Messenger dispatch entry point. Takes `lock` and routes the message by
// its type to the matching handler:
//   - MMgrMap           -> handle_mgr_map()
//   - MSG_MGR_CONFIGURE -> handle_mgr_configure()
//   - MSG_COMMAND_REPLY -> handle_command_reply(), only if sent by a mgr
// The handlers are called with `lock` held (they assert this).
68 bool MgrClient::ms_dispatch(Message *m)
70 Mutex::Locker l(lock);
72 switch(m->get_type()) {
74 return handle_mgr_map(static_cast<MMgrMap*>(m));
75 case MSG_MGR_CONFIGURE:
76 return handle_mgr_configure(static_cast<MMgrConfigure*>(m));
77 case MSG_COMMAND_REPLY:
// Command replies can come from other entity types too; only replies whose
// source is a mgr are handled here.
78 if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
79 handle_command_reply(static_cast<MCommandReply*>(m));
// Presumably the fall-back case for message types this dispatcher does not
// handle; only logged at a very verbose level.
85 ldout(cct, 30) << "Not handling " << *m << dendl;
// (Re)establish the session with the active mgr described by `map`.
// The caller must hold `lock` (asserted). Visible steps: tear down the
// current connection, cancel the periodic report, rate-limit connection
// attempts using the "mgr_connect_retry_interval" option, open a new
// connection to the active mgr, and resend every pending command.
90 void MgrClient::reconnect()
92 assert(lock.is_locked_by_me());
// Drop the connection of the existing session before starting over.
95 ldout(cct, 4) << "Terminating session with "
96 << session->con->get_peer_addr() << dendl;
97 session->con->mark_down();
// Stop the periodic report timer event; send_report() re-arms it.
100 if (report_callback != nullptr) {
101 timer.cancel_event(report_callback);
102 report_callback = nullptr;
// Nothing to connect to when the map reports no available mgr.
106 if (!map.get_available()) {
107 ldout(cct, 4) << "No active mgr available yet" << dendl;
// A previous attempt was made: compute the earliest time at which another
// attempt is allowed (`when` = last attempt + retry interval).
// NOTE(review): presumably the deferral below only applies while `when` is
// still in the future -- confirm against the comparison of `now` and `when`.
111 if (last_connect_attempt != utime_t()) {
112 utime_t now = ceph_clock_now();
113 utime_t when = last_connect_attempt;
114 when += cct->_conf->get_val<double>("mgr_connect_retry_interval");
// Schedule at most one retry event; the callback clears its own handle when
// it runs.
116 if (!connect_retry_callback) {
117 connect_retry_callback = timer.add_event_at(
119 new FunctionContext([this](int r){
120 connect_retry_callback = nullptr;
124 ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
// We are connecting now, so a previously scheduled retry is obsolete.
129 if (connect_retry_callback) {
130 timer.cancel_event(connect_retry_callback);
131 connect_retry_callback = nullptr;
134 ldout(cct, 4) << "Starting new session with " << map.get_active_addr()
// Address the active mgr by the address and gid taken from the map, and
// remember when this attempt was made for the rate limit above.
137 inst.addr = map.get_active_addr();
138 inst.name = entity_name_t::MGR(map.get_active_gid());
139 last_connect_attempt = ceph_clock_now();
// Start from fresh session state and obtain the connection from the
// messenger.
141 session.reset(new MgrSessionState());
142 session->con = msgr->get_connection(inst);
// Mark the daemon status dirty so that send_report() includes daemon_status
// in its next report on the new session.
144 if (service_daemon) {
145 daemon_dirty_status = true;
148 // Don't send an open if we're just a client (i.e. doing
149 // command-sending, not stats etc)
150 if (!cct->_conf->name.is_client() || service_daemon) {
154 // resend any pending commands
155 for (const auto &p : command_table.get_commands()) {
156 MCommand *m = p.second.get_message({});
158 assert(session->con);
159 session->con->send_message(m);
// Send an MMgrOpen on the current session connection. Does nothing unless
// both a session and its connection exist.
163 void MgrClient::_send_open()
165 if (session && session->con) {
166 auto open = new MMgrOpen();
// With a service name set, identify by the stored service and daemon names.
167 if (!service_name.empty()) {
168 open->service_name = service_name;
169 open->daemon_name = daemon_name;
// Presumably the fallback branch: use the id of the configured entity name.
171 open->daemon_name = cct->_conf->name.get_id();
// Service daemons additionally advertise the flag and their metadata.
173 if (service_daemon) {
174 open->service_daemon = service_daemon;
175 open->daemon_metadata = daemon_metadata;
177 session->con->send_message(open);
// Handle an incoming MMgrMap. The caller must hold `lock` (asserted).
// Logs the message, the map epoch and the address of the active mgr.
181 bool MgrClient::handle_mgr_map(MMgrMap *m)
183 assert(lock.is_locked_by_me());
185 ldout(cct, 20) << *m << dendl;
188 ldout(cct, 4) << "Got map version " << map.epoch << dendl;
191 ldout(cct, 4) << "Active mgr is now " << map.get_active_addr() << dendl;
// Compares the current session's peer address with the active mgr address
// from the map.
// NOTE(review): presumably a mismatch triggers reconnect() -- confirm.
195 session->con->get_peer_addr() != map.get_active_addr()) {
// Connection-reset callback. Under `lock`, checks whether the reset
// connection is the one used by the current session and, if so, logs it.
202 bool MgrClient::ms_handle_reset(Connection *con)
204 Mutex::Locker l(lock);
205 if (session && con == session->con) {
206 ldout(cct, 4) << __func__ << " con " << con << dendl;
// Connection-refused callback. No handling is implemented yet.
213 bool MgrClient::ms_handle_refused(Connection *con)
215 // do nothing for now
// Build an MMgrReport and send it on the session connection.
// The report carries perf counter type declarations and undeclarations
// relative to what this session has already declared, the packed counter
// values, the daemon and service names, and the daemon status when it is
// dirty. When stats_period is non-zero the function re-arms a timer event
// that calls it again. The caller must hold `lock` (asserted).
219 void MgrClient::send_report()
221 assert(lock.is_locked_by_me());
// Clear the timer handle up front; it is set again at the end of this
// function if another report gets scheduled.
223 report_callback = nullptr;
225 auto report = new MMgrReport();
226 auto pcc = cct->get_perfcounters_collection();
// The lambda receives the collection's counters keyed by path.
228 pcc->with_counters([this, report](
229 const PerfCountersCollection::CounterMap &by_path)
231 // Helper for checking whether a counter should be included
// A counter is included when its adjusted priority is at least
// stats_threshold.
232 auto include_counter = [this](
233 const PerfCounters::perf_counter_data_any_d &ctr,
234 const PerfCounters &perf_counters)
236 return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
239 // Helper for cases where we want to forget a counter
// Adds the path to the report's undeclare list and removes it from the set
// of paths declared on this session.
240 auto undeclare = [report, this](const std::string &path)
242 report->undeclare_types.push_back(path);
243 ldout(cct,20) << " undeclare " << path << dendl;
244 session->declared.erase(path);
247 ENCODE_START(1, 1, report->packed);
249 // Find counters that no longer exist, and undeclare them
// The iterator is advanced before the loop body runs, so removing the
// current entry from session->declared does not invalidate `p`.
250 for (auto p = session->declared.begin(); p != session->declared.end(); ) {
251 const auto &path = *(p++);
252 if (by_path.count(path) == 0) {
257 for (const auto &i : by_path) {
258 auto& path = i.first;
259 auto& data = *(i.second.data);
260 auto& perf_counters = *(i.second.perf_counters);
262 // Find counters that still exist, but are no longer permitted by
264 if (!include_counter(data, perf_counters)) {
265 if (session->declared.count(path)) {
// First time this path is reported on this session: describe its type in
// the report and remember that it has been declared.
271 if (session->declared.count(path) == 0) {
272 ldout(cct,20) << " declare " << path << dendl;
273 PerfCounterType type;
275 if (data.description) {
276 type.description = data.description;
279 type.nick = data.nick;
281 type.type = data.type;
282 type.priority = perf_counters.get_adjusted_priority(data.prio);
283 report->declare_types.push_back(std::move(type));
284 session->declared.insert(path);
// Pack the counter value; long-running averages additionally carry their
// two sample counts.
287 ::encode(static_cast<uint64_t>(data.u64), report->packed);
288 if (data.type & PERFCOUNTER_LONGRUNAVG) {
289 ::encode(static_cast<uint64_t>(data.avgcount), report->packed);
290 ::encode(static_cast<uint64_t>(data.avgcount2), report->packed);
293 ENCODE_FINISH(report->packed);
295 ldout(cct, 20) << "sending " << session->declared.size() << " counters ("
296 "of possible " << by_path.size() << "), "
297 << report->declare_types.size() << " new, "
298 << report->undeclare_types.size() << " removed"
302 ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
// Identify the sender: the stored daemon name when one is set; the id of
// the configured entity name is presumably the fallback branch.
304 if (daemon_name.size()) {
305 report->daemon_name = daemon_name;
307 report->daemon_name = cct->_conf->name.get_id();
309 report->service_name = service_name;
// The daemon status is only attached when it changed since it was last
// sent; the dirty flag is cleared once it has been attached.
311 if (daemon_dirty_status) {
312 report->daemon_status = daemon_status;
313 daemon_dirty_status = false;
316 session->con->send_message(report);
// A stats_period of zero means no further report is scheduled.
318 if (stats_period != 0) {
319 report_callback = new FunctionContext([this](int r){send_report();});
320 timer.add_event_after(stats_period, report_callback);
// Send the message produced by pgstats_cb on the session connection.
// Does nothing unless both a callback and a session are set.
326 void MgrClient::send_pgstats()
328 if (pgstats_cb && session) {
329 session->con->send_message(pgstats_cb());
// Handle an MMgrConfigure message, which carries the reporting period and
// the stats threshold. The caller must hold `lock` (asserted).
333 bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
335 assert(lock.is_locked_by_me());
337 ldout(cct, 20) << *m << dendl;
// NOTE(review): the condition under which the message counts as unexpected
// is not shown here -- presumably when there is no session; confirm.
340 lderr(cct) << "dropping unexpected configure message" << dendl;
345 ldout(cct, 4) << "stats_period=" << m->stats_period << dendl;
// Only log and store the threshold when it actually changes.
347 if (stats_threshold != m->stats_threshold) {
348 ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl;
349 stats_threshold = m->stats_threshold;
// `starting` is true when reporting goes from disabled (period 0) to
// enabled; it must be computed before stats_period is overwritten.
352 bool starting = (stats_period == 0) && (m->stats_period != 0);
353 stats_period = m->stats_period;
// Register a command in the command table and, if a session connection
// exists, send it right away. Runs under `lock`.
//   cmd   - the command, logged at level 20
//   inbl  - input data for the command
//   outbl - where handle_command_reply() places the reply data
//   outs  - output string for the reply
// The completion stored in `on_finish` is completed by
// handle_command_reply().
362 int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
363 bufferlist *outbl, string *outs,
366 Mutex::Locker l(lock);
368 ldout(cct, 20) << "cmd: " << cmd << dendl;
// An epoch of 0 is treated as "no MgrMap received yet".
370 if (map.epoch == 0) {
371 ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
375 auto &op = command_table.start_command();
380 op.on_finish = onfinish;
// Without a connection the command just stays in the table; reconnect()
// sends the pending commands once a session is established.
382 if (session && session->con) {
383 // Leaving fsid argument null because it isn't used.
384 MCommand *m = op.get_message({});
385 session->con->send_message(m);
// Handle an MCommandReply: match it to a pending command by transaction
// id, hand the results to the caller and remove the command from the
// table. The caller must hold `lock` (asserted).
390 bool MgrClient::handle_command_reply(MCommandReply *m)
392 assert(lock.is_locked_by_me());
394 ldout(cct, 20) << *m << dendl;
396 const auto tid = m->get_tid();
// A reply whose tid is not in the command table is only logged.
397 if (!command_table.exists(tid)) {
398 ldout(cct, 4) << "handle_command_reply tid " << m->get_tid()
399 << " not found" << dendl;
404 auto &op = command_table.get_command(tid);
// Pass the reply payload to the caller's output buffer.
406 op.outbl->claim(m->get_data());
// Complete the caller's completion with the reply's result code.
414 op.on_finish->complete(m->r);
417 command_table.erase(tid);
// Register this process as a service daemon. Runs under `lock`.
//   service  - service name, stored in service_name
//   name     - daemon name within that service
//   metadata - key/value metadata, stored in daemon_metadata
// On the visible path the service_daemon flag is set and the daemon status
// is marked dirty.
423 int MgrClient::service_daemon_register(
424 const std::string& service,
425 const std::string& name,
426 const std::map<std::string,std::string>& metadata)
428 Mutex::Locker l(lock);
434 // normal ceph entity types are not allowed!
// NOTE(review): presumably rejects a second registration -- confirm.
437 if (service_daemon) {
440 ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
441 service_daemon = true;
442 service_name = service;
444 daemon_metadata = metadata;
445 daemon_dirty_status = true;
// reconnect() skips the open message for plain clients that are not service
// daemons, so a client with a live session is treated specially here.
// NOTE(review): presumably this sends the open now -- confirm.
448 if (cct->_conf->name.is_client() && session && session->con) {
// Replace the stored daemon status with `status`. Runs under `lock`.
// The status is marked dirty so that send_report() attaches it to its next
// report.
455 int MgrClient::service_daemon_update_status(
456 const std::map<std::string,std::string>& status)
458 Mutex::Locker l(lock);
459 ldout(cct,10) << status << dendl;
460 daemon_status = status;
461 daemon_dirty_status = true;