Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / Beacon.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2012 Red Hat
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15
16 #include "common/dout.h"
17 #include "common/HeartbeatMap.h"
18
19 #include "include/stringify.h"
20 #include "include/util.h"
21
22 #include "messages/MMDSBeacon.h"
23 #include "mon/MonClient.h"
24 #include "mds/MDLog.h"
25 #include "mds/MDSRank.h"
26 #include "mds/MDSMap.h"
27 #include "mds/Locker.h"
28
29 #include "Beacon.h"
30
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_mds
33 #undef dout_prefix
34 #define dout_prefix *_dout << "mds.beacon." << name << ' '
35
36
37 Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string name_) :
38   Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
39   name(name_), standby_for_rank(MDS_RANK_NONE),
40   standby_for_fscid(FS_CLUSTER_ID_NONE), want_state(MDSMap::STATE_BOOT),
41   awaiting_seq(-1)
42 {
43   last_seq = 0;
44   was_laggy = false;
45
46   epoch = 0;
47 }
48
49
50 Beacon::~Beacon()
51 {
52 }
53
54
55 void Beacon::init(MDSMap const *mdsmap)
56 {
57   Mutex::Locker l(lock);
58   assert(mdsmap != NULL);
59
60   _notify_mdsmap(mdsmap);
61   standby_for_rank = mds_rank_t(g_conf->mds_standby_for_rank);
62   standby_for_name = g_conf->mds_standby_for_name;
63   standby_for_fscid = fs_cluster_id_t(g_conf->mds_standby_for_fscid);
64   standby_replay = g_conf->mds_standby_replay;
65
66   // Spawn threads and start messaging
67   timer.init();
68   _send();
69 }
70
71
72 void Beacon::shutdown()
73 {
74   Mutex::Locker l(lock);
75   if (sender) {
76     timer.cancel_event(sender);
77     sender = NULL;
78   }
79   timer.shutdown();
80 }
81
82
83 bool Beacon::ms_dispatch(Message *m)
84 {
85   if (m->get_type() == MSG_MDS_BEACON) {
86     if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
87       handle_mds_beacon(static_cast<MMDSBeacon*>(m));
88     }
89     return true;
90   }
91
92   return false;
93 }
94
95
96 /**
97  * Update lagginess state based on response from remote MDSMonitor
98  *
99  * This function puts the passed message before returning
100  */
101 void Beacon::handle_mds_beacon(MMDSBeacon *m)
102 {
103   Mutex::Locker l(lock);
104   assert(m != NULL);
105
106   version_t seq = m->get_seq();
107
108   // update lab
109   if (seq_stamp.count(seq)) {
110     utime_t now = ceph_clock_now();
111     if (seq_stamp[seq] > last_acked_stamp) {
112       last_acked_stamp = seq_stamp[seq];
113       utime_t rtt = now - last_acked_stamp;
114
115       dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
116                << " seq " << m->get_seq() << " rtt " << rtt << dendl;
117
118       if (was_laggy && rtt < g_conf->mds_beacon_grace) {
119         dout(0) << "handle_mds_beacon no longer laggy" << dendl;
120         was_laggy = false;
121         laggy_until = now;
122       }
123     } else {
124       // Mark myself laggy if system clock goes backwards. Hopping
125       // later beacons will clear it.
126       dout(1) << "handle_mds_beacon system clock goes backwards, "
127               << "mark myself laggy" << dendl;
128       last_acked_stamp = now - utime_t(g_conf->mds_beacon_grace + 1, 0);
129       was_laggy = true;
130     }
131
132     // clean up seq_stamp map
133     while (!seq_stamp.empty() &&
134            seq_stamp.begin()->first <= seq)
135       seq_stamp.erase(seq_stamp.begin());
136
137     // Wake a waiter up if present
138     if (awaiting_seq == seq) {
139       waiting_cond.Signal();
140     }
141   } else {
142     dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
143              << " seq " << m->get_seq() << " dne" << dendl;
144   }
145 }
146
147
148 void Beacon::send()
149 {
150   Mutex::Locker l(lock);
151   _send();
152 }
153
154
155 void Beacon::send_and_wait(const double duration)
156 {
157   Mutex::Locker l(lock);
158   _send();
159   awaiting_seq = last_seq;
160   dout(20) << __func__ << ": awaiting " << awaiting_seq
161            << " for up to " << duration << "s" << dendl;
162
163   utime_t timeout;
164   timeout.set_from_double(ceph_clock_now() + duration);
165   while ((!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq)
166          && ceph_clock_now() < timeout) {
167     waiting_cond.WaitUntil(lock, timeout);
168   }
169
170   awaiting_seq = -1;
171 }
172
173
174 /**
175  * Call periodically, or when you have updated the desired state
176  */
177 void Beacon::_send()
178 {
179   if (sender) {
180     timer.cancel_event(sender);
181   }
182   sender = timer.add_event_after(
183     g_conf->mds_beacon_interval,
184     new FunctionContext([this](int) {
185         assert(lock.is_locked_by_me());
186         sender = nullptr;
187         _send();
188       }));
189
190   if (!cct->get_heartbeat_map()->is_healthy()) {
191     /* If anything isn't progressing, let avoid sending a beacon so that
192      * the MDS will consider us laggy */
193     dout(1) << __func__ << " skipping beacon, heartbeat map not healthy" << dendl;
194     return;
195   }
196
197   ++last_seq;
198   dout(10) << __func__ << " " << ceph_mds_state_name(want_state)
199            << " seq " << last_seq
200            << dendl;
201
202   seq_stamp[last_seq] = ceph_clock_now();
203
204   assert(want_state != MDSMap::STATE_NULL);
205   
206   MMDSBeacon *beacon = new MMDSBeacon(
207       monc->get_fsid(), mds_gid_t(monc->get_global_id()),
208       name,
209       epoch,
210       want_state,
211       last_seq,
212       CEPH_FEATURES_SUPPORTED_DEFAULT);
213
214   beacon->set_standby_for_rank(standby_for_rank);
215   beacon->set_standby_for_name(standby_for_name);
216   beacon->set_standby_for_fscid(standby_for_fscid);
217   beacon->set_standby_replay(standby_replay);
218   beacon->set_health(health);
219   beacon->set_compat(compat);
220   // piggyback the sys info on beacon msg
221   if (want_state == MDSMap::STATE_BOOT) {
222     map<string, string> sys_info;
223     collect_sys_info(&sys_info, cct);
224     sys_info["addr"] = stringify(monc->get_myaddr());
225     beacon->set_sys_info(sys_info);
226   }
227   monc->send_mon_message(beacon);
228 }
229
230 /**
231  * Call this when there is a new MDSMap available
232  */
233 void Beacon::notify_mdsmap(MDSMap const *mdsmap)
234 {
235   Mutex::Locker l(lock);
236   assert(mdsmap != NULL);
237
238   _notify_mdsmap(mdsmap);
239 }
240
241 void Beacon::_notify_mdsmap(MDSMap const *mdsmap)
242 {
243   assert(mdsmap != NULL);
244   assert(mdsmap->get_epoch() >= epoch);
245
246   if (mdsmap->get_epoch() != epoch) {
247     epoch = mdsmap->get_epoch();
248     compat = get_mdsmap_compat_set_default();
249     compat.merge(mdsmap->compat);
250   }
251 }
252
253
254 bool Beacon::is_laggy()
255 {
256   Mutex::Locker l(lock);
257
258   if (last_acked_stamp == utime_t())
259     return false;
260
261   utime_t now = ceph_clock_now();
262   utime_t since = now - last_acked_stamp;
263   if (since > g_conf->mds_beacon_grace) {
264     dout(5) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
265             << " since last acked beacon" << dendl;
266     was_laggy = true;
267     if (since > (g_conf->mds_beacon_grace*2) &&
268         now > last_mon_reconnect + g_conf->mds_beacon_interval) {
269       // maybe it's not us?
270       dout(5) << "initiating monitor reconnect; maybe we're not the slow one"
271               << dendl;
272       last_mon_reconnect = now;
273       monc->reopen_session();
274     }
275     return true;
276   }
277   return false;
278 }
279
280 utime_t Beacon::get_laggy_until() const
281 {
282   Mutex::Locker l(lock);
283
284   return laggy_until;
285 }
286
287 void Beacon::set_want_state(MDSMap const *mdsmap, MDSMap::DaemonState const newstate)
288 {
289   Mutex::Locker l(lock);
290
291   // Update mdsmap epoch atomically with updating want_state, so that when
292   // we send a beacon with the new want state it has the latest epoch, and
293   // once we have updated to the latest epoch, we are not sending out
294   // a stale want_state (i.e. one from before making it through MDSMap
295   // handling)
296   _notify_mdsmap(mdsmap);
297
298   if (want_state != newstate) {
299     dout(10) << __func__ << ": "
300       << ceph_mds_state_name(want_state) << " -> "
301       << ceph_mds_state_name(newstate) << dendl;
302     want_state = newstate;
303   }
304 }
305
306
307 /**
308  * We are 'shown' an MDS briefly in order to update
309  * some health metrics that we will send in the next
310  * beacon.
311  */
312 void Beacon::notify_health(MDSRank const *mds)
313 {
314   Mutex::Locker l(lock);
315   if (!mds) {
316     // No MDS rank held
317     return;
318   }
319
320   // I'm going to touch this MDS, so it must be locked
321   assert(mds->mds_lock.is_locked_by_me());
322
323   health.metrics.clear();
324
325   // Detect presence of entries in DamageTable
326   if (!mds->damage_table.empty()) {
327     MDSHealthMetric m(MDS_HEALTH_DAMAGE, HEALTH_ERR, std::string(
328           "Metadata damage detected"));
329     health.metrics.push_back(m);
330   }
331
332   // Detect MDS_HEALTH_TRIM condition
333   // Arbitrary factor of 2, indicates MDS is not trimming promptly
334   {
335     if (mds->mdlog->get_num_segments() > (size_t)(g_conf->mds_log_max_segments * 2)) {
336       std::ostringstream oss;
337       oss << "Behind on trimming (" << mds->mdlog->get_num_segments()
338         << "/" << g_conf->mds_log_max_segments << ")";
339
340       MDSHealthMetric m(MDS_HEALTH_TRIM, HEALTH_WARN, oss.str());
341       m.metadata["num_segments"] = stringify(mds->mdlog->get_num_segments());
342       m.metadata["max_segments"] = stringify(g_conf->mds_log_max_segments);
343       health.metrics.push_back(m);
344     }
345   }
346
347   // Detect clients failing to respond to modifications to capabilities in
348   // CLIENT_CAPS messages.
349   {
350     std::list<client_t> late_clients;
351     mds->locker->get_late_revoking_clients(&late_clients);
352     std::list<MDSHealthMetric> late_cap_metrics;
353
354     for (std::list<client_t>::iterator i = late_clients.begin(); i != late_clients.end(); ++i) {
355
356       // client_t is equivalent to session.info.inst.name.num
357       // Construct an entity_name_t to lookup into SessionMap
358       entity_name_t ename(CEPH_ENTITY_TYPE_CLIENT, i->v);
359       Session const *s = mds->sessionmap.get_session(ename);
360       if (s == NULL) {
361         // Shouldn't happen, but not worth crashing if it does as this is
362         // just health-reporting code.
363         derr << "Client ID without session: " << i->v << dendl;
364         continue;
365       }
366
367       std::ostringstream oss;
368       oss << "Client " << s->get_human_name() << " failing to respond to capability release";
369       MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE, HEALTH_WARN, oss.str());
370       m.metadata["client_id"] = stringify(i->v);
371       late_cap_metrics.push_back(m);
372     }
373
374     if (late_cap_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
375       health.metrics.splice(health.metrics.end(), late_cap_metrics);
376     } else {
377       std::ostringstream oss;
378       oss << "Many clients (" << late_cap_metrics.size()
379           << ") failing to respond to capability release";
380       MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE_MANY, HEALTH_WARN, oss.str());
381       m.metadata["client_count"] = stringify(late_cap_metrics.size());
382       health.metrics.push_back(m);
383       late_cap_metrics.clear();
384     }
385   }
386
387   // Detect clients failing to generate cap releases from CEPH_SESSION_RECALL_STATE
388   // messages. May be due to buggy client or resource-hogging application.
389   //
390   // Detect clients failing to advance their old_client_tid
391   {
392     set<Session*> sessions;
393     mds->sessionmap.get_client_session_set(sessions);
394
395     utime_t cutoff = ceph_clock_now();
396     cutoff -= g_conf->mds_recall_state_timeout;
397     utime_t last_recall = mds->mdcache->last_recall_state;
398
399     std::list<MDSHealthMetric> late_recall_metrics;
400     std::list<MDSHealthMetric> large_completed_requests_metrics;
401     for (set<Session*>::iterator i = sessions.begin(); i != sessions.end(); ++i) {
402       Session *session = *i;
403       if (!session->recalled_at.is_zero()) {
404         dout(20) << "Session servicing RECALL " << session->info.inst
405           << ": " << session->recalled_at << " " << session->recall_release_count
406           << "/" << session->recall_count << dendl;
407         if (last_recall < cutoff || session->last_recall_sent < last_recall) {
408           dout(20) << "  no longer recall" << dendl;
409           session->clear_recalled_at();
410         } else if (session->recalled_at < cutoff) {
411           dout(20) << "  exceeded timeout " << session->recalled_at << " vs. " << cutoff << dendl;
412           std::ostringstream oss;
413           oss << "Client " << session->get_human_name() << " failing to respond to cache pressure";
414           MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str());
415           m.metadata["client_id"] = stringify(session->info.inst.name.num());
416           late_recall_metrics.push_back(m);
417         } else {
418           dout(20) << "  within timeout " << session->recalled_at << " vs. " << cutoff << dendl;
419         }
420       }
421       if ((session->get_num_trim_requests_warnings() > 0 &&
422            session->get_num_completed_requests() >= g_conf->mds_max_completed_requests) ||
423           (session->get_num_trim_flushes_warnings() > 0 &&
424            session->get_num_completed_flushes() >= g_conf->mds_max_completed_flushes)) {
425         std::ostringstream oss;
426         oss << "Client " << session->get_human_name() << " failing to advance its oldest client/flush tid";
427         MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID, HEALTH_WARN, oss.str());
428         m.metadata["client_id"] = stringify(session->info.inst.name.num());
429         large_completed_requests_metrics.push_back(m);
430       }
431     }
432
433     if (late_recall_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
434       health.metrics.splice(health.metrics.end(), late_recall_metrics);
435     } else {
436       std::ostringstream oss;
437       oss << "Many clients (" << late_recall_metrics.size()
438           << ") failing to respond to cache pressure";
439       MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL_MANY, HEALTH_WARN, oss.str());
440       m.metadata["client_count"] = stringify(late_recall_metrics.size());
441       health.metrics.push_back(m);
442       late_recall_metrics.clear();
443     }
444
445     if (large_completed_requests_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
446       health.metrics.splice(health.metrics.end(), large_completed_requests_metrics);
447     } else {
448       std::ostringstream oss;
449       oss << "Many clients (" << large_completed_requests_metrics.size()
450         << ") failing to advance their oldest client/flush tid";
451       MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID_MANY, HEALTH_WARN, oss.str());
452       m.metadata["client_count"] = stringify(large_completed_requests_metrics.size());
453       health.metrics.push_back(m);
454       large_completed_requests_metrics.clear();
455     }
456   }
457
458   // Detect MDS_HEALTH_SLOW_REQUEST condition
459   {
460     int slow = mds->get_mds_slow_req_count();
461     dout(20) << slow << " slow request found" << dendl;
462     if (slow) {
463       std::ostringstream oss;
464       oss << slow << " slow requests are blocked > " << g_conf->mds_op_complaint_time << " sec";
465
466       MDSHealthMetric m(MDS_HEALTH_SLOW_REQUEST, HEALTH_WARN, oss.str());
467       health.metrics.push_back(m);
468     }
469   }
470
471   // Report a health warning if we are readonly
472   if (mds->mdcache->is_readonly()) {
473     MDSHealthMetric m(MDS_HEALTH_READ_ONLY, HEALTH_WARN,
474                       "MDS in read-only mode");
475     health.metrics.push_back(m);
476   }
477
478   // Report if we have significantly exceeded our cache size limit
479   if (mds->mdcache->cache_overfull()) {
480     std::ostringstream oss;
481     oss << "MDS cache is too large (" << bytes2str(mds->mdcache->cache_size())
482         << "/" << bytes2str(mds->mdcache->cache_limit_memory()) << "); "
483         << mds->mdcache->num_inodes_with_caps << " inodes in use by clients, "
484         << mds->mdcache->get_num_strays() << " stray files";
485
486     MDSHealthMetric m(MDS_HEALTH_CACHE_OVERSIZED, HEALTH_WARN, oss.str());
487     health.metrics.push_back(m);
488   }
489 }
490
491 MDSMap::DaemonState Beacon::get_want_state() const
492 {
493   Mutex::Locker l(lock);
494   return want_state;
495 }
496