Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mgr / DaemonState.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  */
13
14 #include "DaemonState.h"
15
16 #include "MgrSession.h"
17
18 #define dout_context g_ceph_context
19 #define dout_subsys ceph_subsys_mgr
20 #undef dout_prefix
21 #define dout_prefix *_dout << "mgr " << __func__ << " "
22
23 void DaemonStateIndex::insert(DaemonStatePtr dm)
24 {
25   RWLock::WLocker l(lock);
26
27   if (all.count(dm->key)) {
28     _erase(dm->key);
29   }
30
31   by_server[dm->hostname][dm->key] = dm;
32   all[dm->key] = dm;
33 }
34
35 void DaemonStateIndex::_erase(const DaemonKey& dmk)
36 {
37   assert(lock.is_wlocked());
38
39   const auto to_erase = all.find(dmk);
40   assert(to_erase != all.end());
41   const auto dm = to_erase->second;
42   auto &server_collection = by_server[dm->hostname];
43   server_collection.erase(dm->key);
44   if (server_collection.empty()) {
45     by_server.erase(dm->hostname);
46   }
47
48   all.erase(to_erase);
49 }
50
51 DaemonStateCollection DaemonStateIndex::get_by_service(
52   const std::string& svc) const
53 {
54   RWLock::RLocker l(lock);
55
56   DaemonStateCollection result;
57
58   for (const auto &i : all) {
59     if (i.first.first == svc) {
60       result[i.first] = i.second;
61     }
62   }
63
64   return result;
65 }
66
67 DaemonStateCollection DaemonStateIndex::get_by_server(
68   const std::string &hostname) const
69 {
70   RWLock::RLocker l(lock);
71
72   if (by_server.count(hostname)) {
73     return by_server.at(hostname);
74   } else {
75     return {};
76   }
77 }
78
79 bool DaemonStateIndex::exists(const DaemonKey &key) const
80 {
81   RWLock::RLocker l(lock);
82
83   return all.count(key) > 0;
84 }
85
86 DaemonStatePtr DaemonStateIndex::get(const DaemonKey &key)
87 {
88   RWLock::RLocker l(lock);
89
90   auto iter = all.find(key);
91   if (iter != all.end()) {
92     return iter->second;
93   } else {
94     return nullptr;
95   }
96 }
97
98 void DaemonStateIndex::cull(const std::string& svc_name,
99                             const std::set<std::string>& names_exist)
100 {
101   std::vector<string> victims;
102
103   RWLock::WLocker l(lock);
104   auto begin = all.lower_bound({svc_name, ""});
105   auto end = all.end();
106   for (auto &i = begin; i != end; ++i) {
107     const auto& daemon_key = i->first;
108     if (daemon_key.first != svc_name)
109       break;
110     if (names_exist.count(daemon_key.second) == 0) {
111       victims.push_back(daemon_key.second);
112     }
113   }
114
115   for (auto &i : victims) {
116     dout(4) << "Removing data for " << i << dendl;
117     _erase({svc_name, i});
118   }
119 }
120
121 void DaemonPerfCounters::update(MMgrReport *report)
122 {
123   dout(20) << "loading " << report->declare_types.size() << " new types, "
124            << report->undeclare_types.size() << " old types, had "
125            << types.size() << " types, got "
126            << report->packed.length() << " bytes of data" << dendl;
127
128   // Retrieve session state
129   MgrSessionRef session(static_cast<MgrSession*>(
130         report->get_connection()->get_priv()));
131
132   // Load any newly declared types
133   for (const auto &t : report->declare_types) {
134     types.insert(std::make_pair(t.path, t));
135     session->declared_types.insert(t.path);
136   }
137   // Remove any old types
138   for (const auto &t : report->undeclare_types) {
139     session->declared_types.erase(t);
140   }
141
142   const auto now = ceph_clock_now();
143
144   // Parse packed data according to declared set of types
145   bufferlist::iterator p = report->packed.begin();
146   DECODE_START(1, p);
147   for (const auto &t_path : session->declared_types) {
148     const auto &t = types.at(t_path);
149     uint64_t val = 0;
150     uint64_t avgcount = 0;
151     uint64_t avgcount2 = 0;
152
153     ::decode(val, p);
154     if (t.type & PERFCOUNTER_LONGRUNAVG) {
155       ::decode(avgcount, p);
156       ::decode(avgcount2, p);
157     }
158     // TODO: interface for insertion of avgs
159     instances[t_path].push(now, val);
160   }
161   DECODE_FINISH(p);
162 }
163
164 uint64_t PerfCounterInstance::get_current() const
165 {
166   return buffer.front().v;
167 }
168
169 void PerfCounterInstance::push(utime_t t, uint64_t const &v)
170 {
171   buffer.push_back({t, v});
172 }
173