1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "include/compat.h"
18 #include "MDBalancer.h"
20 #include "mon/MonClient.h"
28 #include "include/Context.h"
29 #include "msg/Messenger.h"
30 #include "messages/MHeartbeat.h"
39 #include "common/config.h"
40 #include "common/errno.h"
42 #define dout_context g_ceph_context
43 #define dout_subsys ceph_subsys_mds
45 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".bal "
49 auto subsys = ceph_subsys_mds;\
50 if ((dout_context)->_conf->subsys.should_gather(ceph_subsys_mds_balancer, lvl)) {\
51 subsys = ceph_subsys_mds_balancer;\
53 dout_impl(dout_context, subsys, lvl) dout_prefix
55 #define dendl dendl_impl; } while (0)
58 #define MIN_LOAD 50 // ??
59 #define MIN_REEXPORT 5 // will automatically reexport
60 #define MIN_OFFLOAD 10 // point at which i stop trying, close enough
63 /* This function DOES put the passed message before returning */
64 int MDBalancer::proc_message(Message *m)
// Dispatch an incoming balancer message by type. Per the contract above,
// ownership of m is consumed here (the handler puts the message).
66 switch (m->get_type()) {
68 case MSG_MDS_HEARTBEAT:
69 handle_heartbeat(static_cast<MHeartbeat*>(m));
// Any other type reaching the balancer is a programming error: log loudly
// and abort rather than silently dropping it.
73 dout(0) << " balancer unknown message " << m->get_type() << dendl;
74 assert(0 == "balancer unknown message");
// Apply "export pin" policy: walk the queue of inodes whose pin changed and
// move/adjust their dirfrags accordingly, then sweep all auth subtrees for
// any that should be exported to their pinned rank.
80 void MDBalancer::handle_export_pins(void)
82 auto &q = mds->mdcache->export_pin_queue;
84 dout(20) << "export_pin_queue size=" << q.size() << dendl;
85 while (it != q.end()) {
// Effective pin for this inode (false: presumably "don't inherit" — TODO
// confirm the meaning of the bool against CInode::get_export_pin).
89 mds_rank_t export_pin = in->get_export_pin(false);
93 in->get_dirfrags(dfls);
94 for (auto dir : dfls) {
98 if (export_pin == MDS_RANK_NONE) {
// Pin removed: tear down the auxiliary subtree bound (unless mid-freeze)
// and let the cache try to merge the subtree back into its parent.
99 if (dir->state_test(CDir::STATE_AUXSUBTREE)) {
100 if (dir->is_frozen() || dir->is_freezing()) {
105 dout(10) << " clear auxsubtree on " << *dir << dendl;
106 dir->state_clear(CDir::STATE_AUXSUBTREE);
107 mds->mdcache->try_subtree_merge(dir);
// Pinned to me: make sure the dirfrag is marked as an aux subtree root
// owned by this rank (skip frags still being created or frozen/freezing).
109 } else if (export_pin == mds->get_nodeid()) {
110 if (dir->state_test(CDir::STATE_CREATING) ||
111 dir->is_frozen() || dir->is_freezing()) {
116 if (!dir->is_subtree_root()) {
117 dir->state_set(CDir::STATE_AUXSUBTREE);
118 mds->mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
119 dout(10) << " create aux subtree on " << *dir << dendl;
120 } else if (!dir->state_test(CDir::STATE_AUXSUBTREE)) {
121 dout(10) << " set auxsubtree bit on " << *dir << dendl;
122 dir->state_set(CDir::STATE_AUXSUBTREE);
// Pinned to another rank: hand the dirfrag to the migrator.
125 mds->mdcache->migrator->export_dir(dir, export_pin);
131 in->state_clear(CInode::STATE_QUEUEDEXPORTPIN);
// Second pass: any auth subtree pinned to a different rank gets exported,
// catching subtrees that were not on the queue above.
136 set<CDir *> authsubs;
137 mds->mdcache->get_auth_subtrees(authsubs);
138 for (auto &cd : authsubs) {
139 mds_rank_t export_pin = cd->inode->get_export_pin();
140 dout(10) << "auth tree " << *cd << " export_pin=" << export_pin << dendl;
141 if (export_pin >= 0 && export_pin != mds->get_nodeid()) {
142 dout(10) << "exporting auth subtree " << *cd->inode << " to " << export_pin << dendl;
143 mds->mdcache->migrator->export_dir(cd, export_pin);
// Periodic driver: processes export pins, takes popularity samples, and
// (on rank 0 only) decides when to broadcast a balancing heartbeat.
148 void MDBalancer::tick()
// Static state persists across ticks: max beat budget and process start time.
150 static int num_bal_times = g_conf->mds_bal_max;
151 static utime_t first = ceph_clock_now();
152 utime_t now = ceph_clock_now();
153 utime_t elapsed = now;
156 if (g_conf->mds_bal_export_pin) {
157 handle_export_pins();
// Take a new popularity sample once mds_bal_sample_interval has elapsed.
161 if ((double)now - (double)last_sample > g_conf->mds_bal_sample_interval) {
162 dout(15) << "tick last_sample now " << now << dendl;
167 if (last_heartbeat == utime_t())
168 last_heartbeat = now;
// Only rank 0 initiates heartbeats, rate-limited by mds_bal_interval and
// bounded by mds_bal_max_until (elided lines also reference num_bal_times
// — confirm the full condition against the unabridged source).
169 if (mds->get_nodeid() == 0 &&
170 g_conf->mds_bal_interval > 0 &&
172 (g_conf->mds_bal_max_until >= 0 &&
173 elapsed.sec() > g_conf->mds_bal_max_until)) &&
175 now.sec() - last_heartbeat.sec() >= g_conf->mds_bal_interval) {
176 last_heartbeat = now;
// Completion context that re-triggers a balancer heartbeat, used when the
// heartbeat has to wait (e.g. for the cache to open) before being sent.
185 class C_Bal_SendHeartbeat : public MDSInternalContext {
187 explicit C_Bal_SendHeartbeat(MDSRank *mds_) : MDSInternalContext(mds_) { }
188 void finish(int f) override {
189 mds->balancer->send_heartbeat();
// Collapse the multi-dimensional load sample into a single scalar, weighted
// per the configured mds_bal_mode (other case bodies are elided here).
194 double mds_load_t::mds_load()
196 switch(g_conf->mds_bal_mode) {
// Weighted blend: auth metadata load dominates, total load contributes less.
199 .8 * auth.meta_load() +
200 .2 * all.meta_load() +
// Request-rate mode: queue depth is weighted 10x relative to req rate.
205 return req_rate + 10.0*queue_len;
// Build this rank's current load sample: per-subtree popularity from the
// root dirfrags, plus request rate, dispatch queue depth, and host loadavg.
215 mds_load_t MDBalancer::get_load(utime_t now)
217 mds_load_t load(now);
219 if (mds->mdcache->get_root()) {
221 mds->mdcache->get_root()->get_dirfrags(ls);
222 for (list<CDir*>::iterator p = ls.begin();
// Accumulate decayed auth-subtree and total nested popularity per frag.
225 load.auth.add(now, mds->mdcache->decayrate, (*p)->pop_auth_subtree_nested);
226 load.all.add(now, mds->mdcache->decayrate, (*p)->pop_nested);
229 dout(20) << "get_load no root, no load" << dendl;
232 load.req_rate = mds->get_req_rate();
233 load.queue_len = messenger->get_dispatch_queue_len();
// System CPU load from /proc/loadavg (PROCPREFIX allows test overrides).
235 ifstream cpu(PROCPREFIX "/proc/loadavg");
237 cpu >> load.cpu_load_avg;
239 dout(0) << "input file " PROCPREFIX "'/proc/loadavg' not found" << dendl;
241 dout(15) << "get_load " << load << dendl;
246 * Read synchronously from RADOS using a timeout. We cannot do daemon-local
247 * fallbacks (i.e. kick off async read when we are processing the map and
248 * check status when we get here) with the way the mds is structured.
// Fetch the Mantle (Lua) balancer object from RADOS into bal_code, recording
// its name in bal_version. Blocks up to half a balancing interval.
250 int MDBalancer::localize_balancer()
252 /* reset everything */
259 /* we assume that balancer is in the metadata pool */
260 object_t oid = object_t(mds->mdsmap->get_balancer());
261 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
// Non-blocking read; the C_SafeCond wakes the cond below on completion.
262 ceph_tid_t tid = mds->objecter->read(oid, oloc, 0, 0, CEPH_NOSNAP, &lua_src, 0,
263 new C_SafeCond(&lock, &cond, &ack, &r));
264 dout(15) << "launched non-blocking read tid=" << tid
265 << " oid=" << oid << " oloc=" << oloc << dendl;
267 /* timeout: if we waste half our time waiting for RADOS, then abort! */
268 double t = ceph_clock_now() + g_conf->mds_bal_interval/2;
270 timeout.set_from_double(t);
272 int ret_t = cond.WaitUntil(lock, timeout);
275 /* success: store the balancer in memory and set the version. */
// On timeout, cancel the in-flight op so the buffer isn't written later.
277 if (ret_t == ETIMEDOUT) {
278 mds->objecter->op_cancel(tid, -ECANCELED);
281 bal_code.assign(lua_src.to_str());
282 bal_version.assign(oid.name);
283 dout(0) << "localized balancer, bal_code=" << bal_code << dendl;
// Broadcast this rank's load and import map to every other up MDS. Skipped
// while the cluster is degraded; deferred until the cache is open.
288 void MDBalancer::send_heartbeat()
290 utime_t now = ceph_clock_now();
292 if (mds->is_cluster_degraded()) {
293 dout(10) << "send_heartbeat degraded" << dendl;
297 if (!mds->mdcache->is_open()) {
298 dout(5) << "not open" << dendl;
// Retry this whole function once the cache opens.
299 mds->mdcache->wait_for_open(new C_Bal_SendHeartbeat(mds));
// Rank 0 drives the epoch (the increment itself is on an elided line).
304 if (mds->get_nodeid() == 0)
// Record my own load sample in the shared map.
308 mds_load_t load = get_load(now);
309 map<mds_rank_t, mds_load_t>::value_type val(mds->get_nodeid(), load);
310 mds_load.insert(val);
312 // import_map -- how much do i import from whom
313 map<mds_rank_t, float> import_map;
315 mds->mdcache->get_auth_subtrees(authsubs);
316 for (set<CDir*>::iterator it = authsubs.begin();
317 it != authsubs.end();
320 mds_rank_t from = im->inode->authority().first;
// Skip subtrees we natively own and stray directories.
321 if (from == mds->get_nodeid()) continue;
322 if (im->get_inode()->is_stray()) continue;
323 import_map[from] += im->pop_auth_subtree.meta_load(now, mds->mdcache->decayrate);
325 mds_import_map[ mds->get_nodeid() ] = import_map;
328 dout(5) << "mds." << mds->get_nodeid() << " epoch " << beat_epoch << " load " << load << dendl;
329 for (map<mds_rank_t, float>::iterator it = import_map.begin();
330 it != import_map.end();
332 dout(5) << " import_map from " << it->first << " -> " << it->second << dendl;
// Send one MHeartbeat to every other up rank.
337 mds->get_mds_map()->get_up_mds_set(up);
338 for (set<mds_rank_t>::iterator p = up.begin(); p != up.end(); ++p) {
339 if (*p == mds->get_nodeid())
341 MHeartbeat *hb = new MHeartbeat(load, beat_epoch);
342 hb->get_import_map() = import_map;
343 messenger->send_message(hb,
344 mds->mdsmap->get_inst(*p));
348 /* This function DOES put the passed message before returning */
// Record a peer's load/import map; once a sample exists for every in-cluster
// rank, kick off rebalancing (Mantle if configured, else the default).
349 void MDBalancer::handle_heartbeat(MHeartbeat *m)
351 typedef map<mds_rank_t, mds_load_t> mds_load_map_t;
353 mds_rank_t who = mds_rank_t(m->get_source().num());
354 dout(25) << "=== got heartbeat " << m->get_beat() << " from " << m->get_source().num() << " " << m->get_load() << dendl;
356 if (!mds->is_active())
359 if (!mds->mdcache->is_open()) {
360 dout(10) << "opening root on handle_heartbeat" << dendl;
// Re-dispatch this same message once the cache is open.
361 mds->mdcache->wait_for_open(new C_MDS_RetryMessage(mds, m));
365 if (mds->is_cluster_degraded()) {
366 dout(10) << " degraded, ignoring" << dendl;
// A beat from mds0 starts a new epoch (the guard is on an elided line).
371 dout(20) << " from mds0, new epoch" << dendl;
372 beat_epoch = m->get_beat();
375 mds->mdcache->show_subtrees();
// Insert-or-overwrite the sender's load sample.
380 mds_load_map_t::value_type val(who, m->get_load());
381 pair < mds_load_map_t::iterator, bool > rval (mds_load.insert(val));
383 rval.first->second = val.second;
386 mds_import_map[ who ] = m->get_import_map();
388 //dout(0) << " load is " << load << " have " << mds_load.size() << dendl;
// Rebalance only once we have heard from the entire cluster.
391 unsigned cluster_size = mds->get_mds_map()->get_num_in_mds();
392 if (mds_load.size() == cluster_size) {
394 //export_empties(); // no!
396 /* avoid spamming ceph -w if user does not turn mantle on */
397 if (mds->mdsmap->get_balancer() != "") {
398 int r = mantle_prep_rebalance();
// Mantle failure falls back to the built-in balancer (warned once here).
400 mds->clog->warn() << "using old balancer; mantle failed for "
401 << "balancer=" << mds->mdsmap->get_balancer()
402 << " : " << cpp_strerror(r);
404 prep_rebalance(m->get_beat());
// Offer empty, non-base, non-stray fullauth subtrees back to the migrator
// so we do not hold on to imports with no content.
414 void MDBalancer::export_empties()
416 dout(5) << "export_empties checking for empty imports" << dendl;
418 std::set<CDir *> subtrees;
419 mds->mdcache->get_fullauth_subtrees(subtrees);
420 for (auto &dir : subtrees) {
// Cannot export a dirfrag that is mid-freeze.
421 if (dir->is_freezing() || dir->is_frozen())
424 if (!dir->inode->is_base() &&
425 !dir->inode->is_stray() &&
426 dir->get_num_head_items() == 0)
427 mds->mdcache->migrator->export_empty_import(dir);
// Match an exporter against an importer: transfer min(maxex, maxim) load
// units between them in `state` (targets/exported/imported); the elided tail
// presumably also decrements maxex/maxim and returns the amount — confirm.
433 double MDBalancer::try_match(balance_state_t& state, mds_rank_t ex, double& maxex,
434 mds_rank_t im, double& maxim)
436 if (maxex <= 0 || maxim <= 0) return 0.0;
438 double howmuch = MIN(maxex, maxim);
439 if (howmuch <= 0) return 0.0;
441 dout(5) << " - mds." << ex << " exports " << howmuch << " to mds." << im << dendl;
// Only record a concrete target when I am the exporter.
443 if (ex == mds->get_nodeid())
444 state.targets[im] += howmuch;
446 state.exported[ex] += howmuch;
447 state.imported[im] += howmuch;
// Schedule a split of `dir`. fast=true runs via the MDSRank waiter queue
// (end of current dispatch); fast=false is delayed by a timer so op bursts
// can drain before the dirfrag is frozen.
455 void MDBalancer::queue_split(const CDir *dir, bool fast)
457 dout(10) << __func__ << " enqueuing " << *dir
458 << " (fast=" << fast << ")" << dendl;
460 assert(mds->mdsmap->allows_dirfrags());
// Capture only the dirfrag_t: the CDir* may be invalid when the callback runs.
461 const dirfrag_t frag = dir->dirfrag();
463 auto callback = [this, frag](int r) {
464 if (split_pending.erase(frag) == 0) {
465 // Someone beat me to it. This can happen in the fast splitting
466 // path, because we spawn two contexts, one with mds->timer and
467 // one with mds->queue_waiter. The loser can safely just drop
// Re-resolve the frag; it may have been trimmed or lost auth since queuing.
472 CDir *split_dir = mds->mdcache->get_dirfrag(frag);
474 dout(10) << "drop split on " << frag << " because not in cache" << dendl;
477 if (!split_dir->is_auth()) {
478 dout(10) << "drop split on " << frag << " because non-auth" << dendl;
482 // Pass on to MDCache: note that the split might still not
483 // happen if the checks in MDCache::can_fragment fail.
484 dout(10) << __func__ << " splitting " << *split_dir << dendl;
485 mds->mdcache->split_dir(split_dir, g_conf->mds_bal_split_bits);
// Only enqueue if no split for this frag is already pending.
489 if (split_pending.count(frag) == 0) {
490 split_pending.insert(frag);
495 // Do the split ASAP: enqueue it in the MDSRank waiters which are
496 // run at the end of dispatching the current request
497 mds->queue_waiter(new MDSInternalContextWrapper(mds,
498 new FunctionContext(callback)));
500 // Set a timer to really do the split: we don't do it immediately
501 // so that bursts of ops on a directory have a chance to go through
502 // before we freeze it.
503 mds->timer.add_event_after(g_conf->mds_bal_fragment_interval,
504 new FunctionContext(callback));
// Schedule a timer-delayed merge of `dir` with its siblings, deduplicated
// through merge_pending so at most one context per frag is in flight.
508 void MDBalancer::queue_merge(CDir *dir)
510 const auto frag = dir->dirfrag();
511 auto callback = [this, frag](int r) {
// A root frag (frag_t()) has no sibling and must never be queued for merge.
512 assert(frag.frag != frag_t());
514 // frag must be in this set because only one context is in flight
515 // for a given frag at a time (because merge_pending is checked before
516 // starting one), and this context is the only one that erases it.
517 merge_pending.erase(frag);
// Re-resolve: the CDir may have been trimmed or changed since queuing.
519 CDir *dir = mds->mdcache->get_dirfrag(frag);
521 dout(10) << "drop merge on " << frag << " because not in cache" << dendl;
524 assert(dir->dirfrag() == frag);
526 if(!dir->is_auth()) {
527 dout(10) << "drop merge on " << *dir << " because lost auth" << dendl;
531 dout(10) << "merging " << *dir << dendl;
533 CInode *diri = dir->get_inode();
// Walk up toward the root frag, widening the merge while every sibling
// frag is cached, auth, and willing to merge.
535 frag_t fg = dir->get_frag();
536 while (fg != frag_t()) {
537 frag_t sibfg = fg.get_sibling();
539 bool complete = diri->get_dirfrags_under(sibfg, sibs);
541 dout(10) << " not all sibs under " << sibfg << " in cache (have " << sibs << ")" << dendl;
545 for (list<CDir*>::iterator p = sibs.begin(); p != sibs.end(); ++p) {
547 if (!sib->is_auth() || !sib->should_merge()) {
553 dout(10) << " not all sibs under " << sibfg << " " << sibs << " should_merge" << dendl;
556 dout(10) << " all sibs under " << sibfg << " " << sibs << " should merge" << dendl;
// Only merge if the loop actually widened beyond the starting frag.
560 if (fg != dir->get_frag())
561 mds->mdcache->merge_dir(diri, fg);
564 if (merge_pending.count(frag) == 0) {
565 dout(20) << __func__ << " enqueued dir " << *dir << dendl;
566 merge_pending.insert(frag);
567 mds->timer.add_event_after(g_conf->mds_bal_fragment_interval,
568 new FunctionContext(callback));
570 dout(20) << __func__ << " dir already in queue " << *dir << dendl;
// Default (non-Mantle) balancing policy. Computes per-rank scalar loads,
// decides whether this rank is overloaded, partitions ranks into importers
// and exporters, matches them in three passes, then calls try_rebalance().
574 void MDBalancer::prep_rebalance(int beat)
576 balance_state_t state;
578 if (g_conf->mds_thrash_exports) {
579 //we're going to randomly export to all the mds in the cluster
580 set<mds_rank_t> up_mds;
581 mds->get_mds_map()->get_up_mds_set(up_mds);
582 for (const auto &rank : up_mds) {
583 state.targets[rank] = 0.0;
586 int cluster_size = mds->get_mds_map()->get_num_in_mds();
587 mds_rank_t whoami = mds->get_nodeid();
588 rebalance_time = ceph_clock_now();
590 dout(5) << " prep_rebalance: cluster loads are" << dendl;
// A new plan supersedes any queued exports from the previous round.
592 mds->mdcache->migrator->clear_export_queue();
594 // rescale! turn my mds_load back into meta_load units
595 double load_fac = 1.0;
596 map<mds_rank_t, mds_load_t>::iterator m = mds_load.find(whoami);
597 if ((m != mds_load.end()) && (m->second.mds_load() > 0)) {
598 double metald = m->second.auth.meta_load(rebalance_time, mds->mdcache->decayrate);
599 double mdsld = m->second.mds_load();
600 load_fac = metald / mdsld;
601 dout(7) << " load_fac is " << load_fac
602 << " <- " << m->second.auth << " " << metald
// Build the scalar load map; ranks missing a sample get a default-inserted
// (fresh) mds_load_t via map::insert.
607 double total_load = 0.0;
608 multimap<double,mds_rank_t> load_map;
609 for (mds_rank_t i=mds_rank_t(0); i < mds_rank_t(cluster_size); i++) {
610 map<mds_rank_t, mds_load_t>::value_type val(i, mds_load_t(ceph_clock_now()));
611 std::pair < map<mds_rank_t, mds_load_t>::iterator, bool > r(mds_load.insert(val));
612 mds_load_t &load(r.first->second);
614 double l = load.mds_load() * load_fac;
615 mds_meta_load[i] = l;
618 dout(0) << " mds." << i
620 << " = " << load.mds_load()
621 << " ~ " << l << dendl;
623 if (whoami == i) my_load = l;
626 load_map.insert(pair<double,mds_rank_t>( l, i ));
// Target is the even share of the total load.
630 target_load = total_load / (double)cluster_size;
631 dout(5) << "prep_rebalance: my load " << my_load
632 << " target " << target_load
633 << " total " << total_load
// Under/barely over target: record the epoch and do nothing this round.
637 if (my_load < target_load * (1.0 + g_conf->mds_bal_min_rebalance)) {
638 dout(5) << " i am underloaded or barely overloaded, doing nothing." << dendl;
639 last_epoch_under = beat_epoch;
640 mds->mdcache->show_subtrees();
644 // am i over long enough?
// Require at least 2 epochs of sustained overload before acting.
645 if (last_epoch_under && beat_epoch - last_epoch_under < 2) {
646 dout(5) << " i am overloaded, but only for " << (beat_epoch - last_epoch_under) << " epochs" << dendl;
650 dout(5) << " i am sufficiently overloaded" << dendl;
653 // first separate exporters and importers
654 multimap<double,mds_rank_t> importers;
655 multimap<double,mds_rank_t> exporters;
656 set<mds_rank_t> importer_set;
657 set<mds_rank_t> exporter_set;
659 for (multimap<double,mds_rank_t>::iterator it = load_map.begin();
660 it != load_map.end();
662 if (it->first < target_load) {
663 dout(15) << " mds." << it->second << " is importer" << dendl;
664 importers.insert(pair<double,mds_rank_t>(it->first,it->second));
665 importer_set.insert(it->second);
667 dout(15) << " mds." << it->second << " is exporter" << dendl;
668 exporters.insert(pair<double,mds_rank_t>(it->first,it->second));
669 exporter_set.insert(it->second);
674 // determine load transfer mapping
677 // analyze import_map; do any matches i can
// Pass 1: prefer sending load back to the rank it was imported from.
679 dout(15) << " matching exporters to import sources" << dendl;
681 // big -> small exporters
682 for (multimap<double,mds_rank_t>::reverse_iterator ex = exporters.rbegin();
683 ex != exporters.rend();
685 double maxex = get_maxex(state, ex->second);
686 if (maxex <= .001) continue;
688 // check importers. for now, just in arbitrary order (no intelligent matching).
689 for (map<mds_rank_t, float>::iterator im = mds_import_map[ex->second].begin();
690 im != mds_import_map[ex->second].end();
692 double maxim = get_maxim(state, im->first);
693 if (maxim <= .001) continue;
694 try_match(state, ex->second, maxex, im->first, maxim);
695 if (maxex <= .001) break;
// Pass 2: pair the heaviest remaining exporters with the lightest importers.
702 dout(15) << " matching big exporters to big importers" << dendl;
703 // big exporters to big importers
704 multimap<double,mds_rank_t>::reverse_iterator ex = exporters.rbegin();
705 multimap<double,mds_rank_t>::iterator im = importers.begin();
706 while (ex != exporters.rend() &&
707 im != importers.end()) {
708 double maxex = get_maxex(state, ex->second);
709 double maxim = get_maxim(state, im->second);
710 if (maxex < .001 || maxim < .001) break;
711 try_match(state, ex->second, maxex, im->second, maxim);
712 if (maxex <= .001) ++ex;
713 if (maxim <= .001) ++im;
// Pass 3: sweep the small exporters into the big importers.
716 dout(15) << " matching small exporters to big importers" << dendl;
717 // small exporters to big importers
718 multimap<double,mds_rank_t>::iterator ex = exporters.begin();
719 multimap<double,mds_rank_t>::iterator im = importers.begin();
720 while (ex != exporters.end() &&
721 im != importers.end()) {
722 double maxex = get_maxex(state, ex->second);
723 double maxim = get_maxim(state, im->second);
724 if (maxex < .001 || maxim < .001) break;
725 try_match(state, ex->second, maxex, im->second, maxim);
726 if (maxex <= .001) ++ex;
727 if (maxim <= .001) ++im;
731 try_rebalance(state);
// Mantle (Lua) balancing policy: refresh the balancer script if its version
// changed, collect per-rank metrics, let the script compute targets, then
// hand the plan to try_rebalance(). Returns nonzero on failure so the caller
// can fall back to the built-in balancer.
734 int MDBalancer::mantle_prep_rebalance()
736 balance_state_t state;
738 /* refresh balancer if it has changed */
739 if (bal_version != mds->mdsmap->get_balancer()) {
740 bal_version.assign("");
741 int r = localize_balancer();
744 /* only spam the cluster log from 1 mds on version changes */
745 if (mds->get_nodeid() == 0)
746 mds->clog->info() << "mantle balancer version changed: " << bal_version;
749 /* prepare for balancing */
750 int cluster_size = mds->get_mds_map()->get_num_in_mds();
751 rebalance_time = ceph_clock_now();
// A new plan supersedes any queued exports from the previous round.
752 mds->mdcache->migrator->clear_export_queue();
754 /* fill in the metrics for each mds by grabbing load struct */
755 vector < map<string, double> > metrics (cluster_size);
756 for (mds_rank_t i=mds_rank_t(0);
757 i < mds_rank_t(cluster_size);
// Ranks without a heartbeat sample get a fresh default via map::insert.
759 map<mds_rank_t, mds_load_t>::value_type val(i, mds_load_t(ceph_clock_now()));
760 std::pair < map<mds_rank_t, mds_load_t>::iterator, bool > r(mds_load.insert(val));
761 mds_load_t &load(r.first->second);
763 metrics[i] = {{"auth.meta_load", load.auth.meta_load()},
764 {"all.meta_load", load.all.meta_load()},
765 {"req_rate", load.req_rate},
766 {"queue_len", load.queue_len},
767 {"cpu_load_avg", load.cpu_load_avg}};
770 /* execute the balancer */
772 int ret = mantle.balance(bal_code, mds->get_nodeid(), metrics, state.targets);
773 dout(2) << " mantle decided that new targets=" << state.targets << dendl;
775 /* mantle doesn't know about cluster size, so check target len here */
776 if ((int) state.targets.size() != cluster_size)
781 try_rebalance(state);
// Execute a balance plan: for each target rank with a meaningful amount,
// satisfy it by (1) re-exporting subtrees originally imported from that
// rank, (2) re-exporting other small imports, and (3) carving fragments out
// of our own workload via find_exports().
787 void MDBalancer::try_rebalance(balance_state_t& state)
789 if (g_conf->mds_thrash_exports) {
790 dout(5) << "mds_thrash is on; not performing standard rebalance operation!"
795 // make a sorted list of my imports
796 map<double,CDir*> import_pop_map;
797 multimap<mds_rank_t,CDir*> import_from_map;
798 set<CDir*> fullauthsubs;
800 mds->mdcache->get_fullauth_subtrees(fullauthsubs);
801 for (set<CDir*>::iterator it = fullauthsubs.begin();
802 it != fullauthsubs.end();
805 if (im->get_inode()->is_stray()) continue;
807 double pop = im->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
// Idle imports go straight back to their original authority.
808 if (g_conf->mds_bal_idle_threshold > 0 &&
809 pop < g_conf->mds_bal_idle_threshold &&
810 im->inode != mds->mdcache->get_root() &&
811 im->inode->authority().first != mds->get_nodeid()) {
812 dout(0) << " exporting idle (" << pop << ") import " << *im
813 << " back to mds." << im->inode->authority().first
815 mds->mdcache->migrator->export_dir_nicely(im, im->inode->authority().first);
// NOTE(review): keying import_pop_map by pop means equal-pop imports
// collide (map, not multimap) — confirm intended in the full source.
819 import_pop_map[ pop ] = im;
820 mds_rank_t from = im->inode->authority().first;
821 dout(15) << " map: i imported " << *im << " from " << from << dendl;
822 import_from_map.insert(pair<mds_rank_t,CDir*>(from, im));
828 set<CDir*> already_exporting;
830 for (auto &it : state.targets) {
831 mds_rank_t target = it.first;
832 double amount = it.second;
// Skip negligible transfers (absolute and relative to target_load).
834 if (amount < MIN_OFFLOAD) continue;
835 if (amount / target_load < .2) continue;
837 dout(5) << "want to send " << amount << " to mds." << target
838 //<< " .. " << (*it).second << " * " << load_fac
840 << dendl;//" .. fudge is " << fudge << dendl;
844 mds->mdcache->show_subtrees();
846 // search imports from target
847 if (import_from_map.count(target)) {
848 dout(5) << " aha, looking through imports from target mds." << target << dendl;
849 pair<multimap<mds_rank_t,CDir*>::iterator, multimap<mds_rank_t,CDir*>::iterator> p =
850 import_from_map.equal_range(target);
851 while (p.first != p.second) {
852 CDir *dir = (*p.first).second;
853 dout(5) << "considering " << *dir << " from " << (*p.first).first << dendl;
// plast remembers the current entry so it can be erased after advancing.
854 multimap<mds_rank_t,CDir*>::iterator plast = p.first++;
856 if (dir->inode->is_base() ||
857 dir->inode->is_stray())
859 if (dir->is_freezing() || dir->is_frozen()) continue; // export pbly already in progress
860 double pop = dir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
861 assert(dir->inode->authority().first == target); // cuz that's how i put it in the map, dummy
863 if (pop <= amount-have) {
864 dout(0) << "reexporting " << *dir
866 << " back to mds." << target << dendl;
867 mds->mdcache->migrator->export_dir_nicely(dir, target);
869 import_from_map.erase(plast);
870 import_pop_map.erase(pop);
872 dout(5) << "can't reexport " << *dir << ", too big " << pop << dendl;
874 if (amount-have < MIN_OFFLOAD) break;
877 if (amount-have < MIN_OFFLOAD) {
// Stage 2: shed any small imports (regardless of origin).
883 for (map<double,CDir*>::iterator import = import_pop_map.begin();
884 import != import_pop_map.end();
886 CDir *imp = (*import).second;
887 if (imp->inode->is_base() ||
888 imp->inode->is_stray())
891 double pop = (*import).first;
892 if (pop < amount-have || pop < MIN_REEXPORT) {
893 dout(0) << "reexporting " << *imp
895 << " back to mds." << imp->inode->authority()
898 mds->mdcache->migrator->export_dir_nicely(imp, imp->inode->authority().first);
900 if (amount-have < MIN_OFFLOAD) break;
902 if (amount-have < MIN_OFFLOAD) {
903 //fudge = amount-have;
907 // okay, search for fragments of my workload
908 set<CDir*> candidates;
909 mds->mdcache->get_fullauth_subtrees(candidates);
913 for (set<CDir*>::iterator pot = candidates.begin();
914 pot != candidates.end();
916 if ((*pot)->get_inode()->is_stray()) continue;
917 find_exports(*pot, amount, exports, have, already_exporting);
918 if (have > amount-MIN_OFFLOAD)
921 //fudge = amount - have;
923 for (list<CDir*>::iterator it = exports.begin(); it != exports.end(); ++it) {
924 dout(0) << " - exporting "
925 << (*it)->pop_auth_subtree
927 << (*it)->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate)
928 << " to mds." << target
931 mds->mdcache->migrator->export_dir_nicely(*it, target);
935 dout(5) << "rebalance done" << dendl;
936 mds->mdcache->show_subtrees();
// Recursively pick export candidates under `dir` adding up to roughly
// `amount` of load. Prefers right-sized subdirs, then descends into bigger
// unreplicated ones, then settles for small pieces, then replicated dirs.
// Results accumulate in `exports`/`have`; `already_exporting` prevents
// choosing a subtree twice.
939 void MDBalancer::find_exports(CDir *dir,
941 list<CDir*>& exports,
943 set<CDir*>& already_exporting)
945 double need = amount - have;
946 if (need < amount * g_conf->mds_bal_min_start)
947 return; // good enough!
// Tunable bands: acceptable chunk [needmin, needmax], plus midchunk and
// minchunk thresholds for the fallback passes below.
948 double needmax = need * g_conf->mds_bal_need_max;
949 double needmin = need * g_conf->mds_bal_need_min;
950 double midchunk = need * g_conf->mds_bal_midchunk;
951 double minchunk = need * g_conf->mds_bal_minchunk;
953 list<CDir*> bigger_rep, bigger_unrep;
954 multimap<double, CDir*> smaller;
956 double dir_pop = dir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
957 dout(7) << " find_exports in " << dir_pop << " " << *dir << " need " << need << " (" << needmin << " - " << needmax << ")" << dendl;
// Classify each auth, non-frozen, not-yet-chosen subdir frag by popularity.
959 double subdir_sum = 0;
960 for (CDir::map_t::iterator it = dir->begin();
963 CInode *in = it->second->get_linkage()->get_inode();
965 if (!in->is_dir()) continue;
968 in->get_dirfrags(dfls);
969 for (list<CDir*>::iterator p = dfls.begin();
973 if (!subdir->is_auth()) continue;
974 if (already_exporting.count(subdir)) continue;
976 if (subdir->is_frozen()) continue; // can't export this right now!
979 double pop = subdir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
981 dout(15) << " subdir pop " << pop << " " << *subdir << dendl;
983 if (pop < minchunk) continue;
// Right-sized subdir: take it immediately.
986 if (pop > needmin && pop < needmax) {
987 exports.push_back(subdir);
988 already_exporting.insert(subdir);
994 if (subdir->is_rep())
995 bigger_rep.push_back(subdir);
997 bigger_unrep.push_back(subdir);
999 smaller.insert(pair<double,CDir*>(pop, subdir));
1002 dout(15) << " sum " << subdir_sum << " / " << dir_pop << dendl;
1004 // grab some sufficiently big small items
1005 multimap<double,CDir*>::reverse_iterator it;
1006 for (it = smaller.rbegin();
1007 it != smaller.rend();
// Stop once we drop below the midchunk threshold (smaller handled later).
1010 if ((*it).first < midchunk)
1013 dout(7) << " taking smaller " << *(*it).second << dendl;
1015 exports.push_back((*it).second);
1016 already_exporting.insert((*it).second);
1017 have += (*it).first;
1022 // apprently not enough; drill deeper into the hierarchy (if non-replicated)
1023 for (list<CDir*>::iterator it = bigger_unrep.begin();
1024 it != bigger_unrep.end();
1026 dout(15) << " descending into " << **it << dendl;
1027 find_exports(*it, amount, exports, have, already_exporting);
1032 // ok fine, use smaller bits
1034 it != smaller.rend();
1036 dout(7) << " taking (much) smaller " << it->first << " " << *(*it).second << dendl;
1038 exports.push_back((*it).second);
1039 already_exporting.insert((*it).second);
1040 have += (*it).first;
1045 // ok fine, drill into replicated dirs
1046 for (list<CDir*>::iterator it = bigger_rep.begin();
1047 it != bigger_rep.end();
1049 dout(7) << " descending into replicated " << **it << dendl;
1050 find_exports(*it, amount, exports, have, already_exporting);
// Record a popularity hit of `type` on an inode, then propagate it up
// through the containing dirfrag hierarchy via hit_dir().
1057 void MDBalancer::hit_inode(utime_t now, CInode *in, int type, int who)
1060 in->pop.get(type).hit(now, mds->mdcache->decayrate);
1062 if (in->get_parent_dn())
1063 hit_dir(now, in->get_parent_dn()->get_dir(), type, who);
// Decide whether `dir` should be queued for a split (size/heat driven) or a
// merge, respecting the mds_bal_frag / fragment-interval configuration.
1066 void MDBalancer::maybe_fragment(CDir *dir, bool hot)
1069 if (g_conf->mds_bal_frag && g_conf->mds_bal_fragment_interval > 0 &&
1070 !dir->inode->is_base() && // not root/base (for now at least)
1074 if (g_conf->mds_bal_split_size > 0 &&
1075 mds->mdsmap->allows_dirfrags() &&
1076 (dir->should_split() || hot))
// Not yet queued: enqueue a normal (timer-delayed) split.
1078 if (split_pending.count(dir->dirfrag()) == 0) {
1079 queue_split(dir, false);
// Already queued: upgrade to a fast split if the dir demands it.
1081 if (dir->should_split_fast()) {
1082 queue_split(dir, true);
1084 dout(10) << __func__ << ": fragment already enqueued to split: "
// Merge path: non-root frag that should merge and isn't already queued.
1091 if (dir->get_frag() != frag_t() && dir->should_merge() &&
1092 merge_pending.count(dir->dirfrag()) == 0) {
// Record a popularity hit on a dirfrag: update its own counters, possibly
// trigger split/merge, manage read replication (REP_ALL/REP_NONE), and walk
// up the parent chain updating nested and auth-subtree counters.
1098 void MDBalancer::hit_dir(utime_t now, CDir *dir, int type, int who, double amount)
1101 double v = dir->pop_me.get(type).hit(now, amount);
// "hot" when read or write popularity crosses its split threshold.
1103 const bool hot = (v > g_conf->mds_bal_split_rd && type == META_POP_IRD) ||
1104 (v > g_conf->mds_bal_split_wr && type == META_POP_IWR);
1106 dout(20) << "hit_dir " << type << " pop is " << v << ", frag " << dir->get_frag()
1107 << " size " << dir->get_frag_size() << dendl;
1109 maybe_fragment(dir, hot);
// Track which clients read this dir, to measure read spread.
1112 if (type == META_POP_IRD && who >= 0) {
1113 dir->pop_spread.hit(now, mds->mdcache->decayrate, who);
// Replication decision, at most once per popularity sample period.
1116 double rd_adj = 0.0;
1117 if (type == META_POP_IRD &&
1118 dir->last_popularity_sample < last_sample) {
1119 double dir_pop = dir->pop_auth_subtree.get(type).get(now, mds->mdcache->decayrate); // hmm??
1120 dir->last_popularity_sample = last_sample;
1121 double pop_sp = dir->pop_spread.get(now, mds->mdcache->decayrate);
// Widely-spread reads inflate the effective popularity (10x weight).
1122 dir_pop += pop_sp * 10;
1124 //if (dir->ino() == inodeno_t(0x10000000002))
1126 dout(20) << "hit_dir " << type << " pop " << dir_pop << " spread " << pop_sp
1127 << " " << dir->pop_spread.last[0]
1128 << " " << dir->pop_spread.last[1]
1129 << " " << dir->pop_spread.last[2]
1130 << " " << dir->pop_spread.last[3]
1131 << " in " << *dir << dendl;
1134 if (dir->is_auth() && !dir->is_ambiguous_auth()) {
1135 if (!dir->is_rep() &&
1136 dir_pop >= g_conf->mds_bal_replicate_threshold) {
// Start replicating: estimate the read load that shifts to replicas and
// subtract (rd_adj is negative) from our own read counters, tempered by 2.
1138 double rdp = dir->pop_me.get(META_POP_IRD).get(now, mds->mdcache->decayrate);
1139 rd_adj = rdp / mds->get_mds_map()->get_num_in_mds() - rdp;
1140 rd_adj /= 2.0; // temper somewhat
1142 dout(0) << "replicating dir " << *dir << " pop " << dir_pop << " .. rdp " << rdp << " adj " << rd_adj << dendl;
1144 dir->dir_rep = CDir::REP_ALL;
1145 mds->mdcache->send_dir_updates(dir, true);
1147 // fixme this should adjust the whole pop hierarchy
1148 dir->pop_me.get(META_POP_IRD).adjust(rd_adj);
1149 dir->pop_auth_subtree.get(META_POP_IRD).adjust(rd_adj);
// Stop replicating once popularity falls below the unreplicate threshold
// (never for the root inode, ino 1).
1152 if (dir->ino() != 1 &&
1154 dir_pop < g_conf->mds_bal_unreplicate_threshold) {
1156 dout(0) << "unreplicating dir " << *dir << " pop " << dir_pop << dendl;
1158 dir->dir_rep = CDir::REP_NONE;
1159 mds->mdcache->send_dir_updates(dir);
// Walk ancestors: nested counters always hit; auth-subtree counters stop
// at the first subtree root (end of this rank's auth domain).
1165 bool hit_subtree = dir->is_auth(); // current auth subtree (if any)
1166 bool hit_subtree_nested = dir->is_auth(); // all nested auth subtrees
1169 dir->pop_nested.get(type).hit(now, amount);
1171 dir->pop_nested.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);
1174 dir->pop_auth_subtree.get(type).hit(now, amount);
1176 dir->pop_auth_subtree.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);
1179 if (hit_subtree_nested) {
1180 dir->pop_auth_subtree_nested.get(type).hit(now, mds->mdcache->decayrate, amount);
1182 dir->pop_auth_subtree_nested.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);
1185 if (dir->is_subtree_root())
1186 hit_subtree = false; // end of auth domain, stop hitting auth counters.
1188 if (dir->inode->get_parent_dn() == 0) break;
1189 dir = dir->inode->get_parent_dn()->get_dir();
1195 * subtract off an exported chunk.
1196 * this excludes *dir itself (encode_export_dir should have take care of that)
1197 * we _just_ do the parents' nested counters.
1199 * NOTE: call me _after_ forcing *dir into a subtree root,
1200 * but _before_ doing the encode_export_dirs.
1202 void MDBalancer::subtract_export(CDir *dir, utime_t now)
// Snapshot the subtree's load, then subtract it from every ancestor's
// nested counters (the walk-up loop header is on elided lines).
1204 dirfrag_load_vec_t subload = dir->pop_auth_subtree;
1207 dir = dir->inode->get_parent_dir();
1210 dir->pop_nested.sub(now, mds->mdcache->decayrate, subload);
1211 dir->pop_auth_subtree_nested.sub(now, mds->mdcache->decayrate, subload);
// Mirror of subtract_export(): add an imported subtree's load to every
// ancestor's nested counters (the walk-up loop header is on elided lines).
1216 void MDBalancer::add_import(CDir *dir, utime_t now)
1218 dirfrag_load_vec_t subload = dir->pop_auth_subtree;
1221 dir = dir->inode->get_parent_dir();
1224 dir->pop_nested.add(now, mds->mdcache->decayrate, subload);
1225 dir->pop_auth_subtree_nested.add(now, mds->mdcache->decayrate, subload);
// React to an MDS failure: reset the overload bookkeeping so the next
// balancing round starts fresh (body continues past this view).
1229 void MDBalancer::handle_mds_failure(mds_rank_t who)
1232 last_epoch_under = 0;