// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2016 John Spray * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. */ #include "DaemonState.h" #include "MgrSession.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_mgr #undef dout_prefix #define dout_prefix *_dout << "mgr " << __func__ << " " void DaemonStateIndex::insert(DaemonStatePtr dm) { RWLock::WLocker l(lock); if (all.count(dm->key)) { _erase(dm->key); } by_server[dm->hostname][dm->key] = dm; all[dm->key] = dm; } void DaemonStateIndex::_erase(const DaemonKey& dmk) { assert(lock.is_wlocked()); const auto to_erase = all.find(dmk); assert(to_erase != all.end()); const auto dm = to_erase->second; auto &server_collection = by_server[dm->hostname]; server_collection.erase(dm->key); if (server_collection.empty()) { by_server.erase(dm->hostname); } all.erase(to_erase); } DaemonStateCollection DaemonStateIndex::get_by_service( const std::string& svc) const { RWLock::RLocker l(lock); DaemonStateCollection result; for (const auto &i : all) { if (i.first.first == svc) { result[i.first] = i.second; } } return result; } DaemonStateCollection DaemonStateIndex::get_by_server( const std::string &hostname) const { RWLock::RLocker l(lock); if (by_server.count(hostname)) { return by_server.at(hostname); } else { return {}; } } bool DaemonStateIndex::exists(const DaemonKey &key) const { RWLock::RLocker l(lock); return all.count(key) > 0; } DaemonStatePtr DaemonStateIndex::get(const DaemonKey &key) { RWLock::RLocker l(lock); auto iter = all.find(key); if (iter != all.end()) { return iter->second; } else { return nullptr; } } void DaemonStateIndex::cull(const std::string& svc_name, const std::set& names_exist) { std::vector victims; RWLock::WLocker l(lock); auto begin = all.lower_bound({svc_name, ""}); auto end = all.end(); for (auto &i = begin; i != end; ++i) { const auto& daemon_key = i->first; if (daemon_key.first != svc_name) break; if (names_exist.count(daemon_key.second) == 0) { victims.push_back(daemon_key.second); } } for (auto &i : victims) { dout(4) << "Removing data for " << i << dendl; _erase({svc_name, i}); } } void DaemonPerfCounters::update(MMgrReport *report) { dout(20) << "loading " << report->declare_types.size() << " new types, " << report->undeclare_types.size() << " old types, had " << types.size() << " types, got " << report->packed.length() << " bytes of data" << dendl; // Retrieve session state MgrSessionRef session(static_cast( report->get_connection()->get_priv())); // Load any newly declared types for (const auto &t : report->declare_types) { types.insert(std::make_pair(t.path, t)); session->declared_types.insert(t.path); } // Remove any old types for (const auto &t : report->undeclare_types) { session->declared_types.erase(t); } const auto now = ceph_clock_now(); // Parse packed data according to declared set of types bufferlist::iterator p = report->packed.begin(); DECODE_START(1, p); for (const auto &t_path : session->declared_types) { const auto &t = types.at(t_path); uint64_t val = 0; uint64_t avgcount = 0; uint64_t avgcount2 = 0; ::decode(val, p); if (t.type & PERFCOUNTER_LONGRUNAVG) { ::decode(avgcount, p); ::decode(avgcount2, p); } // TODO: interface for insertion of avgs instances[t_path].push(now, val); } DECODE_FINISH(p); } uint64_t PerfCounterInstance::get_current() const { return buffer.front().v; } void PerfCounterInstance::push(utime_t t, uint64_t const &v) { buffer.push_back({t, v}); }