Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / MDSRank.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2015 Red Hat
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "common/debug.h"
16 #include "common/errno.h"
17
18 #include "messages/MClientRequestForward.h"
19 #include "messages/MMDSLoadTargets.h"
20 #include "messages/MMDSMap.h"
21 #include "messages/MMDSTableRequest.h"
22 #include "messages/MCommand.h"
23 #include "messages/MCommandReply.h"
24
25 #include "MDSDaemon.h"
26 #include "MDSMap.h"
27 #include "SnapClient.h"
28 #include "SnapServer.h"
29 #include "MDBalancer.h"
30 #include "Locker.h"
31 #include "Server.h"
32 #include "InoTable.h"
33 #include "mon/MonClient.h"
34 #include "common/HeartbeatMap.h"
35 #include "ScrubStack.h"
36
37
38 #include "MDSRank.h"
39
40 #define dout_context g_ceph_context
41 #define dout_subsys ceph_subsys_mds
42 #undef dout_prefix
43 #define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
44
45 MDSRank::MDSRank(
46     mds_rank_t whoami_,
47     Mutex &mds_lock_,
48     LogChannelRef &clog_,
49     SafeTimer &timer_,
50     Beacon &beacon_,
51     MDSMap *& mdsmap_,
52     Messenger *msgr,
53     MonClient *monc_,
54     Context *respawn_hook_,
55     Context *suicide_hook_)
56   :
57     whoami(whoami_), incarnation(0),
58     mds_lock(mds_lock_), clog(clog_), timer(timer_),
59     mdsmap(mdsmap_),
60     objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
61     server(NULL), mdcache(NULL), locker(NULL), mdlog(NULL),
62     balancer(NULL), scrubstack(NULL),
63     damage_table(whoami_),
64     inotable(NULL), snapserver(NULL), snapclient(NULL),
65     sessionmap(this), logger(NULL), mlogger(NULL),
66     op_tracker(g_ceph_context, g_conf->mds_enable_op_tracker,
67                g_conf->osd_num_op_tracker_shard),
68     last_state(MDSMap::STATE_BOOT),
69     state(MDSMap::STATE_BOOT),
70     cluster_degraded(false), stopping(false),
71     purge_queue(g_ceph_context, whoami_,
72       mdsmap_->get_metadata_pool(), objecter,
73       new FunctionContext(
74           [this](int r){
75           // Purge Queue operates inside mds_lock when we're calling into
76           // it, and outside when in background, so must handle both cases.
77           if (mds_lock.is_locked_by_me()) {
78             damaged();
79           } else {
80             damaged_unlocked();
81           }
82         }
83       )
84     ),
85     progress_thread(this), dispatch_depth(0),
86     hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_),
87     mds_slow_req_count(0),
88     last_client_mdsmap_bcast(0),
89     messenger(msgr), monc(monc_),
90     respawn_hook(respawn_hook_),
91     suicide_hook(suicide_hook_),
92     standby_replaying(false)
93 {
94   hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());
95
96   purge_queue.update_op_limit(*mdsmap);
97
98   objecter->unset_honor_osdmap_full();
99
100   finisher = new Finisher(msgr->cct);
101
102   mdcache = new MDCache(this, purge_queue);
103   mdlog = new MDLog(this);
104   balancer = new MDBalancer(this, messenger, monc);
105
106   scrubstack = new ScrubStack(mdcache, finisher);
107
108   inotable = new InoTable(this);
109   snapserver = new SnapServer(this, monc);
110   snapclient = new SnapClient(this);
111
112   server = new Server(this);
113   locker = new Locker(this, mdcache);
114
115   op_tracker.set_complaint_and_threshold(msgr->cct->_conf->mds_op_complaint_time,
116                                          msgr->cct->_conf->mds_op_log_threshold);
117   op_tracker.set_history_size_and_duration(msgr->cct->_conf->mds_op_history_size,
118                                            msgr->cct->_conf->mds_op_history_duration);
119 }
120
121 MDSRank::~MDSRank()
122 {
123   if (hb) {
124     g_ceph_context->get_heartbeat_map()->remove_worker(hb);
125   }
126
127   if (scrubstack) { delete scrubstack; scrubstack = NULL; }
128   if (mdcache) { delete mdcache; mdcache = NULL; }
129   if (mdlog) { delete mdlog; mdlog = NULL; }
130   if (balancer) { delete balancer; balancer = NULL; }
131   if (inotable) { delete inotable; inotable = NULL; }
132   if (snapserver) { delete snapserver; snapserver = NULL; }
133   if (snapclient) { delete snapclient; snapclient = NULL; }
134   if (mdsmap) { delete mdsmap; mdsmap = 0; }
135
136   if (server) { delete server; server = 0; }
137   if (locker) { delete locker; locker = 0; }
138
139   if (logger) {
140     g_ceph_context->get_perfcounters_collection()->remove(logger);
141     delete logger;
142     logger = 0;
143   }
144   if (mlogger) {
145     g_ceph_context->get_perfcounters_collection()->remove(mlogger);
146     delete mlogger;
147     mlogger = 0;
148   }
149
150   delete finisher;
151   finisher = NULL;
152
153   delete suicide_hook;
154   suicide_hook = NULL;
155
156   delete respawn_hook;
157   respawn_hook = NULL;
158
159   delete objecter;
160   objecter = nullptr;
161 }
162
163 void MDSRankDispatcher::init()
164 {
165   objecter->init();
166   messenger->add_dispatcher_head(objecter);
167
168   objecter->start();
169
170   update_log_config();
171   create_logger();
172
173   // Expose the OSDMap (already populated during MDS::init) to anyone
174   // who is interested in it.
175   handle_osd_map();
176
177   progress_thread.create("mds_rank_progr");
178
179   purge_queue.init();
180
181   finisher->start();
182 }
183
184 void MDSRank::update_targets(utime_t now)
185 {
186   // get MonMap's idea of my export_targets
187   const set<mds_rank_t>& map_targets = mdsmap->get_mds_info(get_nodeid()).export_targets;
188
189   dout(20) << "updating export targets, currently " << map_targets.size() << " ranks are targets" << dendl;
190
191   bool send = false;
192   set<mds_rank_t> new_map_targets;
193
194   auto it = export_targets.begin();
195   while (it != export_targets.end()) {
196     mds_rank_t rank = it->first;
197     double val = it->second.get(now);
198     dout(20) << "export target mds." << rank << " value is " << val << " @ " << now << dendl;
199
200     if (val <= 0.01) {
201       dout(15) << "export target mds." << rank << " is no longer an export target" << dendl;
202       export_targets.erase(it++);
203       send = true;
204       continue;
205     }
206     if (!map_targets.count(rank)) {
207       dout(15) << "export target mds." << rank << " not in map's export_targets" << dendl;
208       send = true;
209     }
210     new_map_targets.insert(rank);
211     it++;
212   }
213   if (new_map_targets.size() < map_targets.size()) {
214     dout(15) << "export target map holds stale targets, sending update" << dendl;
215     send = true;
216   }
217
218   if (send) {
219     dout(15) << "updating export_targets, now " << new_map_targets.size() << " ranks are targets" << dendl;
220     MMDSLoadTargets* m = new MMDSLoadTargets(mds_gid_t(monc->get_global_id()), new_map_targets);
221     monc->send_mon_message(m);
222   }
223 }
224
225 void MDSRank::hit_export_target(utime_t now, mds_rank_t rank, double amount)
226 {
227   double rate = g_conf->mds_bal_target_decay;
228   if (amount < 0.0) {
229     amount = 100.0/g_conf->mds_bal_target_decay; /* a good default for "i am trying to keep this export_target active" */
230   }
231   auto em = export_targets.emplace(std::piecewise_construct, std::forward_as_tuple(rank), std::forward_as_tuple(now, DecayRate(rate)));
232   if (em.second) {
233     dout(15) << "hit export target (new) " << amount << " @ " << now << dendl;
234   } else {
235     dout(15) << "hit export target " << amount << " @ " << now << dendl;
236   }
237   em.first->second.hit(now, amount);
238 }
239
240 void MDSRankDispatcher::tick()
241 {
242   heartbeat_reset();
243
244   if (beacon.is_laggy()) {
245     dout(5) << "tick bailing out since we seem laggy" << dendl;
246     return;
247   }
248
249   check_ops_in_flight();
250
251   // Wake up thread in case we use to be laggy and have waiting_for_nolaggy
252   // messages to progress.
253   progress_thread.signal();
254
255   // make sure mds log flushes, trims periodically
256   mdlog->flush();
257
258   if (is_active() || is_stopping()) {
259     mdcache->trim();
260     mdcache->trim_client_leases();
261     mdcache->check_memory_usage();
262     mdlog->trim();  // NOT during recovery!
263   }
264
265   // log
266   mds_load_t load = balancer->get_load(ceph_clock_now());
267
268   if (logger) {
269     logger->set(l_mds_load_cent, 100 * load.mds_load());
270     logger->set(l_mds_dispatch_queue_len, messenger->get_dispatch_queue_len());
271     logger->set(l_mds_subtrees, mdcache->num_subtrees());
272
273     mdcache->log_stat();
274   }
275
276   // ...
277   if (is_clientreplay() || is_active() || is_stopping()) {
278     server->find_idle_sessions();
279     locker->tick();
280   }
281
282   if (is_reconnect())
283     server->reconnect_tick();
284
285   if (is_active()) {
286     balancer->tick();
287     mdcache->find_stale_fragment_freeze();
288     mdcache->migrator->find_stale_export_freeze();
289     if (snapserver)
290       snapserver->check_osd_map(false);
291   }
292
293   if (is_active() || is_stopping()) {
294     update_targets(ceph_clock_now());
295   }
296
297   // shut down?
298   if (is_stopping()) {
299     mdlog->trim();
300     if (mdcache->shutdown_pass()) {
301       uint64_t pq_progress = 0 ;
302       uint64_t pq_total = 0;
303       size_t pq_in_flight = 0;
304       if (!purge_queue.drain(&pq_progress, &pq_total, &pq_in_flight)) {
305         dout(7) << "shutdown_pass=true, but still waiting for purge queue"
306                 << dendl;
307         // This takes unbounded time, so we must indicate progress
308         // to the administrator: we do it in a slightly imperfect way
309         // by sending periodic (tick frequency) clog messages while
310         // in this state.
311         clog->info() << "MDS rank " << whoami << " waiting for purge queue ("
312           << std::dec << pq_progress << "/" << pq_total << " " << pq_in_flight
313           << " files purging" << ")";
314       } else {
315         dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to "
316                    "down:stopped" << dendl;
317         stopping_done();
318       }
319     }
320     else {
321       dout(7) << "shutdown_pass=false" << dendl;
322     }
323   }
324
325   // Expose ourselves to Beacon to update health indicators
326   beacon.notify_health(this);
327 }
328
329 void MDSRankDispatcher::shutdown()
330 {
331   // It should never be possible for shutdown to get called twice, because
332   // anyone picking up mds_lock checks if stopping is true and drops
333   // out if it is.
334   assert(stopping == false);
335   stopping = true;
336
337   dout(1) << __func__ << ": shutting down rank " << whoami << dendl;
338
339   timer.shutdown();
340
341   // MDLog has to shut down before the finisher, because some of its
342   // threads block on IOs that require finisher to complete.
343   mdlog->shutdown();
344
345   // shut down cache
346   mdcache->shutdown();
347
348   purge_queue.shutdown();
349
350   mds_lock.Unlock();
351   finisher->stop(); // no flushing
352   mds_lock.Lock();
353
354   if (objecter->initialized)
355     objecter->shutdown();
356
357   monc->shutdown();
358
359   op_tracker.on_shutdown();
360
361   progress_thread.shutdown();
362
363   // release mds_lock for finisher/messenger threads (e.g.
364   // MDSDaemon::ms_handle_reset called from Messenger).
365   mds_lock.Unlock();
366
367   // shut down messenger
368   messenger->shutdown();
369
370   mds_lock.Lock();
371
372   // Workaround unclean shutdown: HeartbeatMap will assert if
373   // worker is not removed (as we do in ~MDS), but ~MDS is not
374   // always called after suicide.
375   if (hb) {
376     g_ceph_context->get_heartbeat_map()->remove_worker(hb);
377     hb = NULL;
378   }
379 }
380
381 /**
382  * Helper for simple callbacks that call a void fn with no args.
383  */
384 class C_MDS_VoidFn : public MDSInternalContext
385 {
386   typedef void (MDSRank::*fn_ptr)();
387   protected:
388    fn_ptr fn;
389   public:
390   C_MDS_VoidFn(MDSRank *mds_, fn_ptr fn_)
391     : MDSInternalContext(mds_), fn(fn_)
392   {
393     assert(mds_);
394     assert(fn_);
395   }
396
397   void finish(int r) override
398   {
399     (mds->*fn)();
400   }
401 };
402
403 int64_t MDSRank::get_metadata_pool()
404 {
405     return mdsmap->get_metadata_pool();
406 }
407
408 MDSTableClient *MDSRank::get_table_client(int t)
409 {
410   switch (t) {
411   case TABLE_ANCHOR: return NULL;
412   case TABLE_SNAP: return snapclient;
413   default: ceph_abort();
414   }
415 }
416
417 MDSTableServer *MDSRank::get_table_server(int t)
418 {
419   switch (t) {
420   case TABLE_ANCHOR: return NULL;
421   case TABLE_SNAP: return snapserver;
422   default: ceph_abort();
423   }
424 }
425
426 void MDSRank::suicide()
427 {
428   if (suicide_hook) {
429     suicide_hook->complete(0);
430     suicide_hook = NULL;
431   }
432 }
433
434 void MDSRank::respawn()
435 {
436   if (respawn_hook) {
437     respawn_hook->complete(0);
438     respawn_hook = NULL;
439   }
440 }
441
442 void MDSRank::damaged()
443 {
444   assert(whoami != MDS_RANK_NONE);
445   assert(mds_lock.is_locked_by_me());
446
447   beacon.set_want_state(mdsmap, MDSMap::STATE_DAMAGED);
448   monc->flush_log();  // Flush any clog error from before we were called
449   beacon.notify_health(this);  // Include latest status in our swan song
450   beacon.send_and_wait(g_conf->mds_mon_shutdown_timeout);
451
452   // It's okay if we timed out and the mon didn't get our beacon, because
453   // another daemon (or ourselves after respawn) will eventually take the
454   // rank and report DAMAGED again when it hits same problem we did.
455
456   respawn();  // Respawn into standby in case mon has other work for us
457 }
458
459 void MDSRank::damaged_unlocked()
460 {
461   Mutex::Locker l(mds_lock);
462   damaged();
463 }
464
465 void MDSRank::handle_write_error(int err)
466 {
467   if (err == -EBLACKLISTED) {
468     derr << "we have been blacklisted (fenced), respawning..." << dendl;
469     respawn();
470     return;
471   }
472
473   if (g_conf->mds_action_on_write_error >= 2) {
474     derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
475     respawn();
476   } else if (g_conf->mds_action_on_write_error == 1) {
477     derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
478     mdcache->force_readonly();
479   } else {
480     // ignore;
481     derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
482   }
483 }
484
485 void *MDSRank::ProgressThread::entry()
486 {
487   Mutex::Locker l(mds->mds_lock);
488   while (true) {
489     while (!mds->stopping &&
490            mds->finished_queue.empty() &&
491            (mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) {
492       cond.Wait(mds->mds_lock);
493     }
494
495     if (mds->stopping) {
496       break;
497     }
498
499     mds->_advance_queues();
500   }
501
502   return NULL;
503 }
504
505
506 void MDSRank::ProgressThread::shutdown()
507 {
508   assert(mds->mds_lock.is_locked_by_me());
509   assert(mds->stopping);
510
511   if (am_self()) {
512     // Stopping is set, we will fall out of our main loop naturally
513   } else {
514     // Kick the thread to notice mds->stopping, and join it
515     cond.Signal();
516     mds->mds_lock.Unlock();
517     if (is_started())
518       join();
519     mds->mds_lock.Lock();
520   }
521 }
522
523 bool MDSRankDispatcher::ms_dispatch(Message *m)
524 {
525   bool ret;
526   inc_dispatch_depth();
527   ret = _dispatch(m, true);
528   dec_dispatch_depth();
529   return ret;
530 }
531
532 /* If this function returns true, it recognizes the message and has taken the
533  * reference. If it returns false, it has done neither. */
534 bool MDSRank::_dispatch(Message *m, bool new_msg)
535 {
536   if (is_stale_message(m)) {
537     m->put();
538     return true;
539   }
540
541   if (beacon.is_laggy()) {
542     dout(10) << " laggy, deferring " << *m << dendl;
543     waiting_for_nolaggy.push_back(m);
544   } else if (new_msg && !waiting_for_nolaggy.empty()) {
545     dout(10) << " there are deferred messages, deferring " << *m << dendl;
546     waiting_for_nolaggy.push_back(m);
547   } else {
548     if (!handle_deferrable_message(m)) {
549       dout(0) << "unrecognized message " << *m << dendl;
550       return false;
551     }
552
553     heartbeat_reset();
554   }
555
556   if (dispatch_depth > 1)
557     return true;
558
559   // finish any triggered contexts
560   _advance_queues();
561
562   if (beacon.is_laggy()) {
563     // We've gone laggy during dispatch, don't do any
564     // more housekeeping
565     return true;
566   }
567
568   // done with all client replayed requests?
569   if (is_clientreplay() &&
570       mdcache->is_open() &&
571       replay_queue.empty() &&
572       beacon.get_want_state() == MDSMap::STATE_CLIENTREPLAY) {
573     int num_requests = mdcache->get_num_client_requests();
574     dout(10) << " still have " << num_requests << " active replay requests" << dendl;
575     if (num_requests == 0)
576       clientreplay_done();
577   }
578
579   // hack: thrash exports
580   static utime_t start;
581   utime_t now = ceph_clock_now();
582   if (start == utime_t())
583     start = now;
584   /*double el = now - start;
585   if (el > 30.0 &&
586     el < 60.0)*/
587   for (int i=0; i<g_conf->mds_thrash_exports; i++) {
588     set<mds_rank_t> s;
589     if (!is_active()) break;
590     mdsmap->get_mds_set(s, MDSMap::STATE_ACTIVE);
591     if (s.size() < 2 || CInode::count() < 10)
592       break;  // need peers for this to work.
593     if (mdcache->migrator->get_num_exporting() > g_conf->mds_thrash_exports * 5 ||
594         mdcache->migrator->get_export_queue_size() > g_conf->mds_thrash_exports * 10)
595       break;
596
597     dout(7) << "mds thrashing exports pass " << (i+1) << "/" << g_conf->mds_thrash_exports << dendl;
598
599     // pick a random dir inode
600     CInode *in = mdcache->hack_pick_random_inode();
601
602     list<CDir*> ls;
603     in->get_dirfrags(ls);
604     if (!ls.empty()) {  // must be an open dir.
605       list<CDir*>::iterator p = ls.begin();
606       int n = rand() % ls.size();
607       while (n--)
608         ++p;
609       CDir *dir = *p;
610       if (!dir->get_parent_dir()) continue;    // must be linked.
611       if (!dir->is_auth()) continue;           // must be auth.
612
613       mds_rank_t dest;
614       do {
615         int k = rand() % s.size();
616         set<mds_rank_t>::iterator p = s.begin();
617         while (k--) ++p;
618         dest = *p;
619       } while (dest == whoami);
620       mdcache->migrator->export_dir_nicely(dir,dest);
621     }
622   }
623   // hack: thrash fragments
624   for (int i=0; i<g_conf->mds_thrash_fragments; i++) {
625     if (!is_active()) break;
626     if (mdcache->get_num_fragmenting_dirs() > 5 * g_conf->mds_thrash_fragments) break;
627     dout(7) << "mds thrashing fragments pass " << (i+1) << "/" << g_conf->mds_thrash_fragments << dendl;
628
629     // pick a random dir inode
630     CInode *in = mdcache->hack_pick_random_inode();
631
632     list<CDir*> ls;
633     in->get_dirfrags(ls);
634     if (ls.empty()) continue;                // must be an open dir.
635     CDir *dir = ls.front();
636     if (!dir->get_parent_dir()) continue;    // must be linked.
637     if (!dir->is_auth()) continue;           // must be auth.
638     frag_t fg = dir->get_frag();
639     if (mdsmap->allows_dirfrags()) {
640       if ((fg == frag_t() || (rand() % (1 << fg.bits()) == 0))) {
641         mdcache->split_dir(dir, 1);
642       } else {
643         balancer->queue_merge(dir);
644       }
645     }
646   }
647
648   // hack: force hash root?
649   /*
650   if (false &&
651       mdcache->get_root() &&
652       mdcache->get_root()->dir &&
653       !(mdcache->get_root()->dir->is_hashed() ||
654         mdcache->get_root()->dir->is_hashing())) {
655     dout(0) << "hashing root" << dendl;
656     mdcache->migrator->hash_dir(mdcache->get_root()->dir);
657   }
658   */
659
660   update_mlogger();
661   return true;
662 }
663
664 void MDSRank::update_mlogger()
665 {
666   if (mlogger) {
667     mlogger->set(l_mdm_ino, CInode::count());
668     mlogger->set(l_mdm_dir, CDir::count());
669     mlogger->set(l_mdm_dn, CDentry::count());
670     mlogger->set(l_mdm_cap, Capability::count());
671     mlogger->set(l_mdm_inoa, CInode::increments());
672     mlogger->set(l_mdm_inos, CInode::decrements());
673     mlogger->set(l_mdm_dira, CDir::increments());
674     mlogger->set(l_mdm_dirs, CDir::decrements());
675     mlogger->set(l_mdm_dna, CDentry::increments());
676     mlogger->set(l_mdm_dns, CDentry::decrements());
677     mlogger->set(l_mdm_capa, Capability::increments());
678     mlogger->set(l_mdm_caps, Capability::decrements());
679     mlogger->set(l_mdm_buf, buffer::get_total_alloc());
680   }
681 }
682
683 /*
684  * lower priority messages we defer if we seem laggy
685  */
686 bool MDSRank::handle_deferrable_message(Message *m)
687 {
688   int port = m->get_type() & 0xff00;
689
690   switch (port) {
691   case MDS_PORT_CACHE:
692     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
693     mdcache->dispatch(m);
694     break;
695
696   case MDS_PORT_MIGRATOR:
697     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
698     mdcache->migrator->dispatch(m);
699     break;
700
701   default:
702     switch (m->get_type()) {
703       // SERVER
704     case CEPH_MSG_CLIENT_SESSION:
705     case CEPH_MSG_CLIENT_RECONNECT:
706       ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
707       // fall-thru
708     case CEPH_MSG_CLIENT_REQUEST:
709       server->dispatch(m);
710       break;
711     case MSG_MDS_SLAVE_REQUEST:
712       ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
713       server->dispatch(m);
714       break;
715
716     case MSG_MDS_HEARTBEAT:
717       ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
718       balancer->proc_message(m);
719       break;
720
721     case MSG_MDS_TABLE_REQUEST:
722       ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
723       {
724         MMDSTableRequest *req = static_cast<MMDSTableRequest*>(m);
725         if (req->op < 0) {
726           MDSTableClient *client = get_table_client(req->table);
727               client->handle_request(req);
728         } else {
729           MDSTableServer *server = get_table_server(req->table);
730           server->handle_request(req);
731         }
732       }
733       break;
734
735     case MSG_MDS_LOCK:
736     case MSG_MDS_INODEFILECAPS:
737       ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
738       locker->dispatch(m);
739       break;
740
741     case CEPH_MSG_CLIENT_CAPS:
742     case CEPH_MSG_CLIENT_CAPRELEASE:
743     case CEPH_MSG_CLIENT_LEASE:
744       ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
745       locker->dispatch(m);
746       break;
747
748     default:
749       return false;
750     }
751   }
752
753   return true;
754 }
755
756 /**
757  * Advance finished_queue and waiting_for_nolaggy.
758  *
759  * Usually drain both queues, but may not drain waiting_for_nolaggy
760  * if beacon is currently laggy.
761  */
762 void MDSRank::_advance_queues()
763 {
764   assert(mds_lock.is_locked_by_me());
765
766   while (!finished_queue.empty()) {
767     dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
768     dout(10) << finished_queue << dendl;
769     list<MDSInternalContextBase*> ls;
770     ls.swap(finished_queue);
771     while (!ls.empty()) {
772       dout(10) << " finish " << ls.front() << dendl;
773       ls.front()->complete(0);
774       ls.pop_front();
775
776       heartbeat_reset();
777     }
778   }
779
780   while (!waiting_for_nolaggy.empty()) {
781     // stop if we're laggy now!
782     if (beacon.is_laggy())
783       break;
784
785     Message *old = waiting_for_nolaggy.front();
786     waiting_for_nolaggy.pop_front();
787
788     if (is_stale_message(old)) {
789       old->put();
790     } else {
791       dout(7) << " processing laggy deferred " << *old << dendl;
792       if (!handle_deferrable_message(old)) {
793         dout(0) << "unrecognized message " << *old << dendl;
794         old->put();
795       }
796     }
797
798     heartbeat_reset();
799   }
800 }
801
802 /**
803  * Call this when you take mds_lock, or periodically if you're going to
804  * hold the lock for a long time (e.g. iterating over clients/inodes)
805  */
806 void MDSRank::heartbeat_reset()
807 {
808   // Any thread might jump into mds_lock and call us immediately
809   // after a call to suicide() completes, in which case MDSRank::hb
810   // has been freed and we are a no-op.
811   if (!hb) {
812       assert(stopping);
813       return;
814   }
815
816   // NB not enabling suicide grace, because the mon takes care of killing us
817   // (by blacklisting us) when we fail to send beacons, and it's simpler to
818   // only have one way of dying.
819   g_ceph_context->get_heartbeat_map()->reset_timeout(hb, g_conf->mds_beacon_grace, 0);
820 }
821
822 bool MDSRank::is_stale_message(Message *m) const
823 {
824   // from bad mds?
825   if (m->get_source().is_mds()) {
826     mds_rank_t from = mds_rank_t(m->get_source().num());
827     if (!mdsmap->have_inst(from) ||
828         mdsmap->get_inst(from) != m->get_source_inst() ||
829         mdsmap->is_down(from)) {
830       // bogus mds?
831       if (m->get_type() == CEPH_MSG_MDS_MAP) {
832         dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
833                 << ", but it's an mdsmap, looking at it" << dendl;
834       } else if (m->get_type() == MSG_MDS_CACHEEXPIRE &&
835                  mdsmap->get_inst(from) == m->get_source_inst()) {
836         dout(5) << "got " << *m << " from down mds " << m->get_source()
837                 << ", but it's a cache_expire, looking at it" << dendl;
838       } else {
839         dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source()
840                 << ", dropping" << dendl;
841         return true;
842       }
843     }
844   }
845   return false;
846 }
847
848
849 void MDSRank::send_message(Message *m, Connection *c)
850 {
851   assert(c);
852   c->send_message(m);
853 }
854
855
856 void MDSRank::send_message_mds(Message *m, mds_rank_t mds)
857 {
858   if (!mdsmap->is_up(mds)) {
859     dout(10) << "send_message_mds mds." << mds << " not up, dropping " << *m << dendl;
860     m->put();
861     return;
862   }
863
864   // send mdsmap first?
865   if (mds != whoami && peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
866     messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
867                             mdsmap->get_inst(mds));
868     peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
869   }
870
871   // send message
872   messenger->send_message(m, mdsmap->get_inst(mds));
873 }
874
875 void MDSRank::forward_message_mds(Message *m, mds_rank_t mds)
876 {
877   assert(mds != whoami);
878
879   // client request?
880   if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
881       (static_cast<MClientRequest*>(m))->get_source().is_client()) {
882     MClientRequest *creq = static_cast<MClientRequest*>(m);
883     creq->inc_num_fwd();    // inc forward counter
884
885     /*
886      * don't actually forward if non-idempotent!
887      * client has to do it.  although the MDS will ignore duplicate requests,
888      * the affected metadata may migrate, in which case the new authority
889      * won't have the metareq_id in the completed request map.
890      */
891     // NEW: always make the client resend!
892     bool client_must_resend = true;  //!creq->can_forward();
893
894     // tell the client where it should go
895     messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd(),
896                                                       client_must_resend),
897                             creq->get_source_inst());
898
899     if (client_must_resend) {
900       m->put();
901       return;
902     }
903   }
904
905   // these are the only types of messages we should be 'forwarding'; they
906   // explicitly encode their source mds, which gets clobbered when we resend
907   // them here.
908   assert(m->get_type() == MSG_MDS_DIRUPDATE ||
909          m->get_type() == MSG_MDS_EXPORTDIRDISCOVER);
910
911   // send mdsmap first?
912   if (peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
913     messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
914                             mdsmap->get_inst(mds));
915     peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
916   }
917
918   messenger->send_message(m, mdsmap->get_inst(mds));
919 }
920
921
922
923 void MDSRank::send_message_client_counted(Message *m, client_t client)
924 {
925   Session *session =  sessionmap.get_session(entity_name_t::CLIENT(client.v));
926   if (session) {
927     send_message_client_counted(m, session);
928   } else {
929     dout(10) << "send_message_client_counted no session for client." << client << " " << *m << dendl;
930   }
931 }
932
933 void MDSRank::send_message_client_counted(Message *m, Connection *connection)
934 {
935   Session *session = static_cast<Session *>(connection->get_priv());
936   if (session) {
937     session->put();  // do not carry ref
938     send_message_client_counted(m, session);
939   } else {
940     dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
941     // another Connection took over the Session
942   }
943 }
944
945 void MDSRank::send_message_client_counted(Message *m, Session *session)
946 {
947   version_t seq = session->inc_push_seq();
948   dout(10) << "send_message_client_counted " << session->info.inst.name << " seq "
949            << seq << " " << *m << dendl;
950   if (session->connection) {
951     session->connection->send_message(m);
952   } else {
953     session->preopen_out_queue.push_back(m);
954   }
955 }
956
957 void MDSRank::send_message_client(Message *m, Session *session)
958 {
959   dout(10) << "send_message_client " << session->info.inst << " " << *m << dendl;
960   if (session->connection) {
961     session->connection->send_message(m);
962   } else {
963     session->preopen_out_queue.push_back(m);
964   }
965 }
966
967 /**
968  * This is used whenever a RADOS operation has been cancelled
969  * or a RADOS client has been blacklisted, to cause the MDS and
970  * any clients to wait for this OSD epoch before using any new caps.
971  *
972  * See doc/cephfs/eviction
973  */
974 void MDSRank::set_osd_epoch_barrier(epoch_t e)
975 {
976   dout(4) << __func__ << ": epoch=" << e << dendl;
977   osd_epoch_barrier = e;
978 }
979
980 void MDSRank::retry_dispatch(Message *m)
981 {
982   inc_dispatch_depth();
983   _dispatch(m, false);
984   dec_dispatch_depth();
985 }
986
987 utime_t MDSRank::get_laggy_until() const
988 {
989   return beacon.get_laggy_until();
990 }
991
992 bool MDSRank::is_daemon_stopping() const
993 {
994   return stopping;
995 }
996
997 void MDSRank::request_state(MDSMap::DaemonState s)
998 {
999   dout(3) << "request_state " << ceph_mds_state_name(s) << dendl;
1000   beacon.set_want_state(mdsmap, s);
1001   beacon.send();
1002 }
1003
1004
1005 class C_MDS_BootStart : public MDSInternalContext {
1006   MDSRank::BootStep nextstep;
1007 public:
1008   C_MDS_BootStart(MDSRank *m, MDSRank::BootStep n)
1009     : MDSInternalContext(m), nextstep(n) {}
1010   void finish(int r) override {
1011     mds->boot_start(nextstep, r);
1012   }
1013 };
1014
1015
1016 void MDSRank::boot_start(BootStep step, int r)
1017 {
1018   // Handle errors from previous step
1019   if (r < 0) {
1020     if (is_standby_replay() && (r == -EAGAIN)) {
1021       dout(0) << "boot_start encountered an error EAGAIN"
1022               << ", respawning since we fell behind journal" << dendl;
1023       respawn();
1024     } else if (r == -EINVAL || r == -ENOENT) {
1025       // Invalid or absent data, indicates damaged on-disk structures
1026       clog->error() << "Error loading MDS rank " << whoami << ": "
1027         << cpp_strerror(r);
1028       damaged();
1029       assert(r == 0);  // Unreachable, damaged() calls respawn()
1030     } else {
1031       // Completely unexpected error, give up and die
1032       dout(0) << "boot_start encountered an error, failing" << dendl;
1033       suicide();
1034       return;
1035     }
1036   }
1037
1038   assert(is_starting() || is_any_replay());
1039
1040   switch(step) {
1041     case MDS_BOOT_INITIAL:
1042       {
1043         mdcache->init_layouts();
1044
1045         MDSGatherBuilder gather(g_ceph_context,
1046             new C_MDS_BootStart(this, MDS_BOOT_OPEN_ROOT));
1047         dout(2) << "boot_start " << step << ": opening inotable" << dendl;
1048         inotable->set_rank(whoami);
1049         inotable->load(gather.new_sub());
1050
1051         dout(2) << "boot_start " << step << ": opening sessionmap" << dendl;
1052         sessionmap.set_rank(whoami);
1053         sessionmap.load(gather.new_sub());
1054
1055         dout(2) << "boot_start " << step << ": opening mds log" << dendl;
1056         mdlog->open(gather.new_sub());
1057
1058         if (is_starting()) {
1059           dout(2) << "boot_start " << step << ": opening purge queue" << dendl;
1060           purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
1061         } else if (!standby_replaying) {
1062           dout(2) << "boot_start " << step << ": opening purge queue (async)" << dendl;
1063           purge_queue.open(NULL);
1064         }
1065
1066         if (mdsmap->get_tableserver() == whoami) {
1067           dout(2) << "boot_start " << step << ": opening snap table" << dendl;
1068           snapserver->set_rank(whoami);
1069           snapserver->load(gather.new_sub());
1070         }
1071
1072         gather.activate();
1073       }
1074       break;
1075     case MDS_BOOT_OPEN_ROOT:
1076       {
1077         dout(2) << "boot_start " << step << ": loading/discovering base inodes" << dendl;
1078
1079         MDSGatherBuilder gather(g_ceph_context,
1080             new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG));
1081
1082         mdcache->open_mydir_inode(gather.new_sub());
1083
1084         if (is_starting() ||
1085             whoami == mdsmap->get_root()) {  // load root inode off disk if we are auth
1086           mdcache->open_root_inode(gather.new_sub());
1087         } else {
1088           // replay.  make up fake root inode to start with
1089           (void)mdcache->create_root_inode();
1090         }
1091         gather.activate();
1092       }
1093       break;
1094     case MDS_BOOT_PREPARE_LOG:
1095       if (is_any_replay()) {
1096         dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
1097         MDSGatherBuilder gather(g_ceph_context,
1098             new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
1099
1100         if (!standby_replaying) {
1101           dout(2) << "boot_start " << step << ": waiting for purge queue recovered" << dendl;
1102           purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub()));
1103         }
1104
1105         mdlog->replay(gather.new_sub());
1106         gather.activate();
1107       } else {
1108         dout(2) << "boot_start " << step << ": positioning at end of old mds log" << dendl;
1109         mdlog->append();
1110         starting_done();
1111       }
1112       break;
1113     case MDS_BOOT_REPLAY_DONE:
1114       assert(is_any_replay());
1115
1116       // Sessiontable and inotable should be in sync after replay, validate
1117       // that they are consistent.
1118       validate_sessions();
1119
1120       replay_done();
1121       break;
1122   }
1123 }
1124
1125 void MDSRank::validate_sessions()
1126 {
1127   assert(mds_lock.is_locked_by_me());
1128   std::vector<Session*> victims;
1129
1130   // Identify any sessions which have state inconsistent with other,
1131   // after they have been loaded from rados during startup.
1132   // Mitigate bugs like: http://tracker.ceph.com/issues/16842
1133   const auto &sessions = sessionmap.get_sessions();
1134   for (const auto &i : sessions) {
1135     Session *session = i.second;
1136     interval_set<inodeno_t> badones;
1137     if (inotable->intersects_free(session->info.prealloc_inos, &badones)) {
1138       clog->error() << "Client session loaded with invalid preallocated "
1139                           "inodes, evicting session " << *session;
1140
1141       // Make the session consistent with inotable so that it can
1142       // be cleanly torn down
1143       session->info.prealloc_inos.subtract(badones);
1144
1145       victims.push_back(session);
1146     }
1147   }
1148
1149   for (const auto &session: victims) {
1150     server->kill_session(session, nullptr);
1151   }
1152 }
1153
1154 void MDSRank::starting_done()
1155 {
1156   dout(3) << "starting_done" << dendl;
1157   assert(is_starting());
1158   request_state(MDSMap::STATE_ACTIVE);
1159
1160   mdcache->open_root();
1161
1162   if (mdcache->is_open()) {
1163     mdlog->start_new_segment();
1164   } else {
1165     mdcache->wait_for_open(new MDSInternalContextWrapper(this,
1166                            new FunctionContext([this] (int r) {
1167                                mdlog->start_new_segment();
1168                            })));
1169   }
1170 }
1171
1172
1173 void MDSRank::calc_recovery_set()
1174 {
1175   // initialize gather sets
1176   set<mds_rank_t> rs;
1177   mdsmap->get_recovery_mds_set(rs);
1178   rs.erase(whoami);
1179   mdcache->set_recovery_set(rs);
1180
1181   dout(1) << " recovery set is " << rs << dendl;
1182 }
1183
1184
1185 void MDSRank::replay_start()
1186 {
1187   dout(1) << "replay_start" << dendl;
1188
1189   if (is_standby_replay())
1190     standby_replaying = true;
1191
1192   calc_recovery_set();
1193
1194   // Check if we need to wait for a newer OSD map before starting
1195   Context *fin = new C_IO_Wrapper(this, new C_MDS_BootStart(this, MDS_BOOT_INITIAL));
1196   bool const ready = objecter->wait_for_map(
1197       mdsmap->get_last_failure_osd_epoch(),
1198       fin);
1199
1200   if (ready) {
1201     delete fin;
1202     boot_start();
1203   } else {
1204     dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
1205             << " (which blacklists prior instance)" << dendl;
1206   }
1207 }
1208
1209
1210 class MDSRank::C_MDS_StandbyReplayRestartFinish : public MDSIOContext {
1211   uint64_t old_read_pos;
1212 public:
1213   C_MDS_StandbyReplayRestartFinish(MDSRank *mds_, uint64_t old_read_pos_) :
1214     MDSIOContext(mds_), old_read_pos(old_read_pos_) {}
1215   void finish(int r) override {
1216     mds->_standby_replay_restart_finish(r, old_read_pos);
1217   }
1218 };
1219
1220 void MDSRank::_standby_replay_restart_finish(int r, uint64_t old_read_pos)
1221 {
1222   if (old_read_pos < mdlog->get_journaler()->get_trimmed_pos()) {
1223     dout(0) << "standby MDS fell behind active MDS journal's expire_pos, restarting" << dendl;
1224     respawn(); /* we're too far back, and this is easier than
1225                   trying to reset everything in the cache, etc */
1226   } else {
1227     mdlog->standby_trim_segments();
1228     boot_start(MDS_BOOT_PREPARE_LOG, r);
1229   }
1230 }
1231
1232 class MDSRank::C_MDS_StandbyReplayRestart : public MDSInternalContext {
1233 public:
1234   explicit C_MDS_StandbyReplayRestart(MDSRank *m) : MDSInternalContext(m) {}
1235   void finish(int r) override {
1236     assert(!r);
1237     mds->standby_replay_restart();
1238   }
1239 };
1240
1241 void MDSRank::standby_replay_restart()
1242 {
1243   if (standby_replaying) {
1244     /* Go around for another pass of replaying in standby */
1245     dout(4) << "standby_replay_restart (as standby)" << dendl;
1246     mdlog->get_journaler()->reread_head_and_probe(
1247       new C_MDS_StandbyReplayRestartFinish(
1248         this,
1249         mdlog->get_journaler()->get_read_pos()));
1250   } else {
1251     /* We are transitioning out of standby: wait for OSD map update
1252        before making final pass */
1253     dout(1) << "standby_replay_restart (final takeover pass)" << dendl;
1254     Context *fin = new C_IO_Wrapper(this, new C_MDS_StandbyReplayRestart(this));
1255     bool ready = objecter->wait_for_map(mdsmap->get_last_failure_osd_epoch(), fin);
1256     if (ready) {
1257       delete fin;
1258       mdlog->get_journaler()->reread_head_and_probe(
1259         new C_MDS_StandbyReplayRestartFinish(
1260           this,
1261           mdlog->get_journaler()->get_read_pos()));
1262
1263       dout(1) << " opening purge queue (async)" << dendl;
1264       purge_queue.open(NULL);
1265     } else {
1266       dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
1267               << " (which blacklists prior instance)" << dendl;
1268     }
1269   }
1270 }
1271
1272 void MDSRank::replay_done()
1273 {
1274   dout(1) << "replay_done" << (standby_replaying ? " (as standby)" : "") << dendl;
1275
1276   if (is_standby_replay()) {
1277     // The replay was done in standby state, and we are still in that state
1278     assert(standby_replaying);
1279     dout(10) << "setting replay timer" << dendl;
1280     timer.add_event_after(g_conf->mds_replay_interval,
1281                           new C_MDS_StandbyReplayRestart(this));
1282     return;
1283   } else if (standby_replaying) {
1284     // The replay was done in standby state, we have now _left_ that state
1285     dout(10) << " last replay pass was as a standby; making final pass" << dendl;
1286     standby_replaying = false;
1287     standby_replay_restart();
1288     return;
1289   } else {
1290     // Replay is complete, journal read should be up to date
1291     assert(mdlog->get_journaler()->get_read_pos() == mdlog->get_journaler()->get_write_pos());
1292     assert(!is_standby_replay());
1293
1294     // Reformat and come back here
1295     if (mdlog->get_journaler()->get_stream_format() < g_conf->mds_journal_format) {
1296         dout(4) << "reformatting journal on standbyreplay->replay transition" << dendl;
1297         mdlog->reopen(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
1298         return;
1299     }
1300   }
1301
1302   dout(1) << "making mds journal writeable" << dendl;
1303   mdlog->get_journaler()->set_writeable();
1304   mdlog->get_journaler()->trim_tail();
1305
1306   if (g_conf->mds_wipe_sessions) {
1307     dout(1) << "wiping out client sessions" << dendl;
1308     sessionmap.wipe();
1309     sessionmap.save(new C_MDSInternalNoop);
1310   }
1311   if (g_conf->mds_wipe_ino_prealloc) {
1312     dout(1) << "wiping out ino prealloc from sessions" << dendl;
1313     sessionmap.wipe_ino_prealloc();
1314     sessionmap.save(new C_MDSInternalNoop);
1315   }
1316   if (g_conf->mds_skip_ino) {
1317     inodeno_t i = g_conf->mds_skip_ino;
1318     dout(1) << "skipping " << i << " inodes" << dendl;
1319     inotable->skip_inos(i);
1320     inotable->save(new C_MDSInternalNoop);
1321   }
1322
1323   if (mdsmap->get_num_in_mds() == 1 &&
1324       mdsmap->get_num_failed_mds() == 0) { // just me!
1325     dout(2) << "i am alone, moving to state reconnect" << dendl;
1326     request_state(MDSMap::STATE_RECONNECT);
1327   } else {
1328     dout(2) << "i am not alone, moving to state resolve" << dendl;
1329     request_state(MDSMap::STATE_RESOLVE);
1330   }
1331 }
1332
1333 void MDSRank::reopen_log()
1334 {
1335   dout(1) << "reopen_log" << dendl;
1336   mdcache->rollback_uncommitted_fragments();
1337 }
1338
1339
1340 void MDSRank::resolve_start()
1341 {
1342   dout(1) << "resolve_start" << dendl;
1343
1344   reopen_log();
1345
1346   mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done));
1347   finish_contexts(g_ceph_context, waiting_for_resolve);
1348 }
1349 void MDSRank::resolve_done()
1350 {
1351   dout(1) << "resolve_done" << dendl;
1352   request_state(MDSMap::STATE_RECONNECT);
1353 }
1354
1355 void MDSRank::reconnect_start()
1356 {
1357   dout(1) << "reconnect_start" << dendl;
1358
1359   if (last_state == MDSMap::STATE_REPLAY) {
1360     reopen_log();
1361   }
1362
1363   // Drop any blacklisted clients from the SessionMap before going
1364   // into reconnect, so that we don't wait for them.
1365   objecter->enable_blacklist_events();
1366   std::set<entity_addr_t> blacklist;
1367   epoch_t epoch = 0;
1368   objecter->with_osdmap([this, &blacklist, &epoch](const OSDMap& o) {
1369       o.get_blacklist(&blacklist);
1370       epoch = o.get_epoch();
1371   });
1372   auto killed = server->apply_blacklist(blacklist);
1373   dout(4) << "reconnect_start: killed " << killed << " blacklisted sessions ("
1374           << blacklist.size() << " blacklist entries, "
1375           << sessionmap.get_sessions().size() << ")" << dendl;
1376   if (killed) {
1377     set_osd_epoch_barrier(epoch);
1378   }
1379
1380   server->reconnect_clients(new C_MDS_VoidFn(this, &MDSRank::reconnect_done));
1381   finish_contexts(g_ceph_context, waiting_for_reconnect);
1382 }
1383 void MDSRank::reconnect_done()
1384 {
1385   dout(1) << "reconnect_done" << dendl;
1386   request_state(MDSMap::STATE_REJOIN);    // move to rejoin state
1387 }
1388
1389 void MDSRank::rejoin_joint_start()
1390 {
1391   dout(1) << "rejoin_joint_start" << dendl;
1392   mdcache->rejoin_send_rejoins();
1393 }
1394 void MDSRank::rejoin_start()
1395 {
1396   dout(1) << "rejoin_start" << dendl;
1397   mdcache->rejoin_start(new C_MDS_VoidFn(this, &MDSRank::rejoin_done));
1398 }
1399 void MDSRank::rejoin_done()
1400 {
1401   dout(1) << "rejoin_done" << dendl;
1402   mdcache->show_subtrees();
1403   mdcache->show_cache();
1404
1405   // funny case: is our cache empty?  no subtrees?
1406   if (!mdcache->is_subtrees()) {
1407     if (whoami == 0) {
1408       // The root should always have a subtree!
1409       clog->error() << "No subtrees found for root MDS rank!";
1410       damaged();
1411       assert(mdcache->is_subtrees());
1412     } else {
1413       dout(1) << " empty cache, no subtrees, leaving cluster" << dendl;
1414       request_state(MDSMap::STATE_STOPPED);
1415     }
1416     return;
1417   }
1418
1419   if (replay_queue.empty())
1420     request_state(MDSMap::STATE_ACTIVE);
1421   else
1422     request_state(MDSMap::STATE_CLIENTREPLAY);
1423 }
1424
1425 void MDSRank::clientreplay_start()
1426 {
1427   dout(1) << "clientreplay_start" << dendl;
1428   finish_contexts(g_ceph_context, waiting_for_replay);  // kick waiters
1429   mdcache->start_files_to_recover();
1430   queue_one_replay();
1431 }
1432
1433 bool MDSRank::queue_one_replay()
1434 {
1435   if (replay_queue.empty()) {
1436     mdlog->wait_for_safe(new C_MDS_VoidFn(this, &MDSRank::clientreplay_done));
1437     return false;
1438   }
1439   queue_waiter(replay_queue.front());
1440   replay_queue.pop_front();
1441   return true;
1442 }
1443
1444 void MDSRank::clientreplay_done()
1445 {
1446   dout(1) << "clientreplay_done" << dendl;
1447   request_state(MDSMap::STATE_ACTIVE);
1448 }
1449
1450 void MDSRank::active_start()
1451 {
1452   dout(1) << "active_start" << dendl;
1453
1454   if (last_state == MDSMap::STATE_CREATING) {
1455     mdcache->open_root();
1456   }
1457
1458   mdcache->clean_open_file_lists();
1459   mdcache->export_remaining_imported_caps();
1460   finish_contexts(g_ceph_context, waiting_for_replay);  // kick waiters
1461   mdcache->start_files_to_recover();
1462
1463   mdcache->reissue_all_caps();
1464   mdcache->activate_stray_manager();
1465
1466   finish_contexts(g_ceph_context, waiting_for_active);  // kick waiters
1467 }
1468
1469 void MDSRank::recovery_done(int oldstate)
1470 {
1471   dout(1) << "recovery_done -- successful recovery!" << dendl;
1472   assert(is_clientreplay() || is_active());
1473
1474   // kick snaptable (resent AGREEs)
1475   if (mdsmap->get_tableserver() == whoami) {
1476     set<mds_rank_t> active;
1477     mdsmap->get_clientreplay_or_active_or_stopping_mds_set(active);
1478     snapserver->finish_recovery(active);
1479   }
1480
1481   if (oldstate == MDSMap::STATE_CREATING)
1482     return;
1483
1484   mdcache->start_recovered_truncates();
1485   mdcache->do_file_recover();
1486
1487   // tell connected clients
1488   //bcast_mds_map();     // not anymore, they get this from the monitor
1489
1490   mdcache->populate_mydir();
1491 }
1492
1493 void MDSRank::creating_done()
1494 {
1495   dout(1)<< "creating_done" << dendl;
1496   request_state(MDSMap::STATE_ACTIVE);
1497 }
1498
1499 void MDSRank::boot_create()
1500 {
1501   dout(3) << "boot_create" << dendl;
1502
1503   MDSGatherBuilder fin(g_ceph_context, new C_MDS_VoidFn(this, &MDSRank::creating_done));
1504
1505   mdcache->init_layouts();
1506
1507   snapserver->set_rank(whoami);
1508   inotable->set_rank(whoami);
1509   sessionmap.set_rank(whoami);
1510
1511   // start with a fresh journal
1512   dout(10) << "boot_create creating fresh journal" << dendl;
1513   mdlog->create(fin.new_sub());
1514
1515   // open new journal segment, but do not journal subtree map (yet)
1516   mdlog->prepare_new_segment();
1517
1518   if (whoami == mdsmap->get_root()) {
1519     dout(3) << "boot_create creating fresh hierarchy" << dendl;
1520     mdcache->create_empty_hierarchy(fin.get());
1521   }
1522
1523   dout(3) << "boot_create creating mydir hierarchy" << dendl;
1524   mdcache->create_mydir_hierarchy(fin.get());
1525
1526   // fixme: fake out inotable (reset, pretend loaded)
1527   dout(10) << "boot_create creating fresh inotable table" << dendl;
1528   inotable->reset();
1529   inotable->save(fin.new_sub());
1530
1531   // write empty sessionmap
1532   sessionmap.save(fin.new_sub());
1533
1534   // Create empty purge queue
1535   purge_queue.create(new C_IO_Wrapper(this, fin.new_sub()));
1536
1537   // initialize tables
1538   if (mdsmap->get_tableserver() == whoami) {
1539     dout(10) << "boot_create creating fresh snaptable" << dendl;
1540     snapserver->reset();
1541     snapserver->save(fin.new_sub());
1542   }
1543
1544   assert(g_conf->mds_kill_create_at != 1);
1545
1546   // ok now journal it
1547   mdlog->journal_segment_subtree_map(fin.new_sub());
1548   mdlog->flush();
1549
1550   // Usually we do this during reconnect, but creation skips that.
1551   objecter->enable_blacklist_events();
1552
1553   fin.activate();
1554 }
1555
1556 void MDSRank::stopping_start()
1557 {
1558   dout(2) << "stopping_start" << dendl;
1559
1560   if (mdsmap->get_num_in_mds() == 1 && !sessionmap.empty()) {
1561     // we're the only mds up!
1562     dout(0) << "we are the last MDS, and have mounted clients: we cannot flush our journal.  suicide!" << dendl;
1563     suicide();
1564   }
1565
1566   mdcache->shutdown_start();
1567 }
1568
1569 void MDSRank::stopping_done()
1570 {
1571   dout(2) << "stopping_done" << dendl;
1572
1573   // tell monitor we shut down cleanly.
1574   request_state(MDSMap::STATE_STOPPED);
1575 }
1576
1577 void MDSRankDispatcher::handle_mds_map(
1578     MMDSMap *m,
1579     MDSMap *oldmap)
1580 {
1581   // I am only to be passed MDSMaps in which I hold a rank
1582   assert(whoami != MDS_RANK_NONE);
1583
1584   MDSMap::DaemonState oldstate = state;
1585   mds_gid_t mds_gid = mds_gid_t(monc->get_global_id());
1586   state = mdsmap->get_state_gid(mds_gid);
1587   if (state != oldstate) {
1588     last_state = oldstate;
1589     incarnation = mdsmap->get_inc_gid(mds_gid);
1590   }
1591
1592   version_t epoch = m->get_epoch();
1593
1594   // note source's map version
1595   if (m->get_source().is_mds() &&
1596       peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] < epoch) {
1597     dout(15) << " peer " << m->get_source()
1598              << " has mdsmap epoch >= " << epoch
1599              << dendl;
1600     peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] = epoch;
1601   }
1602
1603   // Validate state transitions while I hold a rank
1604   if (!MDSMap::state_transition_valid(oldstate, state)) {
1605     derr << "Invalid state transition " << ceph_mds_state_name(oldstate)
1606       << "->" << ceph_mds_state_name(state) << dendl;
1607     respawn();
1608   }
1609
1610   if (oldstate != state) {
1611     // update messenger.
1612     if (state == MDSMap::STATE_STANDBY_REPLAY) {
1613       dout(1) << "handle_mds_map i am now mds." << mds_gid << "." << incarnation
1614               << " replaying mds." << whoami << "." << incarnation << dendl;
1615       messenger->set_myname(entity_name_t::MDS(mds_gid));
1616     } else {
1617       dout(1) << "handle_mds_map i am now mds." << whoami << "." << incarnation << dendl;
1618       messenger->set_myname(entity_name_t::MDS(whoami));
1619     }
1620   }
1621
1622   // tell objecter my incarnation
1623   if (objecter->get_client_incarnation() != incarnation)
1624     objecter->set_client_incarnation(incarnation);
1625
1626   // for debug
1627   if (g_conf->mds_dump_cache_on_map)
1628     mdcache->dump_cache();
1629
1630   // did it change?
1631   if (oldstate != state) {
1632     dout(1) << "handle_mds_map state change "
1633             << ceph_mds_state_name(oldstate) << " --> "
1634             << ceph_mds_state_name(state) << dendl;
1635     beacon.set_want_state(mdsmap, state);
1636
1637     if (oldstate == MDSMap::STATE_STANDBY_REPLAY) {
1638         dout(10) << "Monitor activated us! Deactivating replay loop" << dendl;
1639         assert (state == MDSMap::STATE_REPLAY);
1640     } else {
1641       // did i just recover?
1642       if ((is_active() || is_clientreplay()) &&
1643           (oldstate == MDSMap::STATE_CREATING ||
1644            oldstate == MDSMap::STATE_REJOIN ||
1645            oldstate == MDSMap::STATE_RECONNECT))
1646         recovery_done(oldstate);
1647
1648       if (is_active()) {
1649         active_start();
1650       } else if (is_any_replay()) {
1651         replay_start();
1652       } else if (is_resolve()) {
1653         resolve_start();
1654       } else if (is_reconnect()) {
1655         reconnect_start();
1656       } else if (is_rejoin()) {
1657         rejoin_start();
1658       } else if (is_clientreplay()) {
1659         clientreplay_start();
1660       } else if (is_creating()) {
1661         boot_create();
1662       } else if (is_starting()) {
1663         boot_start();
1664       } else if (is_stopping()) {
1665         assert(oldstate == MDSMap::STATE_ACTIVE);
1666         stopping_start();
1667       }
1668     }
1669   }
1670
1671   // RESOLVE
1672   // is someone else newly resolving?
1673   if (is_resolve() || is_reconnect() || is_rejoin() ||
1674       is_clientreplay() || is_active() || is_stopping()) {
1675     if (!oldmap->is_resolving() && mdsmap->is_resolving()) {
1676       set<mds_rank_t> resolve;
1677       mdsmap->get_mds_set(resolve, MDSMap::STATE_RESOLVE);
1678       dout(10) << " resolve set is " << resolve << dendl;
1679       calc_recovery_set();
1680       mdcache->send_resolves();
1681     }
1682   }
1683
1684   // REJOIN
1685   // is everybody finally rejoining?
1686   if (is_starting() || is_rejoin() || is_clientreplay() || is_active() || is_stopping()) {
1687     // did we start?
1688     if (!oldmap->is_rejoining() && mdsmap->is_rejoining())
1689       rejoin_joint_start();
1690
1691     // did we finish?
1692     if (g_conf->mds_dump_cache_after_rejoin &&
1693         oldmap->is_rejoining() && !mdsmap->is_rejoining())
1694       mdcache->dump_cache();      // for DEBUG only
1695
1696     if (oldstate >= MDSMap::STATE_REJOIN ||
1697         oldstate == MDSMap::STATE_STARTING) {
1698       // ACTIVE|CLIENTREPLAY|REJOIN => we can discover from them.
1699       set<mds_rank_t> olddis, dis;
1700       oldmap->get_mds_set(olddis, MDSMap::STATE_ACTIVE);
1701       oldmap->get_mds_set(olddis, MDSMap::STATE_CLIENTREPLAY);
1702       oldmap->get_mds_set(olddis, MDSMap::STATE_REJOIN);
1703       mdsmap->get_mds_set(dis, MDSMap::STATE_ACTIVE);
1704       mdsmap->get_mds_set(dis, MDSMap::STATE_CLIENTREPLAY);
1705       mdsmap->get_mds_set(dis, MDSMap::STATE_REJOIN);
1706       for (set<mds_rank_t>::iterator p = dis.begin(); p != dis.end(); ++p)
1707         if (*p != whoami &&            // not me
1708             olddis.count(*p) == 0) {  // newly so?
1709           mdcache->kick_discovers(*p);
1710           mdcache->kick_open_ino_peers(*p);
1711         }
1712     }
1713   }
1714
1715   cluster_degraded = mdsmap->is_degraded();
1716   if (oldmap->is_degraded() && !cluster_degraded && state >= MDSMap::STATE_ACTIVE) {
1717     dout(1) << "cluster recovered." << dendl;
1718     auto it = waiting_for_active_peer.find(MDS_RANK_NONE);
1719     if (it != waiting_for_active_peer.end()) {
1720       queue_waiters(it->second);
1721       waiting_for_active_peer.erase(it);
1722     }
1723   }
1724
1725   // did someone go active?
1726   if (oldstate >= MDSMap::STATE_CLIENTREPLAY &&
1727       (is_clientreplay() || is_active() || is_stopping())) {
1728     set<mds_rank_t> oldactive, active;
1729     oldmap->get_mds_set(oldactive, MDSMap::STATE_ACTIVE);
1730     oldmap->get_mds_set(oldactive, MDSMap::STATE_CLIENTREPLAY);
1731     mdsmap->get_mds_set(active, MDSMap::STATE_ACTIVE);
1732     mdsmap->get_mds_set(active, MDSMap::STATE_CLIENTREPLAY);
1733     for (set<mds_rank_t>::iterator p = active.begin(); p != active.end(); ++p)
1734       if (*p != whoami &&            // not me
1735           oldactive.count(*p) == 0)  // newly so?
1736         handle_mds_recovery(*p);
1737   }
1738
1739   // did someone fail?
1740   //   new down?
1741   {
1742     set<mds_rank_t> olddown, down;
1743     oldmap->get_down_mds_set(&olddown);
1744     mdsmap->get_down_mds_set(&down);
1745     for (set<mds_rank_t>::iterator p = down.begin(); p != down.end(); ++p) {
1746       if (oldmap->have_inst(*p) && olddown.count(*p) == 0) {
1747         messenger->mark_down(oldmap->get_inst(*p).addr);
1748         handle_mds_failure(*p);
1749       }
1750     }
1751   }
1752
1753   // did someone fail?
1754   //   did their addr/inst change?
1755   {
1756     set<mds_rank_t> up;
1757     mdsmap->get_up_mds_set(up);
1758     for (set<mds_rank_t>::iterator p = up.begin(); p != up.end(); ++p) {
1759       if (oldmap->have_inst(*p) &&
1760          oldmap->get_inst(*p) != mdsmap->get_inst(*p)) {
1761         messenger->mark_down(oldmap->get_inst(*p).addr);
1762         handle_mds_failure(*p);
1763       }
1764     }
1765   }
1766
1767   if (is_clientreplay() || is_active() || is_stopping()) {
1768     // did anyone stop?
1769     set<mds_rank_t> oldstopped, stopped;
1770     oldmap->get_stopped_mds_set(oldstopped);
1771     mdsmap->get_stopped_mds_set(stopped);
1772     for (set<mds_rank_t>::iterator p = stopped.begin(); p != stopped.end(); ++p)
1773       if (oldstopped.count(*p) == 0)      // newly so?
1774         mdcache->migrator->handle_mds_failure_or_stop(*p);
1775   }
1776
1777   {
1778     map<epoch_t,list<MDSInternalContextBase*> >::iterator p = waiting_for_mdsmap.begin();
1779     while (p != waiting_for_mdsmap.end() && p->first <= mdsmap->get_epoch()) {
1780       list<MDSInternalContextBase*> ls;
1781       ls.swap(p->second);
1782       waiting_for_mdsmap.erase(p++);
1783       finish_contexts(g_ceph_context, ls);
1784     }
1785   }
1786
1787   if (is_active()) {
1788     // Before going active, set OSD epoch barrier to latest (so that
1789     // we don't risk handing out caps to clients with old OSD maps that
1790     // might not include barriers from the previous incarnation of this MDS)
1791     set_osd_epoch_barrier(objecter->with_osdmap(
1792                             std::mem_fn(&OSDMap::get_epoch)));
1793   }
1794
1795   if (is_active()) {
1796     bool found = false;
1797     MDSMap::mds_info_t info = mdsmap->get_info(whoami);
1798
1799     for (map<mds_gid_t,MDSMap::mds_info_t>::const_iterator p = mdsmap->get_mds_info().begin();
1800        p != mdsmap->get_mds_info().end();
1801        ++p) {
1802       if (p->second.state == MDSMap::STATE_STANDBY_REPLAY &&
1803           (p->second.standby_for_rank == whoami ||(info.name.length() && p->second.standby_for_name == info.name))) {
1804         found = true;
1805         break;
1806       }
1807       if (found)
1808         mdlog->set_write_iohint(0);
1809       else
1810         mdlog->set_write_iohint(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
1811     }
1812   }
1813
1814   if (oldmap->get_max_mds() != mdsmap->get_max_mds()) {
1815     purge_queue.update_op_limit(*mdsmap);
1816   }
1817 }
1818
1819 void MDSRank::handle_mds_recovery(mds_rank_t who)
1820 {
1821   dout(5) << "handle_mds_recovery mds." << who << dendl;
1822
1823   mdcache->handle_mds_recovery(who);
1824
1825   if (mdsmap->get_tableserver() == whoami) {
1826     snapserver->handle_mds_recovery(who);
1827   }
1828
1829   queue_waiters(waiting_for_active_peer[who]);
1830   waiting_for_active_peer.erase(who);
1831 }
1832
1833 void MDSRank::handle_mds_failure(mds_rank_t who)
1834 {
1835   if (who == whoami) {
1836     dout(5) << "handle_mds_failure for myself; not doing anything" << dendl;
1837     return;
1838   }
1839   dout(5) << "handle_mds_failure mds." << who << dendl;
1840
1841   mdcache->handle_mds_failure(who);
1842
1843   snapclient->handle_mds_failure(who);
1844 }
1845
1846 bool MDSRankDispatcher::handle_asok_command(
1847     std::string command, cmdmap_t& cmdmap, Formatter *f,
1848                     std::ostream& ss)
1849 {
1850   if (command == "dump_ops_in_flight" ||
1851              command == "ops") {
1852     if (!op_tracker.dump_ops_in_flight(f)) {
1853       ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
1854           please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1855     }
1856   } else if (command == "dump_blocked_ops") {
1857     if (!op_tracker.dump_ops_in_flight(f, true)) {
1858       ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
1859         Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1860     }
1861   } else if (command == "dump_historic_ops") {
1862     if (!op_tracker.dump_historic_ops(f)) {
1863       ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
1864           please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1865     }
1866   } else if (command == "dump_historic_ops_by_duration") {
1867     if (!op_tracker.dump_historic_ops(f, true)) {
1868       ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
1869           please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1870     }
1871   } else if (command == "osdmap barrier") {
1872     int64_t target_epoch = 0;
1873     bool got_val = cmd_getval(g_ceph_context, cmdmap, "target_epoch", target_epoch);
1874
1875     if (!got_val) {
1876       ss << "no target epoch given";
1877       return true;
1878     }
1879
1880     mds_lock.Lock();
1881     set_osd_epoch_barrier(target_epoch);
1882     mds_lock.Unlock();
1883
1884     C_SaferCond cond;
1885     bool already_got = objecter->wait_for_map(target_epoch, &cond);
1886     if (!already_got) {
1887       dout(4) << __func__ << ": waiting for OSD epoch " << target_epoch << dendl;
1888       cond.wait();
1889     }
1890   } else if (command == "session ls") {
1891     Mutex::Locker l(mds_lock);
1892
1893     heartbeat_reset();
1894
1895     dump_sessions(SessionFilter(), f);
1896   } else if (command == "session evict") {
1897     std::string client_id;
1898     const bool got_arg = cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);
1899     if(!got_arg) {
1900       ss << "Invalid client_id specified";
1901       return true;
1902     }
1903
1904     mds_lock.Lock();
1905     std::stringstream dss;
1906     bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
1907         g_conf->mds_session_blacklist_on_evict, dss);
1908     if (!evicted) {
1909       dout(15) << dss.str() << dendl;
1910       ss << dss.str();
1911     }
1912     mds_lock.Unlock();
1913   } else if (command == "scrub_path") {
1914     string path;
1915     vector<string> scrubop_vec;
1916     cmd_getval(g_ceph_context, cmdmap, "scrubops", scrubop_vec);
1917     cmd_getval(g_ceph_context, cmdmap, "path", path);
1918     command_scrub_path(f, path, scrubop_vec);
1919   } else if (command == "tag path") {
1920     string path;
1921     cmd_getval(g_ceph_context, cmdmap, "path", path);
1922     string tag;
1923     cmd_getval(g_ceph_context, cmdmap, "tag", tag);
1924     command_tag_path(f, path, tag);
1925   } else if (command == "flush_path") {
1926     string path;
1927     cmd_getval(g_ceph_context, cmdmap, "path", path);
1928     command_flush_path(f, path);
1929   } else if (command == "flush journal") {
1930     command_flush_journal(f);
1931   } else if (command == "get subtrees") {
1932     command_get_subtrees(f);
1933   } else if (command == "export dir") {
1934     string path;
1935     if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
1936       ss << "malformed path";
1937       return true;
1938     }
1939     int64_t rank;
1940     if(!cmd_getval(g_ceph_context, cmdmap, "rank", rank)) {
1941       ss << "malformed rank";
1942       return true;
1943     }
1944     command_export_dir(f, path, (mds_rank_t)rank);
1945   } else if (command == "dump cache") {
1946     Mutex::Locker l(mds_lock);
1947     string path;
1948     int r;
1949     if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
1950       r = mdcache->dump_cache(f);
1951     } else {
1952       r = mdcache->dump_cache(path);
1953     }
1954
1955     if (r != 0) {
1956       ss << "Failed to dump cache: " << cpp_strerror(r);
1957       f->reset();
1958     }
1959   } else if (command == "cache status") {
1960     Mutex::Locker l(mds_lock);
1961     int r = mdcache->cache_status(f);
1962     if (r != 0) {
1963       ss << "Failed to get cache status: " << cpp_strerror(r);
1964     }
1965   } else if (command == "dump tree") {
1966     string root;
1967     int64_t depth;
1968     cmd_getval(g_ceph_context, cmdmap, "root", root);
1969     if (!cmd_getval(g_ceph_context, cmdmap, "depth", depth))
1970       depth = -1;
1971     {
1972       Mutex::Locker l(mds_lock);
1973       int r = mdcache->dump_cache(root, depth, f);
1974       if (r != 0) {
1975         ss << "Failed to dump tree: " << cpp_strerror(r);
1976         f->reset();
1977       }
1978     }
1979   } else if (command == "force_readonly") {
1980     Mutex::Locker l(mds_lock);
1981     mdcache->force_readonly();
1982   } else if (command == "dirfrag split") {
1983     command_dirfrag_split(cmdmap, ss);
1984   } else if (command == "dirfrag merge") {
1985     command_dirfrag_merge(cmdmap, ss);
1986   } else if (command == "dirfrag ls") {
1987     command_dirfrag_ls(cmdmap, ss, f);
1988   } else {
1989     return false;
1990   }
1991
1992   return true;
1993 }
1994
1995 class C_MDS_Send_Command_Reply : public MDSInternalContext
1996 {
1997 protected:
1998   MCommand *m;
1999 public:
2000   C_MDS_Send_Command_Reply(MDSRank *_mds, MCommand *_m) :
2001     MDSInternalContext(_mds), m(_m) { m->get(); }
2002   void send (int r, const std::string& out_str) {
2003     bufferlist bl;
2004     MDSDaemon::send_command_reply(m, mds, r, bl, out_str);
2005     m->put();
2006   }
2007   void finish (int r) override {
2008     send(r, "");
2009   }
2010 };
2011
2012 /**
2013  * This function drops the mds_lock, so don't do anything with
2014  * MDSRank after calling it (we could have gone into shutdown): just
2015  * send your result back to the calling client and finish.
2016  */
2017 void MDSRankDispatcher::evict_clients(const SessionFilter &filter, MCommand *m)
2018 {
2019   C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);
2020
2021   if (is_any_replay()) {
2022     reply->send(-EAGAIN, "MDS is replaying log");
2023     delete reply;
2024     return;
2025   }
2026
2027   std::list<Session*> victims;
2028   const auto sessions = sessionmap.get_sessions();
2029   for (const auto p : sessions)  {
2030     if (!p.first.is_client()) {
2031       continue;
2032     }
2033
2034     Session *s = p.second;
2035
2036     if (filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2037       victims.push_back(s);
2038     }
2039   }
2040
2041   dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;
2042
2043   if (victims.empty()) {
2044     reply->send(0, "");
2045     delete reply;
2046     return;
2047   }
2048
2049   C_GatherBuilder gather(g_ceph_context, reply);
2050   for (const auto s : victims) {
2051     std::stringstream ss;
2052     evict_client(s->info.inst.name.num(), false,
2053                  g_conf->mds_session_blacklist_on_evict, ss, gather.new_sub());
2054   }
2055   gather.activate();
2056 }
2057
2058 void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f) const
2059 {
2060   // Dump sessions, decorated with recovery/replay status
2061   f->open_array_section("sessions");
2062   const ceph::unordered_map<entity_name_t, Session*> session_map = sessionmap.get_sessions();
2063   for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
2064        p != session_map.end();
2065        ++p)  {
2066     if (!p->first.is_client()) {
2067       continue;
2068     }
2069
2070     Session *s = p->second;
2071
2072     if (!filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2073       continue;
2074     }
2075
2076     f->open_object_section("session");
2077     f->dump_int("id", p->first.num());
2078
2079     f->dump_int("num_leases", s->leases.size());
2080     f->dump_int("num_caps", s->caps.size());
2081
2082     f->dump_string("state", s->get_state_name());
2083     f->dump_int("replay_requests", is_clientreplay() ? s->get_request_count() : 0);
2084     f->dump_unsigned("completed_requests", s->get_num_completed_requests());
2085     f->dump_bool("reconnecting", server->waiting_for_reconnect(p->first.num()));
2086     f->dump_stream("inst") << s->info.inst;
2087     f->open_object_section("client_metadata");
2088     for (map<string, string>::const_iterator i = s->info.client_metadata.begin();
2089          i != s->info.client_metadata.end(); ++i) {
2090       f->dump_string(i->first.c_str(), i->second);
2091     }
2092     f->close_section(); // client_metadata
2093     f->close_section(); //session
2094   }
2095   f->close_section(); //sessions
2096 }
2097
2098 void MDSRank::command_scrub_path(Formatter *f, const string& path, vector<string>& scrubop_vec)
2099 {
2100   bool force = false;
2101   bool recursive = false;
2102   bool repair = false;
2103   for (vector<string>::iterator i = scrubop_vec.begin() ; i != scrubop_vec.end(); ++i) {
2104     if (*i == "force")
2105       force = true;
2106     else if (*i == "recursive")
2107       recursive = true;
2108     else if (*i == "repair")
2109       repair = true;
2110   }
2111   C_SaferCond scond;
2112   {
2113     Mutex::Locker l(mds_lock);
2114     mdcache->enqueue_scrub(path, "", force, recursive, repair, f, &scond);
2115   }
2116   scond.wait();
2117   // scrub_dentry() finishers will dump the data for us; we're done!
2118 }
2119
2120 void MDSRank::command_tag_path(Formatter *f,
2121     const string& path, const std::string &tag)
2122 {
2123   C_SaferCond scond;
2124   {
2125     Mutex::Locker l(mds_lock);
2126     mdcache->enqueue_scrub(path, tag, true, true, false, f, &scond);
2127   }
2128   scond.wait();
2129 }
2130
2131 void MDSRank::command_flush_path(Formatter *f, const string& path)
2132 {
2133   C_SaferCond scond;
2134   {
2135     Mutex::Locker l(mds_lock);
2136     mdcache->flush_dentry(path, &scond);
2137   }
2138   int r = scond.wait();
2139   f->open_object_section("results");
2140   f->dump_int("return_code", r);
2141   f->close_section(); // results
2142 }
2143
2144 /**
2145  * Wrapper around _command_flush_journal that
2146  * handles serialization of result
2147  */
2148 void MDSRank::command_flush_journal(Formatter *f)
2149 {
2150   assert(f != NULL);
2151
2152   std::stringstream ss;
2153   const int r = _command_flush_journal(&ss);
2154   f->open_object_section("result");
2155   f->dump_string("message", ss.str());
2156   f->dump_int("return_code", r);
2157   f->close_section();
2158 }
2159
2160 /**
2161  * Implementation of "flush journal" asok command.
2162  *
2163  * @param ss
2164  * Optionally populate with a human readable string describing the
2165  * reason for any unexpected return status.
2166  */
2167 int MDSRank::_command_flush_journal(std::stringstream *ss)
2168 {
2169   assert(ss != NULL);
2170
2171   Mutex::Locker l(mds_lock);
2172
2173   if (mdcache->is_readonly()) {
2174     dout(5) << __func__ << ": read-only FS" << dendl;
2175     return -EROFS;
2176   }
2177
2178   if (!is_active()) {
2179     dout(5) << __func__ << ": MDS not active, no-op" << dendl;
2180     return 0;
2181   }
2182
2183   // I need to seal off the current segment, and then mark all previous segments
2184   // for expiry
2185   mdlog->start_new_segment();
2186   int r = 0;
2187
2188   // Flush initially so that all the segments older than our new one
2189   // will be elegible for expiry
2190   {
2191     C_SaferCond mdlog_flushed;
2192     mdlog->flush();
2193     mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_flushed));
2194     mds_lock.Unlock();
2195     r = mdlog_flushed.wait();
2196     mds_lock.Lock();
2197     if (r != 0) {
2198       *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
2199       return r;
2200     }
2201   }
2202
2203   // Because we may not be the last wait_for_safe context on MDLog, and
2204   // subsequent contexts might wake up in the middle of our later trim_all
2205   // and interfere with expiry (by e.g. marking dirs/dentries dirty
2206   // on previous log segments), we run a second wait_for_safe here.
2207   // See #10368
2208   {
2209     C_SaferCond mdlog_cleared;
2210     mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_cleared));
2211     mds_lock.Unlock();
2212     r = mdlog_cleared.wait();
2213     mds_lock.Lock();
2214     if (r != 0) {
2215       *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
2216       return r;
2217     }
2218   }
2219
2220   // Put all the old log segments into expiring or expired state
2221   dout(5) << __func__ << ": beginning segment expiry" << dendl;
2222   r = mdlog->trim_all();
2223   if (r != 0) {
2224     *ss << "Error " << r << " (" << cpp_strerror(r) << ") while trimming log";
2225     return r;
2226   }
2227
2228   // Attach contexts to wait for all expiring segments to expire
2229   MDSGatherBuilder expiry_gather(g_ceph_context);
2230
2231   const std::set<LogSegment*> &expiring_segments = mdlog->get_expiring_segments();
2232   for (std::set<LogSegment*>::const_iterator i = expiring_segments.begin();
2233        i != expiring_segments.end(); ++i) {
2234     (*i)->wait_for_expiry(expiry_gather.new_sub());
2235   }
2236   dout(5) << __func__ << ": waiting for " << expiry_gather.num_subs_created()
2237           << " segments to expire" << dendl;
2238
2239   if (expiry_gather.has_subs()) {
2240     C_SaferCond cond;
2241     expiry_gather.set_finisher(new MDSInternalContextWrapper(this, &cond));
2242     expiry_gather.activate();
2243
2244     // Drop mds_lock to allow progress until expiry is complete
2245     mds_lock.Unlock();
2246     int r = cond.wait();
2247     mds_lock.Lock();
2248
2249     assert(r == 0);  // MDLog is not allowed to raise errors via wait_for_expiry
2250   }
2251
2252   dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now " << std::hex <<
2253     mdlog->get_journaler()->get_expire_pos() << "/" <<
2254     mdlog->get_journaler()->get_trimmed_pos() << dendl;
2255
2256   // Now everyone I'm interested in is expired
2257   mdlog->trim_expired_segments();
2258
2259   dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now " << std::hex <<
2260     mdlog->get_journaler()->get_expire_pos() << "/" <<
2261     mdlog->get_journaler()->get_trimmed_pos() << dendl;
2262
2263   // Flush the journal header so that readers will start from after the flushed region
2264   C_SaferCond wrote_head;
2265   mdlog->get_journaler()->write_head(&wrote_head);
2266   mds_lock.Unlock();  // Drop lock to allow messenger dispatch progress
2267   r = wrote_head.wait();
2268   mds_lock.Lock();
2269   if (r != 0) {
2270       *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
2271       return r;
2272   }
2273
2274   dout(5) << __func__ << ": write_head complete, all done!" << dendl;
2275
2276   return 0;
2277 }
2278
2279
2280 void MDSRank::command_get_subtrees(Formatter *f)
2281 {
2282   assert(f != NULL);
2283   Mutex::Locker l(mds_lock);
2284
2285   std::list<CDir*> subtrees;
2286   mdcache->list_subtrees(subtrees);
2287
2288   f->open_array_section("subtrees");
2289   for (std::list<CDir*>::iterator i = subtrees.begin(); i != subtrees.end(); ++i) {
2290     const CDir *dir = *i;
2291
2292     f->open_object_section("subtree");
2293     {
2294       f->dump_bool("is_auth", dir->is_auth());
2295       f->dump_int("auth_first", dir->get_dir_auth().first);
2296       f->dump_int("auth_second", dir->get_dir_auth().second);
2297       f->dump_int("export_pin", dir->inode->get_export_pin());
2298       f->open_object_section("dir");
2299       dir->dump(f);
2300       f->close_section();
2301     }
2302     f->close_section();
2303   }
2304   f->close_section();
2305 }
2306
2307
2308 void MDSRank::command_export_dir(Formatter *f,
2309     const std::string &path,
2310     mds_rank_t target)
2311 {
2312   int r = _command_export_dir(path, target);
2313   f->open_object_section("results");
2314   f->dump_int("return_code", r);
2315   f->close_section(); // results
2316 }
2317
2318 int MDSRank::_command_export_dir(
2319     const std::string &path,
2320     mds_rank_t target)
2321 {
2322   Mutex::Locker l(mds_lock);
2323   filepath fp(path.c_str());
2324
2325   if (target == whoami || !mdsmap->is_up(target) || !mdsmap->is_in(target)) {
2326     derr << "bad MDS target " << target << dendl;
2327     return -ENOENT;
2328   }
2329
2330   CInode *in = mdcache->cache_traverse(fp);
2331   if (!in) {
2332     derr << "Bath path '" << path << "'" << dendl;
2333     return -ENOENT;
2334   }
2335   CDir *dir = in->get_dirfrag(frag_t());
2336   if (!dir || !(dir->is_auth())) {
2337     derr << "bad export_dir path dirfrag frag_t() or dir not auth" << dendl;
2338     return -EINVAL;
2339   }
2340
2341   mdcache->migrator->export_dir(dir, target);
2342   return 0;
2343 }
2344
2345 CDir *MDSRank::_command_dirfrag_get(
2346     const cmdmap_t &cmdmap,
2347     std::ostream &ss)
2348 {
2349   std::string path;
2350   bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2351   if (!got) {
2352     ss << "missing path argument";
2353     return NULL;
2354   }
2355
2356   std::string frag_str;
2357   if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
2358     ss << "missing frag argument";
2359     return NULL;
2360   }
2361
2362   CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2363   if (!in) {
2364     // TODO really we should load something in if it's not in cache,
2365     // but the infrastructure is harder, and we might still be unable
2366     // to act on it if someone else is auth.
2367     ss << "directory '" << path << "' inode not in cache";
2368     return NULL;
2369   }
2370
2371   frag_t fg;
2372
2373   if (!fg.parse(frag_str.c_str())) {
2374     ss << "frag " << frag_str << " failed to parse";
2375     return NULL;
2376   }
2377
2378   CDir *dir = in->get_dirfrag(fg);
2379   if (!dir) {
2380     ss << "frag 0x" << std::hex << in->ino() << "/" << fg << " not in cache ("
2381           "use `dirfrag ls` to see if it should exist)";
2382     return NULL;
2383   }
2384
2385   if (!dir->is_auth()) {
2386     ss << "frag " << dir->dirfrag() << " not auth (auth = "
2387        << dir->authority() << ")";
2388     return NULL;
2389   }
2390
2391   return dir;
2392 }
2393
2394 bool MDSRank::command_dirfrag_split(
2395     cmdmap_t cmdmap,
2396     std::ostream &ss)
2397 {
2398   Mutex::Locker l(mds_lock);
2399   if (!mdsmap->allows_dirfrags()) {
2400     ss << "dirfrags are disallowed by the mds map!";
2401     return false;
2402   }
2403
2404   int64_t by = 0;
2405   if (!cmd_getval(g_ceph_context, cmdmap, "bits", by)) {
2406     ss << "missing bits argument";
2407     return false;
2408   }
2409
2410   if (by <= 0) {
2411     ss << "must split by >0 bits";
2412     return false;
2413   }
2414
2415   CDir *dir = _command_dirfrag_get(cmdmap, ss);
2416   if (!dir) {
2417     return false;
2418   }
2419
2420   mdcache->split_dir(dir, by);
2421
2422   return true;
2423 }
2424
2425 bool MDSRank::command_dirfrag_merge(
2426     cmdmap_t cmdmap,
2427     std::ostream &ss)
2428 {
2429   Mutex::Locker l(mds_lock);
2430   std::string path;
2431   bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2432   if (!got) {
2433     ss << "missing path argument";
2434     return false;
2435   }
2436
2437   std::string frag_str;
2438   if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
2439     ss << "missing frag argument";
2440     return false;
2441   }
2442
2443   CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2444   if (!in) {
2445     ss << "directory '" << path << "' inode not in cache";
2446     return false;
2447   }
2448
2449   frag_t fg;
2450   if (!fg.parse(frag_str.c_str())) {
2451     ss << "frag " << frag_str << " failed to parse";
2452     return false;
2453   }
2454
2455   mdcache->merge_dir(in, fg);
2456
2457   return true;
2458 }
2459
2460 bool MDSRank::command_dirfrag_ls(
2461     cmdmap_t cmdmap,
2462     std::ostream &ss,
2463     Formatter *f)
2464 {
2465   Mutex::Locker l(mds_lock);
2466   std::string path;
2467   bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2468   if (!got) {
2469     ss << "missing path argument";
2470     return false;
2471   }
2472
2473   CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2474   if (!in) {
2475     ss << "directory inode not in cache";
2476     return false;
2477   }
2478
2479   f->open_array_section("frags");
2480   std::list<frag_t> frags;
2481   // NB using get_leaves_under instead of get_dirfrags to give
2482   // you the list of what dirfrags may exist, not which are in cache
2483   in->dirfragtree.get_leaves_under(frag_t(), frags);
2484   for (std::list<frag_t>::iterator i = frags.begin();
2485        i != frags.end(); ++i) {
2486     f->open_object_section("frag");
2487     f->dump_int("value", i->value());
2488     f->dump_int("bits", i->bits());
2489     std::ostringstream frag_str;
2490     frag_str << std::hex << i->value() << "/" << std::dec << i->bits();
2491     f->dump_string("str", frag_str.str());
2492     f->close_section();
2493   }
2494   f->close_section();
2495
2496   return true;
2497 }
2498
2499 void MDSRank::dump_status(Formatter *f) const
2500 {
2501   if (state == MDSMap::STATE_REPLAY ||
2502       state == MDSMap::STATE_STANDBY_REPLAY) {
2503     mdlog->dump_replay_status(f);
2504   } else if (state == MDSMap::STATE_RESOLVE) {
2505     mdcache->dump_resolve_status(f);
2506   } else if (state == MDSMap::STATE_RECONNECT) {
2507     server->dump_reconnect_status(f);
2508   } else if (state == MDSMap::STATE_REJOIN) {
2509     mdcache->dump_rejoin_status(f);
2510   } else if (state == MDSMap::STATE_CLIENTREPLAY) {
2511     dump_clientreplay_status(f);
2512   }
2513 }
2514
2515 void MDSRank::dump_clientreplay_status(Formatter *f) const
2516 {
2517   f->open_object_section("clientreplay_status");
2518   f->dump_unsigned("clientreplay_queue", replay_queue.size());
2519   f->dump_unsigned("active_replay", mdcache->get_num_client_requests());
2520   f->close_section();
2521 }
2522
2523 void MDSRankDispatcher::update_log_config()
2524 {
2525   map<string,string> log_to_monitors;
2526   map<string,string> log_to_syslog;
2527   map<string,string> log_channel;
2528   map<string,string> log_prio;
2529   map<string,string> log_to_graylog;
2530   map<string,string> log_to_graylog_host;
2531   map<string,string> log_to_graylog_port;
2532   uuid_d fsid;
2533   string host;
2534
2535   if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
2536                                log_channel, log_prio, log_to_graylog,
2537                                log_to_graylog_host, log_to_graylog_port,
2538                                fsid, host) == 0)
2539     clog->update_config(log_to_monitors, log_to_syslog,
2540                         log_channel, log_prio, log_to_graylog,
2541                         log_to_graylog_host, log_to_graylog_port,
2542                         fsid, host);
2543   dout(10) << __func__ << " log_to_monitors " << log_to_monitors << dendl;
2544 }
2545
2546 void MDSRank::create_logger()
2547 {
2548   dout(10) << "create_logger" << dendl;
2549   {
2550     PerfCountersBuilder mds_plb(g_ceph_context, "mds", l_mds_first, l_mds_last);
2551
2552     mds_plb.add_u64_counter(
2553       l_mds_request, "request", "Requests", "req",
2554       PerfCountersBuilder::PRIO_CRITICAL);
2555     mds_plb.add_u64_counter(l_mds_reply, "reply", "Replies");
2556     mds_plb.add_time_avg(
2557       l_mds_reply_latency, "reply_latency", "Reply latency", "rlat",
2558       PerfCountersBuilder::PRIO_CRITICAL);
2559     mds_plb.add_u64_counter(
2560       l_mds_forward, "forward", "Forwarding request", "fwd",
2561       PerfCountersBuilder::PRIO_INTERESTING);
2562     mds_plb.add_u64_counter(l_mds_dir_fetch, "dir_fetch", "Directory fetch");
2563     mds_plb.add_u64_counter(l_mds_dir_commit, "dir_commit", "Directory commit");
2564     mds_plb.add_u64_counter(l_mds_dir_split, "dir_split", "Directory split");
2565     mds_plb.add_u64_counter(l_mds_dir_merge, "dir_merge", "Directory merge");
2566
2567     mds_plb.add_u64(l_mds_inode_max, "inode_max", "Max inodes, cache size");
2568     mds_plb.add_u64(l_mds_inodes, "inodes", "Inodes", "inos",
2569                     PerfCountersBuilder::PRIO_CRITICAL);
2570     mds_plb.add_u64(l_mds_inodes_top, "inodes_top", "Inodes on top");
2571     mds_plb.add_u64(l_mds_inodes_bottom, "inodes_bottom", "Inodes on bottom");
2572     mds_plb.add_u64(
2573       l_mds_inodes_pin_tail, "inodes_pin_tail", "Inodes on pin tail");
2574     mds_plb.add_u64(l_mds_inodes_pinned, "inodes_pinned", "Inodes pinned");
2575     mds_plb.add_u64(l_mds_inodes_expired, "inodes_expired", "Inodes expired");
2576     mds_plb.add_u64(
2577       l_mds_inodes_with_caps, "inodes_with_caps", "Inodes with capabilities");
2578     mds_plb.add_u64(l_mds_caps, "caps", "Capabilities", "caps",
2579                     PerfCountersBuilder::PRIO_INTERESTING);
2580     mds_plb.add_u64(l_mds_subtrees, "subtrees", "Subtrees");
2581
2582     mds_plb.add_u64_counter(l_mds_traverse, "traverse", "Traverses");
2583     mds_plb.add_u64_counter(l_mds_traverse_hit, "traverse_hit", "Traverse hits");
2584     mds_plb.add_u64_counter(l_mds_traverse_forward, "traverse_forward",
2585                             "Traverse forwards");
2586     mds_plb.add_u64_counter(l_mds_traverse_discover, "traverse_discover",
2587                             "Traverse directory discovers");
2588     mds_plb.add_u64_counter(l_mds_traverse_dir_fetch, "traverse_dir_fetch",
2589                             "Traverse incomplete directory content fetchings");
2590     mds_plb.add_u64_counter(l_mds_traverse_remote_ino, "traverse_remote_ino",
2591                             "Traverse remote dentries");
2592     mds_plb.add_u64_counter(l_mds_traverse_lock, "traverse_lock",
2593                             "Traverse locks");
2594
2595     mds_plb.add_u64(l_mds_load_cent, "load_cent", "Load per cent");
2596     mds_plb.add_u64(l_mds_dispatch_queue_len, "q", "Dispatch queue length");
2597
2598     mds_plb.add_u64_counter(l_mds_exported, "exported", "Exports");
2599     mds_plb.add_u64_counter(
2600       l_mds_exported_inodes, "exported_inodes", "Exported inodes", "exi",
2601       PerfCountersBuilder::PRIO_INTERESTING);
2602     mds_plb.add_u64_counter(l_mds_imported, "imported", "Imports");
2603     mds_plb.add_u64_counter(
2604       l_mds_imported_inodes, "imported_inodes", "Imported inodes", "imi",
2605       PerfCountersBuilder::PRIO_INTERESTING);
2606     logger = mds_plb.create_perf_counters();
2607     g_ceph_context->get_perfcounters_collection()->add(logger);
2608   }
2609
2610   {
2611     PerfCountersBuilder mdm_plb(g_ceph_context, "mds_mem", l_mdm_first, l_mdm_last);
2612     mdm_plb.add_u64(l_mdm_ino, "ino", "Inodes");
2613     mdm_plb.add_u64_counter(l_mdm_inoa, "ino+", "Inodes opened");
2614     mdm_plb.add_u64_counter(l_mdm_inos, "ino-", "Inodes closed");
2615     mdm_plb.add_u64(l_mdm_dir, "dir", "Directories");
2616     mdm_plb.add_u64_counter(l_mdm_dira, "dir+", "Directories opened");
2617     mdm_plb.add_u64_counter(l_mdm_dirs, "dir-", "Directories closed");
2618     mdm_plb.add_u64(l_mdm_dn, "dn", "Dentries");
2619     mdm_plb.add_u64_counter(l_mdm_dna, "dn+", "Dentries opened");
2620     mdm_plb.add_u64_counter(l_mdm_dns, "dn-", "Dentries closed");
2621     mdm_plb.add_u64(l_mdm_cap, "cap", "Capabilities");
2622     mdm_plb.add_u64_counter(l_mdm_capa, "cap+", "Capabilities added");
2623     mdm_plb.add_u64_counter(l_mdm_caps, "cap-", "Capabilities removed");
2624     mdm_plb.add_u64(l_mdm_rss, "rss", "RSS");
2625     mdm_plb.add_u64(l_mdm_heap, "heap", "Heap size");
2626     mdm_plb.add_u64(l_mdm_buf, "buf", "Buffer size");
2627     mlogger = mdm_plb.create_perf_counters();
2628     g_ceph_context->get_perfcounters_collection()->add(mlogger);
2629   }
2630
2631   mdlog->create_logger();
2632   server->create_logger();
2633   purge_queue.create_logger();
2634   sessionmap.register_perfcounters();
2635   mdcache->register_perfcounters();
2636 }
2637
2638 void MDSRank::check_ops_in_flight()
2639 {
2640   vector<string> warnings;
2641   int slow = 0;
2642   if (op_tracker.check_ops_in_flight(warnings, &slow)) {
2643     for (vector<string>::iterator i = warnings.begin();
2644         i != warnings.end();
2645         ++i) {
2646       clog->warn() << *i;
2647     }
2648   }
2649  
2650   // set mds slow request count 
2651   mds_slow_req_count = slow;
2652   return;
2653 }
2654
2655 void MDSRankDispatcher::handle_osd_map()
2656 {
2657   if (is_active() && snapserver) {
2658     snapserver->check_osd_map(true);
2659   }
2660
2661   server->handle_osd_map();
2662
2663   purge_queue.update_op_limit(*mdsmap);
2664
2665   std::set<entity_addr_t> newly_blacklisted;
2666   objecter->consume_blacklist_events(&newly_blacklisted);
2667   auto epoch = objecter->with_osdmap([](const OSDMap &o){return o.get_epoch();});
2668   dout(4) << "handle_osd_map epoch " << epoch << ", "
2669           << newly_blacklisted.size() << " new blacklist entries" << dendl;
2670   auto victims = server->apply_blacklist(newly_blacklisted);
2671   if (victims) {
2672     set_osd_epoch_barrier(epoch);
2673   }
2674
2675
2676   // By default the objecter only requests OSDMap updates on use,
2677   // we would like to always receive the latest maps in order to
2678   // apply policy based on the FULL flag.
2679   objecter->maybe_request_map();
2680 }
2681
2682 bool MDSRank::evict_client(int64_t session_id,
2683     bool wait, bool blacklist, std::stringstream& err_ss,
2684     Context *on_killed)
2685 {
2686   assert(mds_lock.is_locked_by_me());
2687
2688   // Mutually exclusive args
2689   assert(!(wait && on_killed != nullptr));
2690
2691   if (is_any_replay()) {
2692     err_ss << "MDS is replaying log";
2693     return false;
2694   }
2695
2696   Session *session = sessionmap.get_session(
2697       entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
2698   if (!session) {
2699     err_ss << "session " << session_id << " not in sessionmap!";
2700     return false;
2701   }
2702
2703   dout(4) << "Preparing blacklist command... (wait=" << wait << ")" << dendl;
2704   stringstream ss;
2705   ss << "{\"prefix\":\"osd blacklist\", \"blacklistop\":\"add\",";
2706   ss << "\"addr\":\"";
2707   ss << session->info.inst.addr;
2708   ss << "\"}";
2709   std::string tmp = ss.str();
2710   std::vector<std::string> cmd = {tmp};
2711
2712   auto kill_mds_session = [this, session_id, on_killed](){
2713     assert(mds_lock.is_locked_by_me());
2714     Session *session = sessionmap.get_session(
2715         entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
2716     if (session) {
2717       if (on_killed) {
2718         server->kill_session(session, on_killed);
2719       } else {
2720         C_SaferCond on_safe;
2721         server->kill_session(session, &on_safe);
2722
2723         mds_lock.Unlock();
2724         on_safe.wait();
2725         mds_lock.Lock();
2726       }
2727     } else {
2728       dout(1) << "session " << session_id << " was removed while we waited "
2729       "for blacklist" << dendl;
2730
2731       // Even though it wasn't us that removed it, kick our completion
2732       // as the session has been removed.
2733       if (on_killed) {
2734         on_killed->complete(0);
2735       }
2736     }
2737   };
2738
2739   auto background_blacklist = [this, session_id, cmd](std::function<void ()> fn){
2740     assert(mds_lock.is_locked_by_me());
2741
2742     Context *on_blacklist_done = new FunctionContext([this, session_id, fn](int r) {
2743       objecter->wait_for_latest_osdmap(
2744        new C_OnFinisher(
2745          new FunctionContext([this, session_id, fn](int r) {
2746               Mutex::Locker l(mds_lock);
2747               auto epoch = objecter->with_osdmap([](const OSDMap &o){
2748                   return o.get_epoch();
2749               });
2750
2751               set_osd_epoch_barrier(epoch);
2752
2753               fn();
2754             }), finisher)
2755        );
2756     });
2757
2758     dout(4) << "Sending mon blacklist command: " << cmd[0] << dendl;
2759     monc->start_mon_command(cmd, {}, nullptr, nullptr, on_blacklist_done);
2760   };
2761
2762   auto blocking_blacklist = [this, cmd, &err_ss, background_blacklist](){
2763     C_SaferCond inline_ctx;
2764     background_blacklist([&inline_ctx](){inline_ctx.complete(0);});
2765     mds_lock.Unlock();
2766     inline_ctx.wait();
2767     mds_lock.Lock();
2768   };
2769
2770   if (wait) {
2771     if (blacklist) {
2772       blocking_blacklist();
2773     }
2774
2775     // We dropped mds_lock, so check that session still exists
2776     session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
2777           session_id));
2778     if (!session) {
2779       dout(1) << "session " << session_id << " was removed while we waited "
2780                  "for blacklist" << dendl;
2781       return true;
2782     }
2783     kill_mds_session();
2784   } else {
2785     if (blacklist) {
2786       background_blacklist(kill_mds_session);
2787     } else {
2788       kill_mds_session();
2789     }
2790   }
2791
2792   return true;
2793 }
2794
2795 void MDSRank::bcast_mds_map()
2796 {
2797   dout(7) << "bcast_mds_map " << mdsmap->get_epoch() << dendl;
2798
2799   // share the map with mounted clients
2800   set<Session*> clients;
2801   sessionmap.get_client_session_set(clients);
2802   for (set<Session*>::const_iterator p = clients.begin();
2803        p != clients.end();
2804        ++p)
2805     (*p)->connection->send_message(new MMDSMap(monc->get_fsid(), mdsmap));
2806   last_client_mdsmap_bcast = mdsmap->get_epoch();
2807 }
2808
2809 MDSRankDispatcher::MDSRankDispatcher(
2810     mds_rank_t whoami_,
2811     Mutex &mds_lock_,
2812     LogChannelRef &clog_,
2813     SafeTimer &timer_,
2814     Beacon &beacon_,
2815     MDSMap *& mdsmap_,
2816     Messenger *msgr,
2817     MonClient *monc_,
2818     Context *respawn_hook_,
2819     Context *suicide_hook_)
2820   : MDSRank(whoami_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
2821       msgr, monc_, respawn_hook_, suicide_hook_)
2822 {}
2823
2824 bool MDSRankDispatcher::handle_command(
2825   const cmdmap_t &cmdmap,
2826   MCommand *m,
2827   int *r,
2828   std::stringstream *ds,
2829   std::stringstream *ss,
2830   bool *need_reply)
2831 {
2832   assert(r != nullptr);
2833   assert(ds != nullptr);
2834   assert(ss != nullptr);
2835
2836   *need_reply = true;
2837
2838   std::string prefix;
2839   cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
2840
2841   if (prefix == "session ls" || prefix == "client ls") {
2842     std::vector<std::string> filter_args;
2843     cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);
2844
2845     SessionFilter filter;
2846     *r = filter.parse(filter_args, ss);
2847     if (*r != 0) {
2848       return true;
2849     }
2850
2851     Formatter *f = new JSONFormatter(true);
2852     dump_sessions(filter, f);
2853     f->flush(*ds);
2854     delete f;
2855     return true;
2856   } else if (prefix == "session evict" || prefix == "client evict") {
2857     std::vector<std::string> filter_args;
2858     cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);
2859
2860     SessionFilter filter;
2861     *r = filter.parse(filter_args, ss);
2862     if (*r != 0) {
2863       return true;
2864     }
2865
2866     evict_clients(filter, m);
2867
2868     *need_reply = false;
2869     return true;
2870   } else if (prefix == "damage ls") {
2871     Formatter *f = new JSONFormatter(true);
2872     damage_table.dump(f);
2873     f->flush(*ds);
2874     delete f;
2875     return true;
2876   } else if (prefix == "damage rm") {
2877     damage_entry_id_t id = 0;
2878     bool got = cmd_getval(g_ceph_context, cmdmap, "damage_id", (int64_t&)id);
2879     if (!got) {
2880       *r = -EINVAL;
2881       return true;
2882     }
2883
2884     damage_table.erase(id);
2885     return true;
2886   } else {
2887     return false;
2888   }
2889 }
2890
2891 epoch_t MDSRank::get_osd_epoch() const
2892 {
2893   return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));  
2894 }
2895