Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / MDBalancer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "include/compat.h"
16 #include "mdstypes.h"
17
18 #include "MDBalancer.h"
19 #include "MDSRank.h"
20 #include "mon/MonClient.h"
21 #include "MDSMap.h"
22 #include "CInode.h"
23 #include "CDir.h"
24 #include "MDCache.h"
25 #include "Migrator.h"
26 #include "Mantle.h"
27
28 #include "include/Context.h"
29 #include "msg/Messenger.h"
30 #include "messages/MHeartbeat.h"
31
32 #include <fstream>
33 #include <iostream>
34 #include <vector>
35 #include <map>
36 using std::map;
37 using std::vector;
38
39 #include "common/config.h"
40 #include "common/errno.h"
41
42 #define dout_context g_ceph_context
43 #define dout_subsys ceph_subsys_mds
44 #undef dout_prefix
45 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".bal "
46 #undef dout
47 #define dout(lvl) \
48   do {\
49     auto subsys = ceph_subsys_mds;\
50     if ((dout_context)->_conf->subsys.should_gather(ceph_subsys_mds_balancer, lvl)) {\
51       subsys = ceph_subsys_mds_balancer;\
52     }\
53     dout_impl(dout_context, subsys, lvl) dout_prefix
54 #undef dendl
55 #define dendl dendl_impl; } while (0)
56
57
58 #define MIN_LOAD    50   //  ??
59 #define MIN_REEXPORT 5  // will automatically reexport
60 #define MIN_OFFLOAD 10   // point at which i stop trying, close enough
61
62
63 /* This function DOES put the passed message before returning */
64 int MDBalancer::proc_message(Message *m)
65 {
66   switch (m->get_type()) {
67
68   case MSG_MDS_HEARTBEAT:
69     handle_heartbeat(static_cast<MHeartbeat*>(m));
70     break;
71
72   default:
73     dout(0) << " balancer unknown message " << m->get_type() << dendl;
74     assert(0 == "balancer unknown message");
75   }
76
77   return 0;
78 }
79
80 void MDBalancer::handle_export_pins(void)
81 {
82   auto &q = mds->mdcache->export_pin_queue;
83   auto it = q.begin();
84   dout(20) << "export_pin_queue size=" << q.size() << dendl;
85   while (it != q.end()) {
86     auto cur = it++;
87     CInode *in = *cur;
88     assert(in->is_dir());
89     mds_rank_t export_pin = in->get_export_pin(false);
90
91     bool remove = true;
92     list<CDir*> dfls;
93     in->get_dirfrags(dfls);
94     for (auto dir : dfls) {
95       if (!dir->is_auth())
96         continue;
97
98       if (export_pin == MDS_RANK_NONE) {
99         if (dir->state_test(CDir::STATE_AUXSUBTREE)) {
100           if (dir->is_frozen() || dir->is_freezing()) {
101             // try again later
102             remove = false;
103             continue;
104           }
105           dout(10) << " clear auxsubtree on " << *dir << dendl;
106           dir->state_clear(CDir::STATE_AUXSUBTREE);
107           mds->mdcache->try_subtree_merge(dir);
108         }
109       } else if (export_pin == mds->get_nodeid()) {
110         if (dir->state_test(CDir::STATE_CREATING) ||
111             dir->is_frozen() || dir->is_freezing()) {
112           // try again later
113           remove = false;
114           continue;
115         }
116         if (!dir->is_subtree_root()) {
117           dir->state_set(CDir::STATE_AUXSUBTREE);
118           mds->mdcache->adjust_subtree_auth(dir, mds->get_nodeid());
119           dout(10) << " create aux subtree on " << *dir << dendl;
120         } else if (!dir->state_test(CDir::STATE_AUXSUBTREE)) {
121           dout(10) << " set auxsubtree bit on " << *dir << dendl;
122           dir->state_set(CDir::STATE_AUXSUBTREE);
123         }
124       } else {
125         mds->mdcache->migrator->export_dir(dir, export_pin);
126         remove = false;
127       }
128     }
129
130     if (remove) {
131       in->state_clear(CInode::STATE_QUEUEDEXPORTPIN);
132       q.erase(cur);
133     }
134   }
135
136   set<CDir *> authsubs;
137   mds->mdcache->get_auth_subtrees(authsubs);
138   for (auto &cd : authsubs) {
139     mds_rank_t export_pin = cd->inode->get_export_pin();
140     dout(10) << "auth tree " << *cd << " export_pin=" << export_pin << dendl;
141     if (export_pin >= 0 && export_pin != mds->get_nodeid()) {
142       dout(10) << "exporting auth subtree " << *cd->inode << " to " << export_pin << dendl;
143       mds->mdcache->migrator->export_dir(cd, export_pin);
144     }
145   }
146 }
147
148 void MDBalancer::tick()
149 {
150   static int num_bal_times = g_conf->mds_bal_max;
151   static utime_t first = ceph_clock_now();
152   utime_t now = ceph_clock_now();
153   utime_t elapsed = now;
154   elapsed -= first;
155
156   if (g_conf->mds_bal_export_pin) {
157     handle_export_pins();
158   }
159
160   // sample?
161   if ((double)now - (double)last_sample > g_conf->mds_bal_sample_interval) {
162     dout(15) << "tick last_sample now " << now << dendl;
163     last_sample = now;
164   }
165
166   // balance?
167   if (last_heartbeat == utime_t())
168     last_heartbeat = now;
169   if (mds->get_nodeid() == 0 &&
170       g_conf->mds_bal_interval > 0 &&
171       (num_bal_times ||
172        (g_conf->mds_bal_max_until >= 0 &&
173         elapsed.sec() > g_conf->mds_bal_max_until)) &&
174       mds->is_active() &&
175       now.sec() - last_heartbeat.sec() >= g_conf->mds_bal_interval) {
176     last_heartbeat = now;
177     send_heartbeat();
178     num_bal_times--;
179   }
180 }
181
182
183
184
185 class C_Bal_SendHeartbeat : public MDSInternalContext {
186 public:
187   explicit C_Bal_SendHeartbeat(MDSRank *mds_) : MDSInternalContext(mds_) { }
188   void finish(int f) override {
189     mds->balancer->send_heartbeat();
190   }
191 };
192
193
194 double mds_load_t::mds_load()
195 {
196   switch(g_conf->mds_bal_mode) {
197   case 0:
198     return
199       .8 * auth.meta_load() +
200       .2 * all.meta_load() +
201       req_rate +
202       10.0 * queue_len;
203
204   case 1:
205     return req_rate + 10.0*queue_len;
206
207   case 2:
208     return cpu_load_avg;
209
210   }
211   ceph_abort();
212   return 0;
213 }
214
215 mds_load_t MDBalancer::get_load(utime_t now)
216 {
217   mds_load_t load(now);
218
219   if (mds->mdcache->get_root()) {
220     list<CDir*> ls;
221     mds->mdcache->get_root()->get_dirfrags(ls);
222     for (list<CDir*>::iterator p = ls.begin();
223          p != ls.end();
224          ++p) {
225       load.auth.add(now, mds->mdcache->decayrate, (*p)->pop_auth_subtree_nested);
226       load.all.add(now, mds->mdcache->decayrate, (*p)->pop_nested);
227     }
228   } else {
229     dout(20) << "get_load no root, no load" << dendl;
230   }
231
232   load.req_rate = mds->get_req_rate();
233   load.queue_len = messenger->get_dispatch_queue_len();
234
235   ifstream cpu(PROCPREFIX "/proc/loadavg");
236   if (cpu.is_open())
237     cpu >> load.cpu_load_avg;
238   else
239     dout(0) << "input file " PROCPREFIX "'/proc/loadavg' not found" << dendl;
240   
241   dout(15) << "get_load " << load << dendl;
242   return load;
243 }
244
245 /*
246  * Read synchronously from RADOS using a timeout. We cannot do daemon-local
247  * fallbacks (i.e. kick off async read when we are processing the map and
248  * check status when we get here) with the way the mds is structured.
249  */
250 int MDBalancer::localize_balancer()
251 {
252   /* reset everything */
253   bool ack = false;
254   int r = 0;
255   bufferlist lua_src;
256   Mutex lock("lock");
257   Cond cond;
258
259   /* we assume that balancer is in the metadata pool */
260   object_t oid = object_t(mds->mdsmap->get_balancer());
261   object_locator_t oloc(mds->mdsmap->get_metadata_pool());
262   ceph_tid_t tid = mds->objecter->read(oid, oloc, 0, 0, CEPH_NOSNAP, &lua_src, 0,
263                                        new C_SafeCond(&lock, &cond, &ack, &r));
264   dout(15) << "launched non-blocking read tid=" << tid
265            << " oid=" << oid << " oloc=" << oloc << dendl;
266
267   /* timeout: if we waste half our time waiting for RADOS, then abort! */
268   double t = ceph_clock_now() + g_conf->mds_bal_interval/2;
269   utime_t timeout;
270   timeout.set_from_double(t);
271   lock.Lock();
272   int ret_t = cond.WaitUntil(lock, timeout);
273   lock.Unlock();
274
275   /* success: store the balancer in memory and set the version. */
276   if (!r) {
277     if (ret_t == ETIMEDOUT) {
278       mds->objecter->op_cancel(tid, -ECANCELED);
279       return -ETIMEDOUT;
280     }
281     bal_code.assign(lua_src.to_str());
282     bal_version.assign(oid.name);
283     dout(0) << "localized balancer, bal_code=" << bal_code << dendl;
284   }
285   return r;
286 }
287
288 void MDBalancer::send_heartbeat()
289 {
290   utime_t now = ceph_clock_now();
291   
292   if (mds->is_cluster_degraded()) {
293     dout(10) << "send_heartbeat degraded" << dendl;
294     return;
295   }
296
297   if (!mds->mdcache->is_open()) {
298     dout(5) << "not open" << dendl;
299     mds->mdcache->wait_for_open(new C_Bal_SendHeartbeat(mds));
300     return;
301   }
302
303   mds_load.clear();
304   if (mds->get_nodeid() == 0)
305     beat_epoch++;
306
307   // my load
308   mds_load_t load = get_load(now);
309   map<mds_rank_t, mds_load_t>::value_type val(mds->get_nodeid(), load);
310   mds_load.insert(val);
311
312   // import_map -- how much do i import from whom
313   map<mds_rank_t, float> import_map;
314   set<CDir*> authsubs;
315   mds->mdcache->get_auth_subtrees(authsubs);
316   for (set<CDir*>::iterator it = authsubs.begin();
317        it != authsubs.end();
318        ++it) {
319     CDir *im = *it;
320     mds_rank_t from = im->inode->authority().first;
321     if (from == mds->get_nodeid()) continue;
322     if (im->get_inode()->is_stray()) continue;
323     import_map[from] += im->pop_auth_subtree.meta_load(now, mds->mdcache->decayrate);
324   }
325   mds_import_map[ mds->get_nodeid() ] = import_map;
326
327
328   dout(5) << "mds." << mds->get_nodeid() << " epoch " << beat_epoch << " load " << load << dendl;
329   for (map<mds_rank_t, float>::iterator it = import_map.begin();
330        it != import_map.end();
331        ++it) {
332     dout(5) << "  import_map from " << it->first << " -> " << it->second << dendl;
333   }
334
335
336   set<mds_rank_t> up;
337   mds->get_mds_map()->get_up_mds_set(up);
338   for (set<mds_rank_t>::iterator p = up.begin(); p != up.end(); ++p) {
339     if (*p == mds->get_nodeid())
340       continue;
341     MHeartbeat *hb = new MHeartbeat(load, beat_epoch);
342     hb->get_import_map() = import_map;
343     messenger->send_message(hb,
344                             mds->mdsmap->get_inst(*p));
345   }
346 }
347
348 /* This function DOES put the passed message before returning */
349 void MDBalancer::handle_heartbeat(MHeartbeat *m)
350 {
351   typedef map<mds_rank_t, mds_load_t> mds_load_map_t;
352
353   mds_rank_t who = mds_rank_t(m->get_source().num());
354   dout(25) << "=== got heartbeat " << m->get_beat() << " from " << m->get_source().num() << " " << m->get_load() << dendl;
355
356   if (!mds->is_active())
357     goto out;
358
359   if (!mds->mdcache->is_open()) {
360     dout(10) << "opening root on handle_heartbeat" << dendl;
361     mds->mdcache->wait_for_open(new C_MDS_RetryMessage(mds, m));
362     return;
363   }
364
365   if (mds->is_cluster_degraded()) {
366     dout(10) << " degraded, ignoring" << dendl;
367     goto out;
368   }
369
370   if (who == 0) {
371     dout(20) << " from mds0, new epoch" << dendl;
372     beat_epoch = m->get_beat();
373     send_heartbeat();
374
375     mds->mdcache->show_subtrees();
376   }
377
378   {
379     // set mds_load[who]
380     mds_load_map_t::value_type val(who, m->get_load());
381     pair < mds_load_map_t::iterator, bool > rval (mds_load.insert(val));
382     if (!rval.second) {
383       rval.first->second = val.second;
384     }
385   }
386   mds_import_map[ who ] = m->get_import_map();
387
388   //dout(0) << "  load is " << load << " have " << mds_load.size() << dendl;
389
390   {
391     unsigned cluster_size = mds->get_mds_map()->get_num_in_mds();
392     if (mds_load.size() == cluster_size) {
393       // let's go!
394       //export_empties();  // no!
395
396       /* avoid spamming ceph -w if user does not turn mantle on */
397       if (mds->mdsmap->get_balancer() != "") {
398         int r = mantle_prep_rebalance();
399         if (!r) goto out;
400         mds->clog->warn() << "using old balancer; mantle failed for "
401                           << "balancer=" << mds->mdsmap->get_balancer()
402                           << " : " << cpp_strerror(r);
403       }
404       prep_rebalance(m->get_beat());
405     }
406   }
407
408   // done
409  out:
410   m->put();
411 }
412
413
414 void MDBalancer::export_empties()
415 {
416   dout(5) << "export_empties checking for empty imports" << dendl;
417
418   std::set<CDir *> subtrees;
419   mds->mdcache->get_fullauth_subtrees(subtrees);
420   for (auto &dir : subtrees) {
421     if (dir->is_freezing() || dir->is_frozen())
422       continue;
423
424     if (!dir->inode->is_base() &&
425         !dir->inode->is_stray() &&
426         dir->get_num_head_items() == 0)
427       mds->mdcache->migrator->export_empty_import(dir);
428   }
429 }
430
431
432
433 double MDBalancer::try_match(balance_state_t& state, mds_rank_t ex, double& maxex,
434                              mds_rank_t im, double& maxim)
435 {
436   if (maxex <= 0 || maxim <= 0) return 0.0;
437
438   double howmuch = MIN(maxex, maxim);
439   if (howmuch <= 0) return 0.0;
440
441   dout(5) << "   - mds." << ex << " exports " << howmuch << " to mds." << im << dendl;
442
443   if (ex == mds->get_nodeid())
444     state.targets[im] += howmuch;
445
446   state.exported[ex] += howmuch;
447   state.imported[im] += howmuch;
448
449   maxex -= howmuch;
450   maxim -= howmuch;
451
452   return howmuch;
453 }
454
455 void MDBalancer::queue_split(const CDir *dir, bool fast)
456 {
457   dout(10) << __func__ << " enqueuing " << *dir
458                        << " (fast=" << fast << ")" << dendl;
459
460   assert(mds->mdsmap->allows_dirfrags());
461   const dirfrag_t frag = dir->dirfrag();
462
463   auto callback = [this, frag](int r) {
464     if (split_pending.erase(frag) == 0) {
465       // Someone beat me to it.  This can happen in the fast splitting
466       // path, because we spawn two contexts, one with mds->timer and
467       // one with mds->queue_waiter.  The loser can safely just drop
468       // out.
469       return;
470     }
471
472     CDir *split_dir = mds->mdcache->get_dirfrag(frag);
473     if (!split_dir) {
474       dout(10) << "drop split on " << frag << " because not in cache" << dendl;
475       return;
476     }
477     if (!split_dir->is_auth()) {
478       dout(10) << "drop split on " << frag << " because non-auth" << dendl;
479       return;
480     }
481
482     // Pass on to MDCache: note that the split might still not
483     // happen if the checks in MDCache::can_fragment fail.
484     dout(10) << __func__ << " splitting " << *split_dir << dendl;
485     mds->mdcache->split_dir(split_dir, g_conf->mds_bal_split_bits);
486   };
487
488   bool is_new = false;
489   if (split_pending.count(frag) == 0) {
490     split_pending.insert(frag);
491     is_new = true;
492   }
493
494   if (fast) {
495     // Do the split ASAP: enqueue it in the MDSRank waiters which are
496     // run at the end of dispatching the current request
497     mds->queue_waiter(new MDSInternalContextWrapper(mds, 
498           new FunctionContext(callback)));
499   } else if (is_new) {
500     // Set a timer to really do the split: we don't do it immediately
501     // so that bursts of ops on a directory have a chance to go through
502     // before we freeze it.
503     mds->timer.add_event_after(g_conf->mds_bal_fragment_interval,
504                                new FunctionContext(callback));
505   }
506 }
507
508 void MDBalancer::queue_merge(CDir *dir)
509 {
510   const auto frag = dir->dirfrag();
511   auto callback = [this, frag](int r) {
512     assert(frag.frag != frag_t());
513
514     // frag must be in this set because only one context is in flight
515     // for a given frag at a time (because merge_pending is checked before
516     // starting one), and this context is the only one that erases it.
517     merge_pending.erase(frag);
518
519     CDir *dir = mds->mdcache->get_dirfrag(frag);
520     if (!dir) {
521       dout(10) << "drop merge on " << frag << " because not in cache" << dendl;
522       return;
523     }
524     assert(dir->dirfrag() == frag);
525
526     if(!dir->is_auth()) {
527       dout(10) << "drop merge on " << *dir << " because lost auth" << dendl;
528       return;
529     }
530
531     dout(10) << "merging " << *dir << dendl;
532
533     CInode *diri = dir->get_inode();
534
535     frag_t fg = dir->get_frag();
536     while (fg != frag_t()) {
537       frag_t sibfg = fg.get_sibling();
538       list<CDir*> sibs;
539       bool complete = diri->get_dirfrags_under(sibfg, sibs);
540       if (!complete) {
541         dout(10) << "  not all sibs under " << sibfg << " in cache (have " << sibs << ")" << dendl;
542         break;
543       }
544       bool all = true;
545       for (list<CDir*>::iterator p = sibs.begin(); p != sibs.end(); ++p) {
546         CDir *sib = *p;
547         if (!sib->is_auth() || !sib->should_merge()) {
548           all = false;
549           break;
550         }
551       }
552       if (!all) {
553         dout(10) << "  not all sibs under " << sibfg << " " << sibs << " should_merge" << dendl;
554         break;
555       }
556       dout(10) << "  all sibs under " << sibfg << " " << sibs << " should merge" << dendl;
557       fg = fg.parent();
558     }
559
560     if (fg != dir->get_frag())
561       mds->mdcache->merge_dir(diri, fg);
562   };
563
564   if (merge_pending.count(frag) == 0) {
565     dout(20) << __func__ << " enqueued dir " << *dir << dendl;
566     merge_pending.insert(frag);
567     mds->timer.add_event_after(g_conf->mds_bal_fragment_interval,
568         new FunctionContext(callback));
569   } else {
570     dout(20) << __func__ << " dir already in queue " << *dir << dendl;
571   }
572 }
573
574 void MDBalancer::prep_rebalance(int beat)
575 {
576   balance_state_t state;
577
578   if (g_conf->mds_thrash_exports) {
579     //we're going to randomly export to all the mds in the cluster
580     set<mds_rank_t> up_mds;
581     mds->get_mds_map()->get_up_mds_set(up_mds);
582     for (const auto &rank : up_mds) {
583       state.targets[rank] = 0.0;
584     }
585   } else {
586     int cluster_size = mds->get_mds_map()->get_num_in_mds();
587     mds_rank_t whoami = mds->get_nodeid();
588     rebalance_time = ceph_clock_now();
589
590     dout(5) << " prep_rebalance: cluster loads are" << dendl;
591
592     mds->mdcache->migrator->clear_export_queue();
593
594     // rescale!  turn my mds_load back into meta_load units
595     double load_fac = 1.0;
596     map<mds_rank_t, mds_load_t>::iterator m = mds_load.find(whoami);
597     if ((m != mds_load.end()) && (m->second.mds_load() > 0)) {
598       double metald = m->second.auth.meta_load(rebalance_time, mds->mdcache->decayrate);
599       double mdsld = m->second.mds_load();
600       load_fac = metald / mdsld;
601       dout(7) << " load_fac is " << load_fac
602               << " <- " << m->second.auth << " " << metald
603               << " / " << mdsld
604               << dendl;
605     }
606
607     double total_load = 0.0;
608     multimap<double,mds_rank_t> load_map;
609     for (mds_rank_t i=mds_rank_t(0); i < mds_rank_t(cluster_size); i++) {
610       map<mds_rank_t, mds_load_t>::value_type val(i, mds_load_t(ceph_clock_now()));
611       std::pair < map<mds_rank_t, mds_load_t>::iterator, bool > r(mds_load.insert(val));
612       mds_load_t &load(r.first->second);
613
614       double l = load.mds_load() * load_fac;
615       mds_meta_load[i] = l;
616
617       if (whoami == 0)
618         dout(0) << "  mds." << i
619                 << " " << load
620                 << " = " << load.mds_load()
621                 << " ~ " << l << dendl;
622
623       if (whoami == i) my_load = l;
624       total_load += l;
625
626       load_map.insert(pair<double,mds_rank_t>( l, i ));
627     }
628
629     // target load
630     target_load = total_load / (double)cluster_size;
631     dout(5) << "prep_rebalance:  my load " << my_load
632             << "   target " << target_load
633             << "   total " << total_load
634             << dendl;
635
636     // under or over?
637     if (my_load < target_load * (1.0 + g_conf->mds_bal_min_rebalance)) {
638       dout(5) << "  i am underloaded or barely overloaded, doing nothing." << dendl;
639       last_epoch_under = beat_epoch;
640       mds->mdcache->show_subtrees();
641       return;
642     }
643
644     // am i over long enough?
645     if (last_epoch_under && beat_epoch - last_epoch_under < 2) {
646       dout(5) << "  i am overloaded, but only for " << (beat_epoch - last_epoch_under) << " epochs" << dendl;
647       return;
648     }
649
650     dout(5) << "  i am sufficiently overloaded" << dendl;
651
652
653     // first separate exporters and importers
654     multimap<double,mds_rank_t> importers;
655     multimap<double,mds_rank_t> exporters;
656     set<mds_rank_t>             importer_set;
657     set<mds_rank_t>             exporter_set;
658
659     for (multimap<double,mds_rank_t>::iterator it = load_map.begin();
660          it != load_map.end();
661          ++it) {
662       if (it->first < target_load) {
663         dout(15) << "   mds." << it->second << " is importer" << dendl;
664         importers.insert(pair<double,mds_rank_t>(it->first,it->second));
665         importer_set.insert(it->second);
666       } else {
667         dout(15) << "   mds." << it->second << " is exporter" << dendl;
668         exporters.insert(pair<double,mds_rank_t>(it->first,it->second));
669         exporter_set.insert(it->second);
670       }
671     }
672
673
674     // determine load transfer mapping
675
676     if (true) {
677       // analyze import_map; do any matches i can
678
679       dout(15) << "  matching exporters to import sources" << dendl;
680
681       // big -> small exporters
682       for (multimap<double,mds_rank_t>::reverse_iterator ex = exporters.rbegin();
683            ex != exporters.rend();
684            ++ex) {
685         double maxex = get_maxex(state, ex->second);
686         if (maxex <= .001) continue;
687
688         // check importers. for now, just in arbitrary order (no intelligent matching).
689         for (map<mds_rank_t, float>::iterator im = mds_import_map[ex->second].begin();
690              im != mds_import_map[ex->second].end();
691              ++im) {
692           double maxim = get_maxim(state, im->first);
693           if (maxim <= .001) continue;
694           try_match(state, ex->second, maxex, im->first, maxim);
695           if (maxex <= .001) break;
696         }
697       }
698     }
699
700     // old way
701     if (beat % 2 == 1) {
702       dout(15) << "  matching big exporters to big importers" << dendl;
703       // big exporters to big importers
704       multimap<double,mds_rank_t>::reverse_iterator ex = exporters.rbegin();
705       multimap<double,mds_rank_t>::iterator im = importers.begin();
706       while (ex != exporters.rend() &&
707              im != importers.end()) {
708         double maxex = get_maxex(state, ex->second);
709         double maxim = get_maxim(state, im->second);
710         if (maxex < .001 || maxim < .001) break;
711         try_match(state, ex->second, maxex, im->second, maxim);
712         if (maxex <= .001) ++ex;
713         if (maxim <= .001) ++im;
714       }
715     } else { // new way
716       dout(15) << "  matching small exporters to big importers" << dendl;
717       // small exporters to big importers
718       multimap<double,mds_rank_t>::iterator ex = exporters.begin();
719       multimap<double,mds_rank_t>::iterator im = importers.begin();
720       while (ex != exporters.end() &&
721              im != importers.end()) {
722         double maxex = get_maxex(state, ex->second);
723         double maxim = get_maxim(state, im->second);
724         if (maxex < .001 || maxim < .001) break;
725         try_match(state, ex->second, maxex, im->second, maxim);
726         if (maxex <= .001) ++ex;
727         if (maxim <= .001) ++im;
728       }
729     }
730   }
731   try_rebalance(state);
732 }
733
734 int MDBalancer::mantle_prep_rebalance()
735 {
736   balance_state_t state;
737
738   /* refresh balancer if it has changed */
739   if (bal_version != mds->mdsmap->get_balancer()) {
740     bal_version.assign("");
741     int r = localize_balancer();
742     if (r) return r;
743
744     /* only spam the cluster log from 1 mds on version changes */
745     if (mds->get_nodeid() == 0)
746       mds->clog->info() << "mantle balancer version changed: " << bal_version;
747   }
748
749   /* prepare for balancing */
750   int cluster_size = mds->get_mds_map()->get_num_in_mds();
751   rebalance_time = ceph_clock_now();
752   mds->mdcache->migrator->clear_export_queue();
753
754   /* fill in the metrics for each mds by grabbing load struct */
755   vector < map<string, double> > metrics (cluster_size);
756   for (mds_rank_t i=mds_rank_t(0);
757        i < mds_rank_t(cluster_size);
758        i++) {
759     map<mds_rank_t, mds_load_t>::value_type val(i, mds_load_t(ceph_clock_now()));
760     std::pair < map<mds_rank_t, mds_load_t>::iterator, bool > r(mds_load.insert(val));
761     mds_load_t &load(r.first->second);
762
763     metrics[i] = {{"auth.meta_load", load.auth.meta_load()},
764                   {"all.meta_load", load.all.meta_load()},
765                   {"req_rate", load.req_rate},
766                   {"queue_len", load.queue_len},
767                   {"cpu_load_avg", load.cpu_load_avg}};
768   }
769
770   /* execute the balancer */
771   Mantle mantle;
772   int ret = mantle.balance(bal_code, mds->get_nodeid(), metrics, state.targets);
773   dout(2) << " mantle decided that new targets=" << state.targets << dendl;
774
775   /* mantle doesn't know about cluster size, so check target len here */
776   if ((int) state.targets.size() != cluster_size)
777     return -EINVAL;
778   else if (ret)
779     return ret;
780
781   try_rebalance(state);
782   return 0;
783 }
784
785
786
787 void MDBalancer::try_rebalance(balance_state_t& state)
788 {
789   if (g_conf->mds_thrash_exports) {
790     dout(5) << "mds_thrash is on; not performing standard rebalance operation!"
791             << dendl;
792     return;
793   }
794
795   // make a sorted list of my imports
796   map<double,CDir*>    import_pop_map;
797   multimap<mds_rank_t,CDir*>  import_from_map;
798   set<CDir*> fullauthsubs;
799
800   mds->mdcache->get_fullauth_subtrees(fullauthsubs);
801   for (set<CDir*>::iterator it = fullauthsubs.begin();
802        it != fullauthsubs.end();
803        ++it) {
804     CDir *im = *it;
805     if (im->get_inode()->is_stray()) continue;
806
807     double pop = im->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
808     if (g_conf->mds_bal_idle_threshold > 0 &&
809         pop < g_conf->mds_bal_idle_threshold &&
810         im->inode != mds->mdcache->get_root() &&
811         im->inode->authority().first != mds->get_nodeid()) {
812       dout(0) << " exporting idle (" << pop << ") import " << *im
813               << " back to mds." << im->inode->authority().first
814               << dendl;
815       mds->mdcache->migrator->export_dir_nicely(im, im->inode->authority().first);
816       continue;
817     }
818
819     import_pop_map[ pop ] = im;
820     mds_rank_t from = im->inode->authority().first;
821     dout(15) << "  map: i imported " << *im << " from " << from << dendl;
822     import_from_map.insert(pair<mds_rank_t,CDir*>(from, im));
823   }
824
825
826
827   // do my exports!
828   set<CDir*> already_exporting;
829
830   for (auto &it : state.targets) {
831     mds_rank_t target = it.first;
832     double amount = it.second;
833
834     if (amount < MIN_OFFLOAD) continue;
835     if (amount / target_load < .2) continue;
836
837     dout(5) << "want to send " << amount << " to mds." << target
838       //<< " .. " << (*it).second << " * " << load_fac
839             << " -> " << amount
840             << dendl;//" .. fudge is " << fudge << dendl;
841     double have = 0.0;
842
843
844     mds->mdcache->show_subtrees();
845
846     // search imports from target
847     if (import_from_map.count(target)) {
848       dout(5) << " aha, looking through imports from target mds." << target << dendl;
849       pair<multimap<mds_rank_t,CDir*>::iterator, multimap<mds_rank_t,CDir*>::iterator> p =
850         import_from_map.equal_range(target);
851       while (p.first != p.second) {
852         CDir *dir = (*p.first).second;
853         dout(5) << "considering " << *dir << " from " << (*p.first).first << dendl;
854         multimap<mds_rank_t,CDir*>::iterator plast = p.first++;
855
856         if (dir->inode->is_base() ||
857             dir->inode->is_stray())
858           continue;
859         if (dir->is_freezing() || dir->is_frozen()) continue;  // export pbly already in progress
860         double pop = dir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
861         assert(dir->inode->authority().first == target);  // cuz that's how i put it in the map, dummy
862
863         if (pop <= amount-have) {
864           dout(0) << "reexporting " << *dir
865                   << " pop " << pop
866                   << " back to mds." << target << dendl;
867           mds->mdcache->migrator->export_dir_nicely(dir, target);
868           have += pop;
869           import_from_map.erase(plast);
870           import_pop_map.erase(pop);
871         } else {
872           dout(5) << "can't reexport " << *dir << ", too big " << pop << dendl;
873         }
874         if (amount-have < MIN_OFFLOAD) break;
875       }
876     }
877     if (amount-have < MIN_OFFLOAD) {
878       continue;
879     }
880
881     // any other imports
882     if (false)
883       for (map<double,CDir*>::iterator import = import_pop_map.begin();
884            import != import_pop_map.end();
885            import++) {
886         CDir *imp = (*import).second;
887         if (imp->inode->is_base() ||
888             imp->inode->is_stray())
889           continue;
890
891         double pop = (*import).first;
892         if (pop < amount-have || pop < MIN_REEXPORT) {
893           dout(0) << "reexporting " << *imp
894                   << " pop " << pop
895                   << " back to mds." << imp->inode->authority()
896                   << dendl;
897           have += pop;
898           mds->mdcache->migrator->export_dir_nicely(imp, imp->inode->authority().first);
899         }
900         if (amount-have < MIN_OFFLOAD) break;
901       }
902     if (amount-have < MIN_OFFLOAD) {
903       //fudge = amount-have;
904       continue;
905     }
906
907     // okay, search for fragments of my workload
908     set<CDir*> candidates;
909     mds->mdcache->get_fullauth_subtrees(candidates);
910
911     list<CDir*> exports;
912
913     for (set<CDir*>::iterator pot = candidates.begin();
914          pot != candidates.end();
915          ++pot) {
916       if ((*pot)->get_inode()->is_stray()) continue;
917       find_exports(*pot, amount, exports, have, already_exporting);
918       if (have > amount-MIN_OFFLOAD)
919         break;
920     }
921     //fudge = amount - have;
922
923     for (list<CDir*>::iterator it = exports.begin(); it != exports.end(); ++it) {
924       dout(0) << "   - exporting "
925                << (*it)->pop_auth_subtree
926                << " "
927                << (*it)->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate)
928                << " to mds." << target
929                << " " << **it
930                << dendl;
931       mds->mdcache->migrator->export_dir_nicely(*it, target);
932     }
933   }
934
935   dout(5) << "rebalance done" << dendl;
936   mds->mdcache->show_subtrees();
937 }
938
939 void MDBalancer::find_exports(CDir *dir,
940                               double amount,
941                               list<CDir*>& exports,
942                               double& have,
943                               set<CDir*>& already_exporting)
944 {
945   double need = amount - have;
946   if (need < amount * g_conf->mds_bal_min_start)
947     return;   // good enough!
948   double needmax = need * g_conf->mds_bal_need_max;
949   double needmin = need * g_conf->mds_bal_need_min;
950   double midchunk = need * g_conf->mds_bal_midchunk;
951   double minchunk = need * g_conf->mds_bal_minchunk;
952
953   list<CDir*> bigger_rep, bigger_unrep;
954   multimap<double, CDir*> smaller;
955
956   double dir_pop = dir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
957   dout(7) << " find_exports in " << dir_pop << " " << *dir << " need " << need << " (" << needmin << " - " << needmax << ")" << dendl;
958
959   double subdir_sum = 0;
960   for (CDir::map_t::iterator it = dir->begin();
961        it != dir->end();
962        ++it) {
963     CInode *in = it->second->get_linkage()->get_inode();
964     if (!in) continue;
965     if (!in->is_dir()) continue;
966
967     list<CDir*> dfls;
968     in->get_dirfrags(dfls);
969     for (list<CDir*>::iterator p = dfls.begin();
970          p != dfls.end();
971          ++p) {
972       CDir *subdir = *p;
973       if (!subdir->is_auth()) continue;
974       if (already_exporting.count(subdir)) continue;
975
976       if (subdir->is_frozen()) continue;  // can't export this right now!
977
978       // how popular?
979       double pop = subdir->pop_auth_subtree.meta_load(rebalance_time, mds->mdcache->decayrate);
980       subdir_sum += pop;
981       dout(15) << "   subdir pop " << pop << " " << *subdir << dendl;
982
983       if (pop < minchunk) continue;
984
985       // lucky find?
986       if (pop > needmin && pop < needmax) {
987         exports.push_back(subdir);
988         already_exporting.insert(subdir);
989         have += pop;
990         return;
991       }
992
993       if (pop > need) {
994         if (subdir->is_rep())
995           bigger_rep.push_back(subdir);
996         else
997           bigger_unrep.push_back(subdir);
998       } else
999         smaller.insert(pair<double,CDir*>(pop, subdir));
1000     }
1001   }
1002   dout(15) << "   sum " << subdir_sum << " / " << dir_pop << dendl;
1003
1004   // grab some sufficiently big small items
1005   multimap<double,CDir*>::reverse_iterator it;
1006   for (it = smaller.rbegin();
1007        it != smaller.rend();
1008        ++it) {
1009
1010     if ((*it).first < midchunk)
1011       break;  // try later
1012
1013     dout(7) << "   taking smaller " << *(*it).second << dendl;
1014
1015     exports.push_back((*it).second);
1016     already_exporting.insert((*it).second);
1017     have += (*it).first;
1018     if (have > needmin)
1019       return;
1020   }
1021
1022   // apprently not enough; drill deeper into the hierarchy (if non-replicated)
1023   for (list<CDir*>::iterator it = bigger_unrep.begin();
1024        it != bigger_unrep.end();
1025        ++it) {
1026     dout(15) << "   descending into " << **it << dendl;
1027     find_exports(*it, amount, exports, have, already_exporting);
1028     if (have > needmin)
1029       return;
1030   }
1031
1032   // ok fine, use smaller bits
1033   for (;
1034        it != smaller.rend();
1035        ++it) {
1036     dout(7) << "   taking (much) smaller " << it->first << " " << *(*it).second << dendl;
1037
1038     exports.push_back((*it).second);
1039     already_exporting.insert((*it).second);
1040     have += (*it).first;
1041     if (have > needmin)
1042       return;
1043   }
1044
1045   // ok fine, drill into replicated dirs
1046   for (list<CDir*>::iterator it = bigger_rep.begin();
1047        it != bigger_rep.end();
1048        ++it) {
1049     dout(7) << "   descending into replicated " << **it << dendl;
1050     find_exports(*it, amount, exports, have, already_exporting);
1051     if (have > needmin)
1052       return;
1053   }
1054
1055 }
1056
1057 void MDBalancer::hit_inode(utime_t now, CInode *in, int type, int who)
1058 {
1059   // hit inode
1060   in->pop.get(type).hit(now, mds->mdcache->decayrate);
1061
1062   if (in->get_parent_dn())
1063     hit_dir(now, in->get_parent_dn()->get_dir(), type, who);
1064 }
1065
1066 void MDBalancer::maybe_fragment(CDir *dir, bool hot)
1067 {
1068   // split/merge
1069   if (g_conf->mds_bal_frag && g_conf->mds_bal_fragment_interval > 0 &&
1070       !dir->inode->is_base() &&        // not root/base (for now at least)
1071       dir->is_auth()) {
1072
1073     // split
1074     if (g_conf->mds_bal_split_size > 0 &&
1075         mds->mdsmap->allows_dirfrags() &&
1076         (dir->should_split() || hot))
1077     {
1078       if (split_pending.count(dir->dirfrag()) == 0) {
1079         queue_split(dir, false);
1080       } else {
1081         if (dir->should_split_fast()) {
1082           queue_split(dir, true);
1083         } else {
1084           dout(10) << __func__ << ": fragment already enqueued to split: "
1085                    << *dir << dendl;
1086         }
1087       }
1088     }
1089
1090     // merge?
1091     if (dir->get_frag() != frag_t() && dir->should_merge() &&
1092         merge_pending.count(dir->dirfrag()) == 0) {
1093       queue_merge(dir);
1094     }
1095   }
1096 }
1097
1098 void MDBalancer::hit_dir(utime_t now, CDir *dir, int type, int who, double amount)
1099 {
1100   // hit me
1101   double v = dir->pop_me.get(type).hit(now, amount);
1102
1103   const bool hot = (v > g_conf->mds_bal_split_rd && type == META_POP_IRD) ||
1104                    (v > g_conf->mds_bal_split_wr && type == META_POP_IWR);
1105
1106   dout(20) << "hit_dir " << type << " pop is " << v << ", frag " << dir->get_frag()
1107            << " size " << dir->get_frag_size() << dendl;
1108
1109   maybe_fragment(dir, hot);
1110
1111   // replicate?
1112   if (type == META_POP_IRD && who >= 0) {
1113     dir->pop_spread.hit(now, mds->mdcache->decayrate, who);
1114   }
1115
1116   double rd_adj = 0.0;
1117   if (type == META_POP_IRD &&
1118       dir->last_popularity_sample < last_sample) {
1119     double dir_pop = dir->pop_auth_subtree.get(type).get(now, mds->mdcache->decayrate);    // hmm??
1120     dir->last_popularity_sample = last_sample;
1121     double pop_sp = dir->pop_spread.get(now, mds->mdcache->decayrate);
1122     dir_pop += pop_sp * 10;
1123
1124     //if (dir->ino() == inodeno_t(0x10000000002))
1125     if (pop_sp > 0) {
1126       dout(20) << "hit_dir " << type << " pop " << dir_pop << " spread " << pop_sp
1127               << " " << dir->pop_spread.last[0]
1128               << " " << dir->pop_spread.last[1]
1129               << " " << dir->pop_spread.last[2]
1130               << " " << dir->pop_spread.last[3]
1131               << " in " << *dir << dendl;
1132     }
1133
1134     if (dir->is_auth() && !dir->is_ambiguous_auth()) {
1135       if (!dir->is_rep() &&
1136           dir_pop >= g_conf->mds_bal_replicate_threshold) {
1137         // replicate
1138         double rdp = dir->pop_me.get(META_POP_IRD).get(now, mds->mdcache->decayrate);
1139         rd_adj = rdp / mds->get_mds_map()->get_num_in_mds() - rdp;
1140         rd_adj /= 2.0;  // temper somewhat
1141
1142         dout(0) << "replicating dir " << *dir << " pop " << dir_pop << " .. rdp " << rdp << " adj " << rd_adj << dendl;
1143
1144         dir->dir_rep = CDir::REP_ALL;
1145         mds->mdcache->send_dir_updates(dir, true);
1146
1147         // fixme this should adjust the whole pop hierarchy
1148         dir->pop_me.get(META_POP_IRD).adjust(rd_adj);
1149         dir->pop_auth_subtree.get(META_POP_IRD).adjust(rd_adj);
1150       }
1151
1152       if (dir->ino() != 1 &&
1153           dir->is_rep() &&
1154           dir_pop < g_conf->mds_bal_unreplicate_threshold) {
1155         // unreplicate
1156         dout(0) << "unreplicating dir " << *dir << " pop " << dir_pop << dendl;
1157
1158         dir->dir_rep = CDir::REP_NONE;
1159         mds->mdcache->send_dir_updates(dir);
1160       }
1161     }
1162   }
1163
1164   // adjust ancestors
1165   bool hit_subtree = dir->is_auth();         // current auth subtree (if any)
1166   bool hit_subtree_nested = dir->is_auth();  // all nested auth subtrees
1167
1168   while (true) {
1169     dir->pop_nested.get(type).hit(now, amount);
1170     if (rd_adj != 0.0)
1171       dir->pop_nested.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);
1172
1173     if (hit_subtree) {
1174       dir->pop_auth_subtree.get(type).hit(now, amount);
1175       if (rd_adj != 0.0)
1176         dir->pop_auth_subtree.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);
1177     }
1178
1179     if (hit_subtree_nested) {
1180       dir->pop_auth_subtree_nested.get(type).hit(now, mds->mdcache->decayrate, amount);
1181       if (rd_adj != 0.0)
1182         dir->pop_auth_subtree_nested.get(META_POP_IRD).adjust(now, mds->mdcache->decayrate, rd_adj);
1183     }
1184
1185     if (dir->is_subtree_root())
1186       hit_subtree = false;                // end of auth domain, stop hitting auth counters.
1187
1188     if (dir->inode->get_parent_dn() == 0) break;
1189     dir = dir->inode->get_parent_dn()->get_dir();
1190   }
1191 }
1192
1193
1194 /*
1195  * subtract off an exported chunk.
1196  *  this excludes *dir itself (encode_export_dir should have take care of that)
1197  *  we _just_ do the parents' nested counters.
1198  *
1199  * NOTE: call me _after_ forcing *dir into a subtree root,
1200  *       but _before_ doing the encode_export_dirs.
1201  */
1202 void MDBalancer::subtract_export(CDir *dir, utime_t now)
1203 {
1204   dirfrag_load_vec_t subload = dir->pop_auth_subtree;
1205
1206   while (true) {
1207     dir = dir->inode->get_parent_dir();
1208     if (!dir) break;
1209
1210     dir->pop_nested.sub(now, mds->mdcache->decayrate, subload);
1211     dir->pop_auth_subtree_nested.sub(now, mds->mdcache->decayrate, subload);
1212   }
1213 }
1214
1215
1216 void MDBalancer::add_import(CDir *dir, utime_t now)
1217 {
1218   dirfrag_load_vec_t subload = dir->pop_auth_subtree;
1219
1220   while (true) {
1221     dir = dir->inode->get_parent_dir();
1222     if (!dir) break;
1223
1224     dir->pop_nested.add(now, mds->mdcache->decayrate, subload);
1225     dir->pop_auth_subtree_nested.add(now, mds->mdcache->decayrate, subload);
1226   }
1227 }
1228
1229 void MDBalancer::handle_mds_failure(mds_rank_t who)
1230 {
1231   if (0 == who) {
1232     last_epoch_under = 0;
1233   }
1234 }