X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmgr%2FMgrClient.cc;fp=src%2Fceph%2Fsrc%2Fmgr%2FMgrClient.cc;h=c72470d9bca5a93644bcc2d536853bf8ae9cd4f0;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/mgr/MgrClient.cc b/src/ceph/src/mgr/MgrClient.cc new file mode 100644 index 0000000..c72470d --- /dev/null +++ b/src/ceph/src/mgr/MgrClient.cc @@ -0,0 +1,463 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#include "MgrClient.h" + +#include "mgr/MgrContext.h" + +#include "msg/Messenger.h" +#include "messages/MMgrMap.h" +#include "messages/MMgrReport.h" +#include "messages/MMgrOpen.h" +#include "messages/MMgrConfigure.h" +#include "messages/MCommand.h" +#include "messages/MCommandReply.h" +#include "messages/MPGStats.h" + +#define dout_subsys ceph_subsys_mgrc +#undef dout_prefix +#define dout_prefix *_dout << "mgrc " << __func__ << " " + +MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_) + : Dispatcher(cct_), cct(cct_), msgr(msgr_), + timer(cct_, lock) +{ + assert(cct != nullptr); +} + +void MgrClient::init() +{ + Mutex::Locker l(lock); + + assert(msgr != nullptr); + + timer.init(); +} + +void MgrClient::shutdown() +{ + Mutex::Locker l(lock); + + if (connect_retry_callback) { + timer.cancel_event(connect_retry_callback); + connect_retry_callback = nullptr; + } + + // forget about in-flight commands if we are prematurely shut down + // (e.g., by control-C) + command_table.clear(); + + timer.shutdown(); + if (session) { + session->con->mark_down(); + session.reset(); + } +} + +bool MgrClient::ms_dispatch(Message *m) +{ + Mutex::Locker l(lock); + + switch(m->get_type()) { + case MSG_MGR_MAP: + return handle_mgr_map(static_cast(m)); + case MSG_MGR_CONFIGURE: + return handle_mgr_configure(static_cast(m)); + case MSG_COMMAND_REPLY: + if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) { + handle_command_reply(static_cast(m)); + return true; + } else { + return false; + } + default: + ldout(cct, 30) << "Not handling " << *m << dendl; + return false; + } +} + +void MgrClient::reconnect() +{ + assert(lock.is_locked_by_me()); + + if (session) { + ldout(cct, 4) << "Terminating session with " + << session->con->get_peer_addr() << dendl; + session->con->mark_down(); + session.reset(); + stats_period = 0; + if (report_callback != nullptr) { + timer.cancel_event(report_callback); + report_callback = nullptr; + } + } + + if (!map.get_available()) { + ldout(cct, 4) << "No active mgr available yet" << dendl; + return; + } + + if (last_connect_attempt != utime_t()) { + utime_t now = ceph_clock_now(); + utime_t when = last_connect_attempt; + when += cct->_conf->get_val("mgr_connect_retry_interval"); + if (now < when) { + if (!connect_retry_callback) { + connect_retry_callback = timer.add_event_at( + when, + new FunctionContext([this](int r){ + connect_retry_callback = nullptr; + reconnect(); + })); + } + ldout(cct, 4) << "waiting to retry connect until " << when << dendl; + return; + } + } + + if (connect_retry_callback) { + timer.cancel_event(connect_retry_callback); + connect_retry_callback = nullptr; + } + + ldout(cct, 4) << "Starting new session with " << map.get_active_addr() + << dendl; + entity_inst_t inst; + inst.addr = map.get_active_addr(); + inst.name = entity_name_t::MGR(map.get_active_gid()); + last_connect_attempt = ceph_clock_now(); + + session.reset(new MgrSessionState()); + session->con = msgr->get_connection(inst); + + if (service_daemon) { + daemon_dirty_status = true; + } + + // Don't send an open if we're just a client (i.e. doing + // command-sending, not stats etc) + if (!cct->_conf->name.is_client() || service_daemon) { + _send_open(); + } + + // resend any pending commands + for (const auto &p : command_table.get_commands()) { + MCommand *m = p.second.get_message({}); + assert(session); + assert(session->con); + session->con->send_message(m); + } +} + +void MgrClient::_send_open() +{ + if (session && session->con) { + auto open = new MMgrOpen(); + if (!service_name.empty()) { + open->service_name = service_name; + open->daemon_name = daemon_name; + } else { + open->daemon_name = cct->_conf->name.get_id(); + } + if (service_daemon) { + open->service_daemon = service_daemon; + open->daemon_metadata = daemon_metadata; + } + session->con->send_message(open); + } +} + +bool MgrClient::handle_mgr_map(MMgrMap *m) +{ + assert(lock.is_locked_by_me()); + + ldout(cct, 20) << *m << dendl; + + map = m->get_map(); + ldout(cct, 4) << "Got map version " << map.epoch << dendl; + m->put(); + + ldout(cct, 4) << "Active mgr is now " << map.get_active_addr() << dendl; + + // Reset session? + if (!session || + session->con->get_peer_addr() != map.get_active_addr()) { + reconnect(); + } + + return true; +} + +bool MgrClient::ms_handle_reset(Connection *con) +{ + Mutex::Locker l(lock); + if (session && con == session->con) { + ldout(cct, 4) << __func__ << " con " << con << dendl; + reconnect(); + return true; + } + return false; +} + +bool MgrClient::ms_handle_refused(Connection *con) +{ + // do nothing for now + return false; +} + +void MgrClient::send_report() +{ + assert(lock.is_locked_by_me()); + assert(session); + report_callback = nullptr; + + auto report = new MMgrReport(); + auto pcc = cct->get_perfcounters_collection(); + + pcc->with_counters([this, report]( + const PerfCountersCollection::CounterMap &by_path) + { + // Helper for checking whether a counter should be included + auto include_counter = [this]( + const PerfCounters::perf_counter_data_any_d &ctr, + const PerfCounters &perf_counters) + { + return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold; + }; + + // Helper for cases where we want to forget a counter + auto undeclare = [report, this](const std::string &path) + { + report->undeclare_types.push_back(path); + ldout(cct,20) << " undeclare " << path << dendl; + session->declared.erase(path); + }; + + ENCODE_START(1, 1, report->packed); + + // Find counters that no longer exist, and undeclare them + for (auto p = session->declared.begin(); p != session->declared.end(); ) { + const auto &path = *(p++); + if (by_path.count(path) == 0) { + undeclare(path); + } + } + + for (const auto &i : by_path) { + auto& path = i.first; + auto& data = *(i.second.data); + auto& perf_counters = *(i.second.perf_counters); + + // Find counters that still exist, but are no longer permitted by + // stats_threshold + if (!include_counter(data, perf_counters)) { + if (session->declared.count(path)) { + undeclare(path); + } + continue; + } + + if (session->declared.count(path) == 0) { + ldout(cct,20) << " declare " << path << dendl; + PerfCounterType type; + type.path = path; + if (data.description) { + type.description = data.description; + } + if (data.nick) { + type.nick = data.nick; + } + type.type = data.type; + type.priority = perf_counters.get_adjusted_priority(data.prio); + report->declare_types.push_back(std::move(type)); + session->declared.insert(path); + } + + ::encode(static_cast(data.u64), report->packed); + if (data.type & PERFCOUNTER_LONGRUNAVG) { + ::encode(static_cast(data.avgcount), report->packed); + ::encode(static_cast(data.avgcount2), report->packed); + } + } + ENCODE_FINISH(report->packed); + + ldout(cct, 20) << "sending " << session->declared.size() << " counters (" + "of possible " << by_path.size() << "), " + << report->declare_types.size() << " new, " + << report->undeclare_types.size() << " removed" + << dendl; + }); + + ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl; + + if (daemon_name.size()) { + report->daemon_name = daemon_name; + } else { + report->daemon_name = cct->_conf->name.get_id(); + } + report->service_name = service_name; + + if (daemon_dirty_status) { + report->daemon_status = daemon_status; + daemon_dirty_status = false; + } + + session->con->send_message(report); + + if (stats_period != 0) { + report_callback = new FunctionContext([this](int r){send_report();}); + timer.add_event_after(stats_period, report_callback); + } + + send_pgstats(); +} + +void MgrClient::send_pgstats() +{ + if (pgstats_cb && session) { + session->con->send_message(pgstats_cb()); + } +} + +bool MgrClient::handle_mgr_configure(MMgrConfigure *m) +{ + assert(lock.is_locked_by_me()); + + ldout(cct, 20) << *m << dendl; + + if (!session) { + lderr(cct) << "dropping unexpected configure message" << dendl; + m->put(); + return true; + } + + ldout(cct, 4) << "stats_period=" << m->stats_period << dendl; + + if (stats_threshold != m->stats_threshold) { + ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl; + stats_threshold = m->stats_threshold; + } + + bool starting = (stats_period == 0) && (m->stats_period != 0); + stats_period = m->stats_period; + if (starting) { + send_report(); + } + + m->put(); + return true; +} + +int MgrClient::start_command(const vector& cmd, const bufferlist& inbl, + bufferlist *outbl, string *outs, + Context *onfinish) +{ + Mutex::Locker l(lock); + + ldout(cct, 20) << "cmd: " << cmd << dendl; + + if (map.epoch == 0) { + ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl; + return -EACCES; + } + + auto &op = command_table.start_command(); + op.cmd = cmd; + op.inbl = inbl; + op.outbl = outbl; + op.outs = outs; + op.on_finish = onfinish; + + if (session && session->con) { + // Leaving fsid argument null because it isn't used. + MCommand *m = op.get_message({}); + session->con->send_message(m); + } + return 0; +} + +bool MgrClient::handle_command_reply(MCommandReply *m) +{ + assert(lock.is_locked_by_me()); + + ldout(cct, 20) << *m << dendl; + + const auto tid = m->get_tid(); + if (!command_table.exists(tid)) { + ldout(cct, 4) << "handle_command_reply tid " << m->get_tid() + << " not found" << dendl; + m->put(); + return true; + } + + auto &op = command_table.get_command(tid); + if (op.outbl) { + op.outbl->claim(m->get_data()); + } + + if (op.outs) { + *(op.outs) = m->rs; + } + + if (op.on_finish) { + op.on_finish->complete(m->r); + } + + command_table.erase(tid); + + m->put(); + return true; +} + +int MgrClient::service_daemon_register( + const std::string& service, + const std::string& name, + const std::map& metadata) +{ + Mutex::Locker l(lock); + if (name == "osd" || + name == "mds" || + name == "client" || + name == "mon" || + name == "mgr") { + // normal ceph entity types are not allowed! + return -EINVAL; + } + if (service_daemon) { + return -EEXIST; + } + ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl; + service_daemon = true; + service_name = service; + daemon_name = name; + daemon_metadata = metadata; + daemon_dirty_status = true; + + // late register? + if (cct->_conf->name.is_client() && session && session->con) { + _send_open(); + } + + return 0; +} + +int MgrClient::service_daemon_update_status( + const std::map& status) +{ + Mutex::Locker l(lock); + ldout(cct,10) << status << dendl; + daemon_status = status; + daemon_dirty_status = true; + return 0; +}