1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "common/dout.h"
17 #include "common/HeartbeatMap.h"
19 #include "include/stringify.h"
20 #include "include/util.h"
22 #include "messages/MMDSBeacon.h"
23 #include "mon/MonClient.h"
24 #include "mds/MDLog.h"
25 #include "mds/MDSRank.h"
26 #include "mds/MDSMap.h"
27 #include "mds/Locker.h"
// Logging configuration for the dout/derr statements in this file.
// NOTE(review): these macros are presumably consumed by common/dout.h — confirm.
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_mds
// Log prefix: "mds.beacon.<name> ". The expansion references `name`, so an
// identifier called `name` must be in scope wherever the prefix is expanded.
34 #define dout_prefix *_dout << "mds.beacon." << name << ' '
// Constructor.
//   cct_:  context handed to the Dispatcher base class.
//   monc_: monitor client, stored in `monc`.
//   name_: daemon name, stored in `name` (also used by the log prefix).
// Standby preferences start out as "none" (MDS_RANK_NONE / FS_CLUSTER_ID_NONE)
// and the wanted state starts as MDSMap::STATE_BOOT.
// NOTE(review): `timer` is built from g_ceph_context while Dispatcher receives
// cct_ — presumably the same object; confirm.
// NOTE(review): the initializer list continues beyond the last line shown here
// (it ends with a comma) and the constructor body is not visible in this excerpt.
37 Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string name_) :
38 Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
39 name(name_), standby_for_rank(MDS_RANK_NONE),
40 standby_for_fscid(FS_CLUSTER_ID_NONE), want_state(MDSMap::STATE_BOOT),
// Initialise the beacon from the first MDSMap and from daemon configuration.
//   mdsmap: initial map; must not be NULL (asserted below).
// NOTE(review): the tail of this function (whatever follows the final comment)
// is not visible in this excerpt.
55 void Beacon::init(MDSMap const *mdsmap)
57 Mutex::Locker l(lock);
58 assert(mdsmap != NULL);
// Record the map's epoch and compat set; called with `lock` held, as at every
// call site of _notify_mdsmap() visible in this file.
60 _notify_mdsmap(mdsmap);
// Standby preferences are read straight from configuration (g_conf).
61 standby_for_rank = mds_rank_t(g_conf->mds_standby_for_rank);
62 standby_for_name = g_conf->mds_standby_for_name;
63 standby_for_fscid = fs_cluster_id_t(g_conf->mds_standby_for_fscid);
64 standby_replay = g_conf->mds_standby_replay;
66 // Spawn threads and start messaging
// Stop the beacon: cancels the pending `sender` timer event while holding `lock`.
// NOTE(review): any further teardown steps are not visible in this excerpt.
72 void Beacon::shutdown()
74 Mutex::Locker l(lock);
76 timer.cancel_event(sender);
// Dispatcher hook for incoming messages.
//   m: the received message.
// A MSG_MDS_BEACON whose connection peer is a monitor is handed to
// handle_mds_beacon().
// NOTE(review): the return value for each path is not visible in this excerpt.
83 bool Beacon::ms_dispatch(Message *m)
85 if (m->get_type() == MSG_MDS_BEACON) {
86 if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
// The message type was checked above, hence the static_cast to MMDSBeacon.
87 handle_mds_beacon(static_cast<MMDSBeacon*>(m));
97 * Update lagginess state based on response from remote MDSMonitor
99 * This function puts the passed message before returning
// Handle a beacon message `m` received from a monitor (see ms_dispatch()).
// Matches the message's sequence number against the send times recorded in
// seq_stamp, advances last_acked_stamp, prunes seq_stamp and signals
// waiting_cond when the awaited sequence number is the one in `m`.
// NOTE(review): brace-only and other short lines are missing from this
// excerpt, so where one branch ends and the next begins is not always visible.
101 void Beacon::handle_mds_beacon(MMDSBeacon *m)
103 Mutex::Locker l(lock);
106 version_t seq = m->get_seq();
// Only sequence numbers that still have a recorded timestamp can be matched.
109 if (seq_stamp.count(seq)) {
110 utime_t now = ceph_clock_now();
// Only ever move last_acked_stamp forward.
111 if (seq_stamp[seq] > last_acked_stamp) {
112 last_acked_stamp = seq_stamp[seq];
// rtt: time elapsed between the stamp recorded for this seq and now.
113 utime_t rtt = now - last_acked_stamp;
115 dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
116 << " seq " << m->get_seq() << " rtt " << rtt << dendl;
// Previously laggy and this round trip was within the grace period.
// NOTE(review): the statements that clear the laggy state are not visible here.
118 if (was_laggy && rtt < g_conf->mds_beacon_grace) {
119 dout(0) << "handle_mds_beacon no longer laggy" << dendl;
124 // Mark myself laggy if system clock goes backwards. Hopefully
125 // later beacons will clear it.
// NOTE(review): presumably the else-branch of the stamp comparison above — confirm.
126 dout(1) << "handle_mds_beacon system clock goes backwards, "
127 << "mark myself laggy" << dendl;
// Back-date last_acked_stamp to (grace + 1) before now, which makes the
// `since > mds_beacon_grace` test in is_laggy() true.
128 last_acked_stamp = now - utime_t(g_conf->mds_beacon_grace + 1, 0);
132 // clean up seq_stamp map
// Drops every recorded entry up to and including the acked seq.
133 while (!seq_stamp.empty() &&
134 seq_stamp.begin()->first <= seq)
135 seq_stamp.erase(seq_stamp.begin());
137 // Wake a waiter up if present
138 if (awaiting_seq == seq) {
139 waiting_cond.Signal();
// "dne": logged for a seq with no entry in seq_stamp.
// NOTE(review): presumably the else-branch of the seq_stamp.count() check — confirm.
142 dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
143 << " seq " << m->get_seq() << " dne" << dendl;
// NOTE(review): orphaned statement — the signature and the rest of the body of
// the function this belongs to are not visible in this excerpt. It takes
// `lock` through a Mutex::Locker, as the other public entry points here do.
150 Mutex::Locker l(lock);
// Block until the beacon numbered last_seq is no longer outstanding, or until
// `duration` has elapsed.
//   duration: maximum time to wait, logged in seconds.
// NOTE(review): the send implied by the function name, the declaration of
// `timeout`, and any reset of awaiting_seq are not visible in this excerpt.
155 void Beacon::send_and_wait(const double duration)
157 Mutex::Locker l(lock);
159 awaiting_seq = last_seq;
160 dout(20) << __func__ << ": awaiting " << awaiting_seq
161 << " for up to " << duration << "s" << dendl;
// Absolute deadline = now + duration.
164 timeout.set_from_double(ceph_clock_now() + duration);
// Keep waiting while the oldest outstanding seq is at or before the awaited
// one; handle_mds_beacon() erases acked entries from seq_stamp and signals
// waiting_cond. The clock is re-checked on every wake-up.
165 while ((!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq)
166 && ceph_clock_now() < timeout) {
167 waiting_cond.WaitUntil(lock, timeout);
175 * Call periodically, or when you have updated the desired state
// NOTE(review): the signature line for this body is not visible in this
// excerpt (presumably Beacon::_send — confirm). The visible statements re-arm
// the periodic timer, build an MMDSBeacon describing the wanted state and send
// it to the monitor through `monc`.
// Drop any pending timer event before scheduling a new one.
180 timer.cancel_event(sender);
// Schedule the next callback mds_beacon_interval from now; the callback asserts
// that `lock` is held by the calling thread. The rest of the callback body is
// not visible in this excerpt.
182 sender = timer.add_event_after(
183 g_conf->mds_beacon_interval,
184 new FunctionContext([this](int) {
185 assert(lock.is_locked_by_me());
190 if (!cct->get_heartbeat_map()->is_healthy()) {
191 /* If anything isn't progressing, let's avoid sending a beacon so that
192 * the MDS will consider us laggy */
193 dout(1) << __func__ << " skipping beacon, heartbeat map not healthy" << dendl;
// NOTE(review): the early exit that presumably follows the log line above is
// not visible in this excerpt.
198 dout(10) << __func__ << " " << ceph_mds_state_name(want_state)
199 << " seq " << last_seq
// Remember when this seq was sent; handle_mds_beacon() reads it back.
// NOTE(review): the increment of last_seq is not visible in this excerpt.
202 seq_stamp[last_seq] = ceph_clock_now();
204 assert(want_state != MDSMap::STATE_NULL);
// NOTE(review): some constructor arguments are missing from this excerpt.
206 MMDSBeacon *beacon = new MMDSBeacon(
207 monc->get_fsid(), mds_gid_t(monc->get_global_id()),
212 CEPH_FEATURES_SUPPORTED_DEFAULT);
// Attach standby preferences, the current health metrics and the compat set.
214 beacon->set_standby_for_rank(standby_for_rank);
215 beacon->set_standby_for_name(standby_for_name);
216 beacon->set_standby_for_fscid(standby_for_fscid);
217 beacon->set_standby_replay(standby_replay);
218 beacon->set_health(health);
219 beacon->set_compat(compat);
220 // piggyback the sys info on beacon msg
// Only done while the wanted state is STATE_BOOT; "addr" is added to the map
// after collect_sys_info() has filled it.
221 if (want_state == MDSMap::STATE_BOOT) {
222 map<string, string> sys_info;
223 collect_sys_info(&sys_info, cct);
224 sys_info["addr"] = stringify(monc->get_myaddr());
225 beacon->set_sys_info(sys_info);
// Hand the raw pointer to the monitor client for sending.
227 monc->send_mon_message(beacon);
231 * Call this when there is a new MDSMap available
// Locked wrapper around _notify_mdsmap().
//   mdsmap: the new map; must not be NULL (asserted).
233 void Beacon::notify_mdsmap(MDSMap const *mdsmap)
235 Mutex::Locker l(lock);
236 assert(mdsmap != NULL);
238 _notify_mdsmap(mdsmap);
// Record a new MDSMap epoch and rebuild the compat set from it.
//   mdsmap: the new map; must not be NULL, and its epoch must not be older
//           than the one already recorded (both asserted).
// Does not take `lock` itself; every call site visible in this file holds it.
241 void Beacon::_notify_mdsmap(MDSMap const *mdsmap)
243 assert(mdsmap != NULL);
244 assert(mdsmap->get_epoch() >= epoch);
// Only do work when the epoch actually changed.
246 if (mdsmap->get_epoch() != epoch) {
247 epoch = mdsmap->get_epoch();
// Start from the default compat set, then merge in the map's compat set.
248 compat = get_mdsmap_compat_set_default();
249 compat.merge(mdsmap->compat);
// Report whether the time since the last acknowledged beacon exceeds
// mds_beacon_grace, and trigger a monitor reconnect when it is far beyond that.
// NOTE(review): the return statements and any updates to the laggy flags are
// not visible in this excerpt.
254 bool Beacon::is_laggy()
256 Mutex::Locker l(lock);
// last_acked_stamp still equals a default-constructed utime_t.
// NOTE(review): the statement controlled by this `if` is not visible here.
258 if (last_acked_stamp == utime_t())
261 utime_t now = ceph_clock_now();
262 utime_t since = now - last_acked_stamp;
263 if (since > g_conf->mds_beacon_grace) {
264 dout(5) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
265 << " since last acked beacon" << dendl;
// More than twice the grace period has passed and at least one beacon interval
// has elapsed since the previous reconnect attempt: reopen the monitor session.
// last_mon_reconnect rate-limits this.
267 if (since > (g_conf->mds_beacon_grace*2) &&
268 now > last_mon_reconnect + g_conf->mds_beacon_interval) {
269 // maybe it's not us?
270 dout(5) << "initiating monitor reconnect; maybe we're not the slow one"
272 last_mon_reconnect = now;
273 monc->reopen_session();
// Accessor returning a utime_t; takes `lock` first.
// NOTE(review): the return statement is not visible in this excerpt
// (presumably it returns laggy_until — confirm).
280 utime_t Beacon::get_laggy_until() const
282 Mutex::Locker l(lock);
// Update the recorded MDSMap epoch and the wanted daemon state under one
// acquisition of `lock`.
//   mdsmap:   current map, forwarded to _notify_mdsmap().
//   newstate: state to store in want_state.
// A transition is logged only when the state actually changes.
287 void Beacon::set_want_state(MDSMap const *mdsmap, MDSMap::DaemonState const newstate)
289 Mutex::Locker l(lock);
291 // Update mdsmap epoch atomically with updating want_state, so that when
292 // we send a beacon with the new want state it has the latest epoch, and
293 // once we have updated to the latest epoch, we are not sending out
294 // a stale want_state (i.e. one from before making it through MDSMap
296 _notify_mdsmap(mdsmap);
298 if (want_state != newstate) {
299 dout(10) << __func__ << ": "
300 << ceph_mds_state_name(want_state) << " -> "
301 << ceph_mds_state_name(newstate) << dendl;
302 want_state = newstate;
308 * We are 'shown' an MDS briefly in order to update
309 * some health metrics that we will send in the next beacon.
// Rebuild `health.metrics`, which the beacon sender attaches to outgoing
// beacons via set_health().
//   mds: the rank to inspect; the caller must hold mds->mds_lock (asserted).
// The list is cleared and then repopulated from the checks below: damage table
// entries, journal trimming backlog, clients late in releasing capabilities,
// clients not responding to cache-pressure recall, clients not advancing their
// oldest tid, slow requests, read-only mode, and an oversized cache.
// NOTE(review): brace-only lines and short statements (else / continue / simple
// conditions) are missing from this excerpt; branch boundaries are described
// only where the visible code establishes them, and are hedged otherwise.
312 void Beacon::notify_health(MDSRank const *mds)
314 Mutex::Locker l(lock);
320 // I'm going to touch this MDS, so it must be locked
321 assert(mds->mds_lock.is_locked_by_me());
// Start from an empty list; every metric below is re-derived on each call.
323 health.metrics.clear();
325 // Detect presence of entries in DamageTable
326 if (!mds->damage_table.empty()) {
327 MDSHealthMetric m(MDS_HEALTH_DAMAGE, HEALTH_ERR, std::string(
328 "Metadata damage detected"));
329 health.metrics.push_back(m);
332 // Detect MDS_HEALTH_TRIM condition
333 // Arbitrary factor of 2, indicates MDS is not trimming promptly
335 if (mds->mdlog->get_num_segments() > (size_t)(g_conf->mds_log_max_segments * 2)) {
336 std::ostringstream oss;
337 oss << "Behind on trimming (" << mds->mdlog->get_num_segments()
338 << "/" << g_conf->mds_log_max_segments << ")";
340 MDSHealthMetric m(MDS_HEALTH_TRIM, HEALTH_WARN, oss.str());
// The raw numbers are also exposed as metadata alongside the message.
341 m.metadata["num_segments"] = stringify(mds->mdlog->get_num_segments());
342 m.metadata["max_segments"] = stringify(g_conf->mds_log_max_segments);
343 health.metrics.push_back(m);
347 // Detect clients failing to respond to modifications to capabilities in
348 // CLIENT_CAPS messages.
// The Locker supplies the list of late clients; one metric per client is
// staged in late_cap_metrics before deciding how to report them.
350 std::list<client_t> late_clients;
351 mds->locker->get_late_revoking_clients(&late_clients);
352 std::list<MDSHealthMetric> late_cap_metrics;
354 for (std::list<client_t>::iterator i = late_clients.begin(); i != late_clients.end(); ++i) {
356 // client_t is equivalent to session.info.inst.name.num
357 // Construct an entity_name_t to lookup into SessionMap
358 entity_name_t ename(CEPH_ENTITY_TYPE_CLIENT, i->v);
359 Session const *s = mds->sessionmap.get_session(ename);
// NOTE(review): the null check on `s` that guards the error log below (and the
// statement that skips this client) is not visible in this excerpt.
361 // Shouldn't happen, but not worth crashing if it does as this is
362 // just health-reporting code.
363 derr << "Client ID without session: " << i->v << dendl;
367 std::ostringstream oss;
368 oss << "Client " << s->get_human_name() << " failing to respond to capability release";
369 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE, HEALTH_WARN, oss.str());
370 m.metadata["client_id"] = stringify(i->v);
371 late_cap_metrics.push_back(m);
// At or below mds_health_summarize_threshold, the per-client metrics are moved
// into health.metrics as they are.
374 if (late_cap_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
375 health.metrics.splice(health.metrics.end(), late_cap_metrics);
// NOTE(review): presumably the else-branch — a single *_MANY metric carrying
// the client count replaces the per-client metrics; confirm.
377 std::ostringstream oss;
378 oss << "Many clients (" << late_cap_metrics.size()
379 << ") failing to respond to capability release";
380 MDSHealthMetric m(MDS_HEALTH_CLIENT_LATE_RELEASE_MANY, HEALTH_WARN, oss.str());
381 m.metadata["client_count"] = stringify(late_cap_metrics.size());
382 health.metrics.push_back(m);
383 late_cap_metrics.clear();
387 // Detect clients failing to generate cap releases from CEPH_SESSION_RECALL_STATE
388 // messages. May be due to buggy client or resource-hogging application.
390 // Detect clients failing to advance their old_client_tid
392 set<Session*> sessions;
393 mds->sessionmap.get_client_session_set(sessions);
// cutoff = now - mds_recall_state_timeout; it is compared below against both
// the cache's last recall time and each session's recalled_at.
395 utime_t cutoff = ceph_clock_now();
396 cutoff -= g_conf->mds_recall_state_timeout;
397 utime_t last_recall = mds->mdcache->last_recall_state;
399 std::list<MDSHealthMetric> late_recall_metrics;
400 std::list<MDSHealthMetric> large_completed_requests_metrics;
401 for (set<Session*>::iterator i = sessions.begin(); i != sessions.end(); ++i) {
402 Session *session = *i;
// Only sessions with a non-zero recalled_at are examined for recall lateness.
403 if (!session->recalled_at.is_zero()) {
404 dout(20) << "Session servicing RECALL " << session->info.inst
405 << ": " << session->recalled_at << " " << session->recall_release_count
406 << "/" << session->recall_count << dendl;
// The last recall is older than the cutoff, or this session's last recall
// message was sent before the cache's last recall: clear recalled_at.
407 if (last_recall < cutoff || session->last_recall_sent < last_recall) {
408 dout(20) << " no longer recall" << dendl;
409 session->clear_recalled_at();
410 } else if (session->recalled_at < cutoff) {
411 dout(20) << " exceeded timeout " << session->recalled_at << " vs. " << cutoff << dendl;
412 std::ostringstream oss;
413 oss << "Client " << session->get_human_name() << " failing to respond to cache pressure";
414 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str());
415 m.metadata["client_id"] = stringify(session->info.inst.name.num());
416 late_recall_metrics.push_back(m);
418 dout(20) << " within timeout " << session->recalled_at << " vs. " << cutoff << dendl;
// Oldest-tid check: a trim warning count is non-zero AND the number of
// completed requests (or flushes) has reached its configured maximum.
421 if ((session->get_num_trim_requests_warnings() > 0 &&
422 session->get_num_completed_requests() >= g_conf->mds_max_completed_requests) ||
423 (session->get_num_trim_flushes_warnings() > 0 &&
424 session->get_num_completed_flushes() >= g_conf->mds_max_completed_flushes)) {
425 std::ostringstream oss;
426 oss << "Client " << session->get_human_name() << " failing to advance its oldest client/flush tid";
427 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID, HEALTH_WARN, oss.str());
428 m.metadata["client_id"] = stringify(session->info.inst.name.num());
429 large_completed_requests_metrics.push_back(m);
// Same per-client vs. summarized reporting as for late cap releases above.
// NOTE(review): the else keyword separating the two forms is not visible in
// this excerpt.
433 if (late_recall_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
434 health.metrics.splice(health.metrics.end(), late_recall_metrics);
436 std::ostringstream oss;
437 oss << "Many clients (" << late_recall_metrics.size()
438 << ") failing to respond to cache pressure";
439 MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL_MANY, HEALTH_WARN, oss.str());
440 m.metadata["client_count"] = stringify(late_recall_metrics.size());
441 health.metrics.push_back(m);
442 late_recall_metrics.clear();
445 if (large_completed_requests_metrics.size() <= (size_t)g_conf->mds_health_summarize_threshold) {
446 health.metrics.splice(health.metrics.end(), large_completed_requests_metrics);
448 std::ostringstream oss;
449 oss << "Many clients (" << large_completed_requests_metrics.size()
450 << ") failing to advance their oldest client/flush tid";
451 MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID_MANY, HEALTH_WARN, oss.str());
452 m.metadata["client_count"] = stringify(large_completed_requests_metrics.size());
453 health.metrics.push_back(m);
454 large_completed_requests_metrics.clear();
458 // Detect MDS_HEALTH_SLOW_REQUEST condition
460 int slow = mds->get_mds_slow_req_count();
461 dout(20) << slow << " slow request found" << dendl;
// NOTE(review): the condition guarding the metric below is not visible in this
// excerpt (presumably it is only emitted when `slow` is non-zero — confirm).
463 std::ostringstream oss;
464 oss << slow << " slow requests are blocked > " << g_conf->mds_op_complaint_time << " sec";
466 MDSHealthMetric m(MDS_HEALTH_SLOW_REQUEST, HEALTH_WARN, oss.str());
467 health.metrics.push_back(m);
471 // Report a health warning if we are readonly
472 if (mds->mdcache->is_readonly()) {
473 MDSHealthMetric m(MDS_HEALTH_READ_ONLY, HEALTH_WARN,
474 "MDS in read-only mode");
475 health.metrics.push_back(m);
478 // Report if we have significantly exceeded our cache size limit
479 if (mds->mdcache->cache_overfull()) {
// The message shows current size / memory limit plus two counters from the
// cache (inodes with caps, stray count).
480 std::ostringstream oss;
481 oss << "MDS cache is too large (" << bytes2str(mds->mdcache->cache_size())
482 << "/" << bytes2str(mds->mdcache->cache_limit_memory()) << "); "
483 << mds->mdcache->num_inodes_with_caps << " inodes in use by clients, "
484 << mds->mdcache->get_num_strays() << " stray files";
486 MDSHealthMetric m(MDS_HEALTH_CACHE_OVERSIZED, HEALTH_WARN, oss.str());
487 health.metrics.push_back(m);
// Accessor returning an MDSMap::DaemonState; takes `lock` first.
// NOTE(review): the return statement is not visible in this excerpt
// (presumably it returns want_state — confirm).
491 MDSMap::DaemonState Beacon::get_want_state() const
493 Mutex::Locker l(lock);