X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmgr%2FMgrClient.cc;fp=src%2Fceph%2Fsrc%2Fmgr%2FMgrClient.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=c72470d9bca5a93644bcc2d536853bf8ae9cd4f0;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/mgr/MgrClient.cc b/src/ceph/src/mgr/MgrClient.cc deleted file mode 100644 index c72470d..0000000 --- a/src/ceph/src/mgr/MgrClient.cc +++ /dev/null @@ -1,463 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 John Spray - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - */ - - -#include "MgrClient.h" - -#include "mgr/MgrContext.h" - -#include "msg/Messenger.h" -#include "messages/MMgrMap.h" -#include "messages/MMgrReport.h" -#include "messages/MMgrOpen.h" -#include "messages/MMgrConfigure.h" -#include "messages/MCommand.h" -#include "messages/MCommandReply.h" -#include "messages/MPGStats.h" - -#define dout_subsys ceph_subsys_mgrc -#undef dout_prefix -#define dout_prefix *_dout << "mgrc " << __func__ << " " - -MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_) - : Dispatcher(cct_), cct(cct_), msgr(msgr_), - timer(cct_, lock) -{ - assert(cct != nullptr); -} - -void MgrClient::init() -{ - Mutex::Locker l(lock); - - assert(msgr != nullptr); - - timer.init(); -} - -void MgrClient::shutdown() -{ - Mutex::Locker l(lock); - - if (connect_retry_callback) { - timer.cancel_event(connect_retry_callback); - connect_retry_callback = nullptr; - } - - // forget about in-flight commands if we are prematurely shut down - // (e.g., by control-C) - command_table.clear(); - - timer.shutdown(); - if (session) { - session->con->mark_down(); - session.reset(); - } -} - -bool MgrClient::ms_dispatch(Message *m) -{ - Mutex::Locker l(lock); - - switch(m->get_type()) { - case MSG_MGR_MAP: - return handle_mgr_map(static_cast(m)); - case MSG_MGR_CONFIGURE: - return handle_mgr_configure(static_cast(m)); - case MSG_COMMAND_REPLY: - if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) { - handle_command_reply(static_cast(m)); - return true; - } else { - return false; - } - default: - ldout(cct, 30) << "Not handling " << *m << dendl; - return false; - } -} - -void MgrClient::reconnect() -{ - assert(lock.is_locked_by_me()); - - if (session) { - ldout(cct, 4) << "Terminating session with " - << session->con->get_peer_addr() << dendl; - session->con->mark_down(); - session.reset(); - stats_period = 0; - if (report_callback != nullptr) { - timer.cancel_event(report_callback); - report_callback = nullptr; - } - } - - if (!map.get_available()) { - ldout(cct, 4) << "No active mgr available yet" << dendl; - return; - } - - if (last_connect_attempt != utime_t()) { - utime_t now = ceph_clock_now(); - utime_t when = last_connect_attempt; - when += cct->_conf->get_val("mgr_connect_retry_interval"); - if (now < when) { - if (!connect_retry_callback) { - connect_retry_callback = timer.add_event_at( - when, - new FunctionContext([this](int r){ - connect_retry_callback = nullptr; - reconnect(); - })); - } - ldout(cct, 4) << "waiting to retry connect until " << when << dendl; - return; - } - } - - if (connect_retry_callback) { - timer.cancel_event(connect_retry_callback); - connect_retry_callback = nullptr; - } - - ldout(cct, 4) << "Starting new session with " << map.get_active_addr() - << dendl; - entity_inst_t inst; - inst.addr = map.get_active_addr(); - inst.name = entity_name_t::MGR(map.get_active_gid()); - last_connect_attempt = ceph_clock_now(); - - session.reset(new MgrSessionState()); - session->con = msgr->get_connection(inst); - - if (service_daemon) { - daemon_dirty_status = true; - } - - // Don't send an open if we're just a client (i.e. doing - // command-sending, not stats etc) - if (!cct->_conf->name.is_client() || service_daemon) { - _send_open(); - } - - // resend any pending commands - for (const auto &p : command_table.get_commands()) { - MCommand *m = p.second.get_message({}); - assert(session); - assert(session->con); - session->con->send_message(m); - } -} - -void MgrClient::_send_open() -{ - if (session && session->con) { - auto open = new MMgrOpen(); - if (!service_name.empty()) { - open->service_name = service_name; - open->daemon_name = daemon_name; - } else { - open->daemon_name = cct->_conf->name.get_id(); - } - if (service_daemon) { - open->service_daemon = service_daemon; - open->daemon_metadata = daemon_metadata; - } - session->con->send_message(open); - } -} - -bool MgrClient::handle_mgr_map(MMgrMap *m) -{ - assert(lock.is_locked_by_me()); - - ldout(cct, 20) << *m << dendl; - - map = m->get_map(); - ldout(cct, 4) << "Got map version " << map.epoch << dendl; - m->put(); - - ldout(cct, 4) << "Active mgr is now " << map.get_active_addr() << dendl; - - // Reset session? - if (!session || - session->con->get_peer_addr() != map.get_active_addr()) { - reconnect(); - } - - return true; -} - -bool MgrClient::ms_handle_reset(Connection *con) -{ - Mutex::Locker l(lock); - if (session && con == session->con) { - ldout(cct, 4) << __func__ << " con " << con << dendl; - reconnect(); - return true; - } - return false; -} - -bool MgrClient::ms_handle_refused(Connection *con) -{ - // do nothing for now - return false; -} - -void MgrClient::send_report() -{ - assert(lock.is_locked_by_me()); - assert(session); - report_callback = nullptr; - - auto report = new MMgrReport(); - auto pcc = cct->get_perfcounters_collection(); - - pcc->with_counters([this, report]( - const PerfCountersCollection::CounterMap &by_path) - { - // Helper for checking whether a counter should be included - auto include_counter = [this]( - const PerfCounters::perf_counter_data_any_d &ctr, - const PerfCounters &perf_counters) - { - return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold; - }; - - // Helper for cases where we want to forget a counter - auto undeclare = [report, this](const std::string &path) - { - report->undeclare_types.push_back(path); - ldout(cct,20) << " undeclare " << path << dendl; - session->declared.erase(path); - }; - - ENCODE_START(1, 1, report->packed); - - // Find counters that no longer exist, and undeclare them - for (auto p = session->declared.begin(); p != session->declared.end(); ) { - const auto &path = *(p++); - if (by_path.count(path) == 0) { - undeclare(path); - } - } - - for (const auto &i : by_path) { - auto& path = i.first; - auto& data = *(i.second.data); - auto& perf_counters = *(i.second.perf_counters); - - // Find counters that still exist, but are no longer permitted by - // stats_threshold - if (!include_counter(data, perf_counters)) { - if (session->declared.count(path)) { - undeclare(path); - } - continue; - } - - if (session->declared.count(path) == 0) { - ldout(cct,20) << " declare " << path << dendl; - PerfCounterType type; - type.path = path; - if (data.description) { - type.description = data.description; - } - if (data.nick) { - type.nick = data.nick; - } - type.type = data.type; - type.priority = perf_counters.get_adjusted_priority(data.prio); - report->declare_types.push_back(std::move(type)); - session->declared.insert(path); - } - - ::encode(static_cast(data.u64), report->packed); - if (data.type & PERFCOUNTER_LONGRUNAVG) { - ::encode(static_cast(data.avgcount), report->packed); - ::encode(static_cast(data.avgcount2), report->packed); - } - } - ENCODE_FINISH(report->packed); - - ldout(cct, 20) << "sending " << session->declared.size() << " counters (" - "of possible " << by_path.size() << "), " - << report->declare_types.size() << " new, " - << report->undeclare_types.size() << " removed" - << dendl; - }); - - ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl; - - if (daemon_name.size()) { - report->daemon_name = daemon_name; - } else { - report->daemon_name = cct->_conf->name.get_id(); - } - report->service_name = service_name; - - if (daemon_dirty_status) { - report->daemon_status = daemon_status; - daemon_dirty_status = false; - } - - session->con->send_message(report); - - if (stats_period != 0) { - report_callback = new FunctionContext([this](int r){send_report();}); - timer.add_event_after(stats_period, report_callback); - } - - send_pgstats(); -} - -void MgrClient::send_pgstats() -{ - if (pgstats_cb && session) { - session->con->send_message(pgstats_cb()); - } -} - -bool MgrClient::handle_mgr_configure(MMgrConfigure *m) -{ - assert(lock.is_locked_by_me()); - - ldout(cct, 20) << *m << dendl; - - if (!session) { - lderr(cct) << "dropping unexpected configure message" << dendl; - m->put(); - return true; - } - - ldout(cct, 4) << "stats_period=" << m->stats_period << dendl; - - if (stats_threshold != m->stats_threshold) { - ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl; - stats_threshold = m->stats_threshold; - } - - bool starting = (stats_period == 0) && (m->stats_period != 0); - stats_period = m->stats_period; - if (starting) { - send_report(); - } - - m->put(); - return true; -} - -int MgrClient::start_command(const vector& cmd, const bufferlist& inbl, - bufferlist *outbl, string *outs, - Context *onfinish) -{ - Mutex::Locker l(lock); - - ldout(cct, 20) << "cmd: " << cmd << dendl; - - if (map.epoch == 0) { - ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl; - return -EACCES; - } - - auto &op = command_table.start_command(); - op.cmd = cmd; - op.inbl = inbl; - op.outbl = outbl; - op.outs = outs; - op.on_finish = onfinish; - - if (session && session->con) { - // Leaving fsid argument null because it isn't used. - MCommand *m = op.get_message({}); - session->con->send_message(m); - } - return 0; -} - -bool MgrClient::handle_command_reply(MCommandReply *m) -{ - assert(lock.is_locked_by_me()); - - ldout(cct, 20) << *m << dendl; - - const auto tid = m->get_tid(); - if (!command_table.exists(tid)) { - ldout(cct, 4) << "handle_command_reply tid " << m->get_tid() - << " not found" << dendl; - m->put(); - return true; - } - - auto &op = command_table.get_command(tid); - if (op.outbl) { - op.outbl->claim(m->get_data()); - } - - if (op.outs) { - *(op.outs) = m->rs; - } - - if (op.on_finish) { - op.on_finish->complete(m->r); - } - - command_table.erase(tid); - - m->put(); - return true; -} - -int MgrClient::service_daemon_register( - const std::string& service, - const std::string& name, - const std::map& metadata) -{ - Mutex::Locker l(lock); - if (name == "osd" || - name == "mds" || - name == "client" || - name == "mon" || - name == "mgr") { - // normal ceph entity types are not allowed! - return -EINVAL; - } - if (service_daemon) { - return -EEXIST; - } - ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl; - service_daemon = true; - service_name = service; - daemon_name = name; - daemon_metadata = metadata; - daemon_dirty_status = true; - - // late register? - if (cct->_conf->name.is_client() && session && session->con) { - _send_open(); - } - - return 0; -} - -int MgrClient::service_daemon_update_status( - const std::map& status) -{ - Mutex::Locker l(lock); - ldout(cct,10) << status << dendl; - daemon_status = status; - daemon_dirty_status = true; - return 0; -}