Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / crush / CrushWrapper.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "osd/osd_types.h"
5 #include "common/debug.h"
6 #include "common/Formatter.h"
7 #include "common/errno.h"
8 #include "common/TextTable.h"
9 #include "include/stringify.h"
10
11 #include "CrushWrapper.h"
12 #include "CrushTreeDumper.h"
13
14 #define dout_subsys ceph_subsys_crush
15
16 bool CrushWrapper::has_legacy_rule_ids() const
17 {
18   for (unsigned i=0; i<crush->max_rules; i++) {
19     crush_rule *r = crush->rules[i];
20     if (r &&
21         r->mask.ruleset != i) {
22       return true;
23     }
24   }
25   return false;
26 }
27
28 std::map<int, int> CrushWrapper::renumber_rules()
29 {
30   std::map<int, int> result;
31   for (unsigned i=0; i<crush->max_rules; i++) {
32     crush_rule *r = crush->rules[i];
33     if (r && r->mask.ruleset != i) {
34       result[r->mask.ruleset] = i;
35       r->mask.ruleset = i;
36     }
37   }
38   return result;
39 }
40
41 bool CrushWrapper::has_non_straw2_buckets() const
42 {
43   for (int i=0; i<crush->max_buckets; ++i) {
44     crush_bucket *b = crush->buckets[i];
45     if (!b)
46       continue;
47     if (b->alg != CRUSH_BUCKET_STRAW2)
48       return true;
49   }
50   return false;
51 }
52
53 bool CrushWrapper::has_v2_rules() const
54 {
55   for (unsigned i=0; i<crush->max_rules; i++) {
56     if (is_v2_rule(i)) {
57       return true;
58     }
59   }
60   return false;
61 }
62
63 bool CrushWrapper::is_v2_rule(unsigned ruleid) const
64 {
65   // check rule for use of indep or new SET_* rule steps
66   if (ruleid >= crush->max_rules)
67     return false;
68   crush_rule *r = crush->rules[ruleid];
69   if (!r)
70     return false;
71   for (unsigned j=0; j<r->len; j++) {
72     if (r->steps[j].op == CRUSH_RULE_CHOOSE_INDEP ||
73         r->steps[j].op == CRUSH_RULE_CHOOSELEAF_INDEP ||
74         r->steps[j].op == CRUSH_RULE_SET_CHOOSE_TRIES ||
75         r->steps[j].op == CRUSH_RULE_SET_CHOOSELEAF_TRIES) {
76       return true;
77     }
78   }
79   return false;
80 }
81
82 bool CrushWrapper::has_v3_rules() const
83 {
84   for (unsigned i=0; i<crush->max_rules; i++) {
85     if (is_v3_rule(i)) {
86       return true;
87     }
88   }
89   return false;
90 }
91
92 bool CrushWrapper::is_v3_rule(unsigned ruleid) const
93 {
94   // check rule for use of SET_CHOOSELEAF_VARY_R step
95   if (ruleid >= crush->max_rules)
96     return false;
97   crush_rule *r = crush->rules[ruleid];
98   if (!r)
99     return false;
100   for (unsigned j=0; j<r->len; j++) {
101     if (r->steps[j].op == CRUSH_RULE_SET_CHOOSELEAF_VARY_R) {
102       return true;
103     }
104   }
105   return false;
106 }
107
108 bool CrushWrapper::has_v4_buckets() const
109 {
110   for (int i=0; i<crush->max_buckets; ++i) {
111     crush_bucket *b = crush->buckets[i];
112     if (!b)
113       continue;
114     if (b->alg == CRUSH_BUCKET_STRAW2)
115       return true;
116   }
117   return false;
118 }
119
120 bool CrushWrapper::has_v5_rules() const
121 {
122   for (unsigned i=0; i<crush->max_rules; i++) {
123     if (is_v5_rule(i)) {
124       return true;
125     }
126   }
127   return false;
128 }
129
130 bool CrushWrapper::is_v5_rule(unsigned ruleid) const
131 {
132   // check rule for use of SET_CHOOSELEAF_STABLE step
133   if (ruleid >= crush->max_rules)
134     return false;
135   crush_rule *r = crush->rules[ruleid];
136   if (!r)
137     return false;
138   for (unsigned j=0; j<r->len; j++) {
139     if (r->steps[j].op == CRUSH_RULE_SET_CHOOSELEAF_STABLE) {
140       return true;
141     }
142   }
143   return false;
144 }
145
146 bool CrushWrapper::has_choose_args() const
147 {
148   return !choose_args.empty();
149 }
150
151 bool CrushWrapper::has_incompat_choose_args() const
152 {
153   if (choose_args.empty())
154     return false;
155   if (choose_args.size() > 1)
156     return true;
157   if (choose_args.begin()->first != DEFAULT_CHOOSE_ARGS)
158     return true;
159   crush_choose_arg_map arg_map = choose_args.begin()->second;
160   for (__u32 i = 0; i < arg_map.size; i++) {
161     crush_choose_arg *arg = &arg_map.args[i];
162     if (arg->weight_set_size == 0 &&
163         arg->ids_size == 0)
164         continue;
165     if (arg->weight_set_size != 1)
166       return true;
167     if (arg->ids_size != 0)
168       return true;
169   }
170   return false;
171 }
172
173 int CrushWrapper::split_id_class(int i, int *idout, int *classout) const
174 {
175   if (!item_exists(i))
176     return -EINVAL;
177   string name = get_item_name(i);
178   size_t pos = name.find("~");
179   if (pos == string::npos) {
180     *idout = i;
181     *classout = -1;
182     return 0;
183   }
184   string name_no_class = name.substr(0, pos);
185   if (!name_exists(name_no_class))
186     return -ENOENT;
187   string class_name = name.substr(pos + 1);
188   if (!class_exists(class_name))
189     return -ENOENT;
190   *idout = get_item_id(name_no_class);
191   *classout = get_class_id(class_name);
192   return 0;
193 }
194
195 int CrushWrapper::can_rename_item(const string& srcname,
196                                   const string& dstname,
197                                   ostream *ss) const
198 {
199   if (name_exists(srcname)) {
200     if (name_exists(dstname)) {
201       *ss << "dstname = '" << dstname << "' already exists";
202       return -EEXIST;
203     }
204     if (is_valid_crush_name(dstname)) {
205       return 0;
206     } else {
207       *ss << "dstname = '" << dstname << "' does not match [-_.0-9a-zA-Z]+";
208       return -EINVAL;
209     }
210   } else {
211     if (name_exists(dstname)) {
212       *ss << "srcname = '" << srcname << "' does not exist "
213           << "and dstname = '" << dstname << "' already exists";
214       return -EALREADY;
215     } else {
216       *ss << "srcname = '" << srcname << "' does not exist";
217       return -ENOENT;
218     }
219   }
220 }
221
222 int CrushWrapper::rename_item(const string& srcname,
223                               const string& dstname,
224                               ostream *ss)
225 {
226   int ret = can_rename_item(srcname, dstname, ss);
227   if (ret < 0)
228     return ret;
229   int oldid = get_item_id(srcname);
230   return set_item_name(oldid, dstname);
231 }
232
233 int CrushWrapper::can_rename_bucket(const string& srcname,
234                                     const string& dstname,
235                                     ostream *ss) const
236 {
237   int ret = can_rename_item(srcname, dstname, ss);
238   if (ret)
239     return ret;
240   int srcid = get_item_id(srcname);
241   if (srcid >= 0) {
242     *ss << "srcname = '" << srcname << "' is not a bucket "
243         << "because its id = " << srcid << " is >= 0";
244     return -ENOTDIR;
245   }
246   return 0;
247 }
248
249 int CrushWrapper::rename_bucket(const string& srcname,
250                                 const string& dstname,
251                                 ostream *ss)
252 {
253   int ret = can_rename_bucket(srcname, dstname, ss);
254   if (ret < 0)
255     return ret;
256   int oldid = get_item_id(srcname);
257   return set_item_name(oldid, dstname);
258 }
259
260 int CrushWrapper::rename_rule(const string& srcname,
261                               const string& dstname,
262                               ostream *ss)
263 {
264   if (!rule_exists(srcname)) {
265     if (ss) {
266       *ss << "source rule name '" << srcname << "' does not exist";
267     }
268     return -ENOENT;
269   }
270   if (rule_exists(dstname)) {
271     if (ss) {
272       *ss << "destination rule name '" << dstname << "' already exists";
273     }
274     return -EEXIST;
275   }
276   int rule_id = get_rule_id(srcname);
277   auto it = rule_name_map.find(rule_id);
278   assert(it != rule_name_map.end());
279   it->second = dstname;
280   if (have_rmaps) {
281     rule_name_rmap.erase(srcname);
282     rule_name_rmap[dstname] = rule_id;
283   }
284   return 0;
285 }
286
287 void CrushWrapper::find_takes(set<int> *roots) const
288 {
289   for (unsigned i=0; i<crush->max_rules; i++) {
290     crush_rule *r = crush->rules[i];
291     if (!r)
292       continue;
293     for (unsigned j=0; j<r->len; j++) {
294       if (r->steps[j].op == CRUSH_RULE_TAKE)
295         roots->insert(r->steps[j].arg1);
296     }
297   }
298 }
299
300 void CrushWrapper::find_roots(set<int> *roots) const
301 {
302   for (int i = 0; i < crush->max_buckets; i++) {
303     if (!crush->buckets[i])
304       continue;
305     crush_bucket *b = crush->buckets[i];
306     if (!_search_item_exists(b->id))
307       roots->insert(b->id);
308   }
309 }
310
311 bool CrushWrapper::subtree_contains(int root, int item) const
312 {
313   if (root == item)
314     return true;
315
316   if (root >= 0)
317     return false;  // root is a leaf
318
319   const crush_bucket *b = get_bucket(root);
320   if (IS_ERR(b))
321     return false;
322
323   for (unsigned j=0; j<b->size; j++) {
324     if (subtree_contains(b->items[j], item))
325       return true;
326   }
327   return false;
328 }
329
330 bool CrushWrapper::_maybe_remove_last_instance(CephContext *cct, int item, bool unlink_only)
331 {
332   // last instance?
333   if (_search_item_exists(item)) {
334     return false;
335   }
336   if (item < 0 && _bucket_is_in_use(item)) {
337     return false;
338   }
339
340   if (item < 0 && !unlink_only) {
341     crush_bucket *t = get_bucket(item);
342     ldout(cct, 5) << "_maybe_remove_last_instance removing bucket " << item << dendl;
343     crush_remove_bucket(crush, t);
344     if (class_bucket.count(item) != 0)
345       class_bucket.erase(item);
346     class_remove_item(item);
347   }
348   if ((item >= 0 || !unlink_only) && name_map.count(item)) {
349     ldout(cct, 5) << "_maybe_remove_last_instance removing name for item " << item << dendl;
350     name_map.erase(item);
351     have_rmaps = false;
352     if (item >= 0 && !unlink_only) {
353       class_remove_item(item);
354     }
355   }
356   rebuild_roots_with_classes();
357   return true;
358 }
359
360 int CrushWrapper::remove_root(int item)
361 {
362   crush_bucket *b = get_bucket(item);
363   if (IS_ERR(b)) {
364     // should be idempotent
365     // e.g.: we use 'crush link' to link same host into
366     // different roots, which as a result can cause different
367     // shadow trees reference same hosts too. This means
368     // we may need to destory the same buckets(hosts, racks, etc.)
369     // multiple times during rebuilding all shadow trees.
370     return 0;
371   }
372
373   for (unsigned n = 0; n < b->size; n++) {
374     if (b->items[n] >= 0)
375       continue;
376     int r = remove_root(b->items[n]);
377     if (r < 0)
378       return r;
379   }
380
381   crush_remove_bucket(crush, b);
382   if (name_map.count(item) != 0) {
383     name_map.erase(item);
384     have_rmaps = false;
385   }
386   if (class_bucket.count(item) != 0)
387     class_bucket.erase(item);
388   class_remove_item(item);
389   return 0;
390 }
391
392 int CrushWrapper::remove_item(CephContext *cct, int item, bool unlink_only)
393 {
394   ldout(cct, 5) << "remove_item " << item
395                 << (unlink_only ? " unlink_only":"") << dendl;
396
397   int ret = -ENOENT;
398
399   if (item < 0 && !unlink_only) {
400     crush_bucket *t = get_bucket(item);
401     if (IS_ERR(t)) {
402       ldout(cct, 1) << "remove_item bucket " << item << " does not exist"
403                     << dendl;
404       return -ENOENT;
405     }
406
407     if (t->size) {
408       ldout(cct, 1) << "remove_item bucket " << item << " has " << t->size
409                     << " items, not empty" << dendl;
410       return -ENOTEMPTY;
411     }
412     if (_bucket_is_in_use(item)) {
413       return -EBUSY;
414     }
415   }
416
417   for (int i = 0; i < crush->max_buckets; i++) {
418     if (!crush->buckets[i])
419       continue;
420     crush_bucket *b = crush->buckets[i];
421
422     for (unsigned i=0; i<b->size; ++i) {
423       int id = b->items[i];
424       if (id == item) {
425         ldout(cct, 5) << "remove_item removing item " << item
426                       << " from bucket " << b->id << dendl;
427         for (auto& p : choose_args) {
428           // weight down each weight-set to 0 before we remove the item
429           vector<int> weightv(get_choose_args_positions(p.second), 0);
430           choose_args_adjust_item_weight(cct, p.second, item, weightv, nullptr);
431         }
432         bucket_remove_item(b, item);
433         adjust_item_weight(cct, b->id, b->weight);
434         ret = 0;
435       }
436     }
437   }
438
439   if (_maybe_remove_last_instance(cct, item, unlink_only))
440     ret = 0;
441   
442   return ret;
443 }
444
445 bool CrushWrapper::_search_item_exists(int item) const
446 {
447   for (int i = 0; i < crush->max_buckets; i++) {
448     if (!crush->buckets[i])
449       continue;
450     crush_bucket *b = crush->buckets[i];
451     for (unsigned j=0; j<b->size; ++j) {
452       if (b->items[j] == item)
453         return true;
454     }
455   }
456   return false;
457 }
458
459 bool CrushWrapper::_bucket_is_in_use(int item)
460 {
461   for (auto &i : class_bucket)
462     for (auto &j : i.second)
463       if (j.second == item)
464         return true;
465   for (unsigned i = 0; i < crush->max_rules; ++i) {
466     crush_rule *r = crush->rules[i];
467     if (!r)
468       continue;
469     for (unsigned j = 0; j < r->len; ++j) {
470       if (r->steps[j].op == CRUSH_RULE_TAKE) {
471         int step_item = r->steps[j].arg1;
472         int original_item;
473         int c;
474         int res = split_id_class(step_item, &original_item, &c);
475         if (res < 0)
476           return false;
477         if (step_item == item || original_item == item)
478           return true;
479       }
480     }
481   }
482   return false;
483 }
484
485 int CrushWrapper::_remove_item_under(
486   CephContext *cct, int item, int ancestor, bool unlink_only)
487 {
488   ldout(cct, 5) << "_remove_item_under " << item << " under " << ancestor
489                 << (unlink_only ? " unlink_only":"") << dendl;
490
491   if (ancestor >= 0) {
492     return -EINVAL;
493   }
494
495   if (!bucket_exists(ancestor))
496     return -EINVAL;
497
498   int ret = -ENOENT;
499
500   crush_bucket *b = get_bucket(ancestor);
501   for (unsigned i=0; i<b->size; ++i) {
502     int id = b->items[i];
503     if (id == item) {
504       ldout(cct, 5) << "_remove_item_under removing item " << item
505                     << " from bucket " << b->id << dendl;
506       for (auto& p : choose_args) {
507         // weight down each weight-set to 0 before we remove the item
508         vector<int> weightv(get_choose_args_positions(p.second), 0);
509         _choose_args_adjust_item_weight_in_bucket(
510           cct, p.second, b->id, item, weightv, nullptr);
511       }
512       bucket_remove_item(b, item);
513       adjust_item_weight(cct, b->id, b->weight);
514       ret = 0;
515     } else if (id < 0) {
516       int r = remove_item_under(cct, item, id, unlink_only);
517       if (r == 0)
518         ret = 0;
519     }
520   }
521   return ret;
522 }
523
524 int CrushWrapper::remove_item_under(
525   CephContext *cct, int item, int ancestor, bool unlink_only)
526 {
527   ldout(cct, 5) << "remove_item_under " << item << " under " << ancestor
528                 << (unlink_only ? " unlink_only":"") << dendl;
529
530   if (!unlink_only && _bucket_is_in_use(item)) {
531     return -EBUSY;
532   }
533
534   int ret = _remove_item_under(cct, item, ancestor, unlink_only);
535   if (ret < 0)
536     return ret;
537
538   if (item < 0 && !unlink_only) {
539     crush_bucket *t = get_bucket(item);
540     if (IS_ERR(t)) {
541       ldout(cct, 1) << "remove_item_under bucket " << item
542                     << " does not exist" << dendl;
543       return -ENOENT;
544     }
545
546     if (t->size) {
547       ldout(cct, 1) << "remove_item_under bucket " << item << " has " << t->size
548                     << " items, not empty" << dendl;
549       return -ENOTEMPTY;
550     }
551   }
552
553   if (_maybe_remove_last_instance(cct, item, unlink_only))
554     ret = 0;
555
556   return ret;
557 }
558
559 int CrushWrapper::get_common_ancestor_distance(CephContext *cct, int id,
560                                const std::multimap<string,string>& loc)
561 {
562   ldout(cct, 5) << __func__ << " " << id << " " << loc << dendl;
563   if (!item_exists(id))
564     return -ENOENT;
565   map<string,string> id_loc = get_full_location(id);
566   ldout(cct, 20) << " id is at " << id_loc << dendl;
567
568   for (map<int,string>::const_iterator p = type_map.begin();
569        p != type_map.end();
570        ++p) {
571     map<string,string>::iterator ip = id_loc.find(p->second);
572     if (ip == id_loc.end())
573       continue;
574     for (std::multimap<string,string>::const_iterator q = loc.find(p->second);
575          q != loc.end();
576          ++q) {
577       if (q->first != p->second)
578         break;
579       if (q->second == ip->second)
580         return p->first;
581     }
582   }
583   return -ERANGE;
584 }
585
586 int CrushWrapper::parse_loc_map(const std::vector<string>& args,
587                                 std::map<string,string> *ploc)
588 {
589   ploc->clear();
590   for (unsigned i = 0; i < args.size(); ++i) {
591     const char *s = args[i].c_str();
592     const char *pos = strchr(s, '=');
593     if (!pos)
594       return -EINVAL;
595     string key(s, 0, pos-s);
596     string value(pos+1);
597     if (value.length())
598       (*ploc)[key] = value;
599     else
600       return -EINVAL;
601   }
602   return 0;
603 }
604
605 int CrushWrapper::parse_loc_multimap(const std::vector<string>& args,
606                                             std::multimap<string,string> *ploc)
607 {
608   ploc->clear();
609   for (unsigned i = 0; i < args.size(); ++i) {
610     const char *s = args[i].c_str();
611     const char *pos = strchr(s, '=');
612     if (!pos)
613       return -EINVAL;
614     string key(s, 0, pos-s);
615     string value(pos+1);
616     if (value.length())
617       ploc->insert(make_pair(key, value));
618     else
619       return -EINVAL;
620   }
621   return 0;
622 }
623
624 bool CrushWrapper::check_item_loc(CephContext *cct, int item, const map<string,string>& loc,
625                                   int *weight)
626 {
627   ldout(cct, 5) << "check_item_loc item " << item << " loc " << loc << dendl;
628
629   for (map<int,string>::const_iterator p = type_map.begin(); p != type_map.end(); ++p) {
630     // ignore device
631     if (p->first == 0)
632       continue;
633
634     // ignore types that aren't specified in loc
635     map<string,string>::const_iterator q = loc.find(p->second);
636     if (q == loc.end()) {
637       ldout(cct, 2) << "warning: did not specify location for '" << p->second << "' level (levels are "
638                     << type_map << ")" << dendl;
639       continue;
640     }
641
642     if (!name_exists(q->second)) {
643       ldout(cct, 5) << "check_item_loc bucket " << q->second << " dne" << dendl;
644       return false;
645     }
646
647     int id = get_item_id(q->second);
648     if (id >= 0) {
649       ldout(cct, 5) << "check_item_loc requested " << q->second << " for type " << p->second
650                     << " is a device, not bucket" << dendl;
651       return false;
652     }
653
654     assert(bucket_exists(id));
655     crush_bucket *b = get_bucket(id);
656
657     // see if item exists in this bucket
658     for (unsigned j=0; j<b->size; j++) {
659       if (b->items[j] == item) {
660         ldout(cct, 2) << "check_item_loc " << item << " exists in bucket " << b->id << dendl;
661         if (weight)
662           *weight = crush_get_bucket_item_weight(b, j);
663         return true;
664       }
665     }
666     return false;
667   }
668   
669   ldout(cct, 1) << "check_item_loc item " << item << " loc " << loc << dendl;
670   return false;
671 }
672
673 map<string, string> CrushWrapper::get_full_location(int id)
674 {
675   vector<pair<string, string> > full_location_ordered;
676   map<string,string> full_location;
677
678   get_full_location_ordered(id, full_location_ordered);
679
680   std::copy(full_location_ordered.begin(),
681       full_location_ordered.end(),
682       std::inserter(full_location, full_location.begin()));
683
684   return full_location;
685 }
686
687 int CrushWrapper::get_full_location_ordered(int id, vector<pair<string, string> >& path)
688 {
689   if (!item_exists(id))
690     return -ENOENT;
691   int cur = id;
692   int ret;
693   while (true) {
694     pair<string, string> parent_coord = get_immediate_parent(cur, &ret);
695     if (ret != 0)
696       break;
697     path.push_back(parent_coord);
698     cur = get_item_id(parent_coord.second);
699   }
700   return 0;
701 }
702
703 string CrushWrapper::get_full_location_ordered_string(int id)
704 {
705   vector<pair<string, string> > full_location_ordered;
706   string full_location;
707   get_full_location_ordered(id, full_location_ordered);
708   reverse(begin(full_location_ordered), end(full_location_ordered));
709   for(auto i = full_location_ordered.begin(); i != full_location_ordered.end(); i++) {
710     full_location = full_location + i->first + "=" + i->second;
711     if (i != full_location_ordered.end() - 1) {
712       full_location = full_location + ",";
713     }
714   }
715   return full_location;
716 }
717
718 map<int, string> CrushWrapper::get_parent_hierarchy(int id)
719 {
720   map<int,string> parent_hierarchy;
721   pair<string, string> parent_coord = get_immediate_parent(id);
722   int parent_id;
723
724   // get the integer type for id and create a counter from there
725   int type_counter = get_bucket_type(id);
726
727   // if we get a negative type then we can assume that we have an OSD
728   // change behavior in get_item_type FIXME
729   if (type_counter < 0)
730     type_counter = 0;
731
732   // read the type map and get the name of the type with the largest ID
733   int high_type = 0;
734   for (map<int, string>::iterator it = type_map.begin(); it != type_map.end(); ++it){
735     if ( (*it).first > high_type )
736       high_type = (*it).first;
737   }
738
739   parent_id = get_item_id(parent_coord.second);
740
741   while (type_counter < high_type) {
742     type_counter++;
743     parent_hierarchy[ type_counter ] = parent_coord.first;
744
745     if (type_counter < high_type){
746       // get the coordinate information for the next parent
747       parent_coord = get_immediate_parent(parent_id);
748       parent_id = get_item_id(parent_coord.second);
749     }
750   }
751
752   return parent_hierarchy;
753 }
754
755 int CrushWrapper::get_children(int id, list<int> *children)
756 {
757   // leaf?
758   if (id >= 0) {
759     return 0;
760   }
761
762   crush_bucket *b = get_bucket(id);
763   if (IS_ERR(b)) {
764     return -ENOENT;
765   }
766
767   for (unsigned n=0; n<b->size; n++) {
768     children->push_back(b->items[n]);
769   }
770   return b->size;
771 }
772
773 int CrushWrapper::_get_leaves(int id, list<int> *leaves)
774 {
775   assert(leaves);
776
777   // Already leaf?
778   if (id >= 0) {
779     leaves->push_back(id);
780     return 0;
781   }
782
783   crush_bucket *b = get_bucket(id);
784   if (IS_ERR(b)) {
785     return -ENOENT;
786   }
787
788   for (unsigned n = 0; n < b->size; n++) {
789     if (b->items[n] >= 0) {
790       leaves->push_back(b->items[n]);
791     } else {
792       // is a bucket, do recursive call
793       int r = _get_leaves(b->items[n], leaves);
794       if (r < 0) {
795         return r;
796       }
797     }
798   }
799
800   return 0; // all is well
801 }
802
803 int CrushWrapper::get_leaves(const string &name, set<int> *leaves)
804 {
805   assert(leaves);
806   leaves->clear();
807
808   if (!name_exists(name)) {
809     return -ENOENT;
810   }
811
812   int id = get_item_id(name);
813   if (id >= 0) {
814     // already leaf
815     leaves->insert(id);
816     return 0;
817   }
818
819   list<int> unordered;
820   int r = _get_leaves(id, &unordered);
821   if (r < 0) {
822     return r;
823   }
824
825   for (auto &p : unordered) {
826     leaves->insert(p);
827   }
828
829   return 0;
830 }
831
832 int CrushWrapper::insert_item(
833   CephContext *cct, int item, float weight, string name,
834   const map<string,string>& loc)  // typename -> bucketname
835 {
836   ldout(cct, 5) << "insert_item item " << item << " weight " << weight
837                 << " name " << name << " loc " << loc << dendl;
838
839   if (!is_valid_crush_name(name))
840     return -EINVAL;
841
842   if (!is_valid_crush_loc(cct, loc))
843     return -EINVAL;
844
845   int r = validate_weightf(weight);
846   if (r < 0) {
847     return r;
848   }
849
850   if (name_exists(name)) {
851     if (get_item_id(name) != item) {
852       ldout(cct, 10) << "device name '" << name << "' already exists as id "
853                      << get_item_id(name) << dendl;
854       return -EEXIST;
855     }
856   } else {
857     set_item_name(item, name);
858   }
859
860   int cur = item;
861
862   // create locations if locations don't exist and add child in
863   // location with 0 weight the more detail in the insert_item method
864   // declaration in CrushWrapper.h
865   for (auto p = type_map.begin(); p != type_map.end(); ++p) {
866     // ignore device type
867     if (p->first == 0)
868       continue;
869
870     // skip types that are unspecified
871     map<string,string>::const_iterator q = loc.find(p->second);
872     if (q == loc.end()) {
873       ldout(cct, 2) << "warning: did not specify location for '"
874                     << p->second << "' level (levels are "
875                     << type_map << ")" << dendl;
876       continue;
877     }
878
879     if (!name_exists(q->second)) {
880       ldout(cct, 5) << "insert_item creating bucket " << q->second << dendl;
881       int empty = 0, newid;
882       int r = add_bucket(0, 0,
883                          CRUSH_HASH_DEFAULT, p->first, 1, &cur, &empty, &newid);
884       if (r < 0) {
885         ldout(cct, 1) << "add_bucket failure error: " << cpp_strerror(r)
886                       << dendl;
887         return r;
888       }
889       set_item_name(newid, q->second);
890       
891       cur = newid;
892       continue;
893     }
894
895     // add to an existing bucket
896     int id = get_item_id(q->second);
897     if (!bucket_exists(id)) {
898       ldout(cct, 1) << "insert_item doesn't have bucket " << id << dendl;
899       return -EINVAL;
900     }
901
902     // check that we aren't creating a cycle.
903     if (subtree_contains(id, cur)) {
904       ldout(cct, 1) << "insert_item item " << cur << " already exists beneath "
905                     << id << dendl;
906       return -EINVAL;
907     }
908
909     // we have done sanity check above
910     crush_bucket *b = get_bucket(id);
911
912     if (p->first != b->type) {
913       ldout(cct, 1) << "insert_item existing bucket has type "
914         << "'" << type_map[b->type] << "' != "
915         << "'" << type_map[p->first] << "'" << dendl;
916       return -EINVAL;
917     }
918
919     // are we forming a loop?
920     if (subtree_contains(cur, b->id)) {
921       ldout(cct, 1) << "insert_item " << cur << " already contains " << b->id
922                     << "; cannot form loop" << dendl;
923       return -ELOOP;
924     }
925
926     ldout(cct, 5) << "insert_item adding " << cur << " weight " << weight
927                   << " to bucket " << id << dendl;
928     int r = bucket_add_item(b, cur, 0);
929     assert (!r);
930     break;
931   }
932
933   // adjust the item's weight in location
934   if (adjust_item_weightf_in_loc(cct, item, weight, loc) > 0) {
935     if (item >= crush->max_devices) {
936       crush->max_devices = item + 1;
937       ldout(cct, 5) << "insert_item max_devices now " << crush->max_devices
938                     << dendl;
939     }
940     r = rebuild_roots_with_classes();
941     if (r < 0) {
942       ldout(cct, 0) << __func__ << " unable to rebuild roots with classes: "
943                     << cpp_strerror(r) << dendl;
944       return r;
945     }
946     return 0;
947   }
948
949   ldout(cct, 1) << "error: didn't find anywhere to add item " << item
950                 << " in " << loc << dendl;
951   return -EINVAL;
952 }
953
954
955 int CrushWrapper::move_bucket(
956   CephContext *cct, int id, const map<string,string>& loc)
957 {
958   // sorry this only works for buckets
959   if (id >= 0)
960     return -EINVAL;
961
962   if (!item_exists(id))
963     return -ENOENT;
964
965   // get the name of the bucket we are trying to move for later
966   string id_name = get_item_name(id);
967
968   // detach the bucket
969   int bucket_weight = detach_bucket(cct, id);
970
971   // insert the bucket back into the hierarchy
972   return insert_item(cct, id, bucket_weight / (float)0x10000, id_name, loc);
973 }
974
975 int CrushWrapper::detach_bucket(CephContext *cct, int item)
976 {
977   if (!crush)
978     return (-EINVAL);
979
980   if (item >= 0)
981     return (-EINVAL);
982
983   // check that the bucket that we want to detach exists
984   assert(bucket_exists(item));
985
986   // get the bucket's weight
987   crush_bucket *b = get_bucket(item);
988   unsigned bucket_weight = b->weight;
989
990   // get where the bucket is located
991   pair<string, string> bucket_location = get_immediate_parent(item);
992
993   // get the id of the parent bucket
994   int parent_id = get_item_id(bucket_location.second);
995
996   // get the parent bucket
997   crush_bucket *parent_bucket = get_bucket(parent_id);
998
999   if (!IS_ERR(parent_bucket)) {
1000     // zero out the bucket weight
1001     bucket_adjust_item_weight(cct, parent_bucket, item, 0);
1002     adjust_item_weight(cct, parent_bucket->id, parent_bucket->weight);
1003     for (auto& p : choose_args) {
1004       // weight down each weight-set to 0 before we remove the item
1005       vector<int> weightv(get_choose_args_positions(p.second), 0);
1006       choose_args_adjust_item_weight(cct, p.second, item, weightv, nullptr);
1007     }
1008
1009     // remove the bucket from the parent
1010     bucket_remove_item(parent_bucket, item);
1011   } else if (PTR_ERR(parent_bucket) != -ENOENT) {
1012     return PTR_ERR(parent_bucket);
1013   }
1014
1015   // check that we're happy
1016   int test_weight = 0;
1017   map<string,string> test_location;
1018   test_location[ bucket_location.first ] = (bucket_location.second);
1019
1020   bool successful_detach = !(check_item_loc(cct, item, test_location,
1021                                             &test_weight));
1022   assert(successful_detach);
1023   assert(test_weight == 0);
1024
1025   return bucket_weight;
1026 }
1027
1028 int CrushWrapper::swap_bucket(CephContext *cct, int src, int dst)
1029 {
1030   if (src >= 0 || dst >= 0)
1031     return -EINVAL;
1032   if (!item_exists(src) || !item_exists(dst))
1033     return -EINVAL;
1034   crush_bucket *a = get_bucket(src);
1035   crush_bucket *b = get_bucket(dst);
1036   unsigned aw = a->weight;
1037   unsigned bw = b->weight;
1038
1039   // swap weights
1040   adjust_item_weight(cct, a->id, bw);
1041   adjust_item_weight(cct, b->id, aw);
1042
1043   // swap items
1044   map<int,unsigned> tmp;
1045   unsigned as = a->size;
1046   unsigned bs = b->size;
1047   for (unsigned i = 0; i < as; ++i) {
1048     int item = a->items[0];
1049     int itemw = crush_get_bucket_item_weight(a, 0);
1050     tmp[item] = itemw;
1051     bucket_remove_item(a, item);
1052   }
1053   assert(a->size == 0);
1054   assert(b->size == bs);
1055   for (unsigned i = 0; i < bs; ++i) {
1056     int item = b->items[0];
1057     int itemw = crush_get_bucket_item_weight(b, 0);
1058     bucket_remove_item(b, item);
1059     bucket_add_item(a, item, itemw);
1060   }
1061   assert(a->size == bs);
1062   assert(b->size == 0);
1063   for (auto t : tmp) {
1064     bucket_add_item(b, t.first, t.second);
1065   }
1066   assert(a->size == bs);
1067   assert(b->size == as);
1068
1069   // swap names
1070   swap_names(src, dst);
1071   return rebuild_roots_with_classes();
1072 }
1073
1074 int CrushWrapper::link_bucket(
1075   CephContext *cct, int id, const map<string,string>& loc)
1076 {
1077   // sorry this only works for buckets
1078   if (id >= 0)
1079     return -EINVAL;
1080
1081   if (!item_exists(id))
1082     return -ENOENT;
1083
1084   // get the name of the bucket we are trying to move for later
1085   string id_name = get_item_name(id);
1086
1087   crush_bucket *b = get_bucket(id);
1088   unsigned bucket_weight = b->weight;
1089
1090   return insert_item(cct, id, bucket_weight / (float)0x10000, id_name, loc);
1091 }
1092
1093 int CrushWrapper::create_or_move_item(
1094   CephContext *cct, int item, float weight, string name,
1095   const map<string,string>& loc)  // typename -> bucketname
1096 {
1097   int ret = 0;
1098   int old_iweight;
1099
1100   if (!is_valid_crush_name(name))
1101     return -EINVAL;
1102
1103   if (check_item_loc(cct, item, loc, &old_iweight)) {
1104     ldout(cct, 5) << "create_or_move_item " << item << " already at " << loc
1105                   << dendl;
1106   } else {
1107     if (_search_item_exists(item)) {
1108       weight = get_item_weightf(item);
1109       ldout(cct, 10) << "create_or_move_item " << item
1110                      << " exists with weight " << weight << dendl;
1111       remove_item(cct, item, true);
1112     }
1113     ldout(cct, 5) << "create_or_move_item adding " << item
1114                   << " weight " << weight
1115                   << " at " << loc << dendl;
1116     ret = insert_item(cct, item, weight, name, loc);
1117     if (ret == 0)
1118       ret = 1;  // changed
1119   }
1120   return ret;
1121 }
1122
1123 int CrushWrapper::update_item(
1124   CephContext *cct, int item, float weight, string name,
1125   const map<string,string>& loc)  // typename -> bucketname
1126 {
1127   ldout(cct, 5) << "update_item item " << item << " weight " << weight
1128                 << " name " << name << " loc " << loc << dendl;
1129   int ret = 0;
1130
1131   if (!is_valid_crush_name(name))
1132     return -EINVAL;
1133
1134   if (!is_valid_crush_loc(cct, loc))
1135     return -EINVAL;
1136
1137   ret = validate_weightf(weight);
1138   if (ret < 0) {
1139     return ret;
1140   }
1141
1142   // compare quantized (fixed-point integer) weights!  
1143   int iweight = (int)(weight * (float)0x10000);
1144   int old_iweight;
1145   if (check_item_loc(cct, item, loc, &old_iweight)) {
1146     ldout(cct, 5) << "update_item " << item << " already at " << loc << dendl;
1147     if (old_iweight != iweight) {
1148       ldout(cct, 5) << "update_item " << item << " adjusting weight "
1149                     << ((float)old_iweight/(float)0x10000) << " -> " << weight
1150                     << dendl;
1151       adjust_item_weight_in_loc(cct, item, iweight, loc);
1152       ret = 1;
1153     }
1154     if (get_item_name(item) != name) {
1155       ldout(cct, 5) << "update_item setting " << item << " name to " << name
1156                     << dendl;
1157       set_item_name(item, name);
1158       ret = 1;
1159     }
1160   } else {
1161     if (item_exists(item)) {
1162       remove_item(cct, item, true);
1163     }
1164     ldout(cct, 5) << "update_item adding " << item << " weight " << weight
1165                   << " at " << loc << dendl;
1166     ret = insert_item(cct, item, weight, name, loc);
1167     if (ret == 0)
1168       ret = 1;  // changed
1169   }
1170   return ret;
1171 }
1172
1173 int CrushWrapper::get_item_weight(int id) const
1174 {
1175   for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
1176     crush_bucket *b = crush->buckets[bidx];
1177     if (b == NULL)
1178       continue;
1179     if (b->id == id)
1180       return b->weight;
1181     for (unsigned i = 0; i < b->size; i++)
1182       if (b->items[i] == id)
1183         return crush_get_bucket_item_weight(b, i);
1184   }
1185   return -ENOENT;
1186 }
1187
1188 int CrushWrapper::get_item_weight_in_loc(int id, const map<string,string> &loc)
1189 {
1190   for (map<string,string>::const_iterator l = loc.begin(); l != loc.end(); ++l) {
1191
1192     int bid = get_item_id(l->second);
1193     if (!bucket_exists(bid))
1194       continue;
1195     crush_bucket *b = get_bucket(bid);
1196     for (unsigned int i = 0; i < b->size; i++) {
1197       if (b->items[i] == id) {
1198         return crush_get_bucket_item_weight(b, i);
1199       }
1200     }
1201   }
1202   return -ENOENT;
1203 }
1204
1205 int CrushWrapper::adjust_item_weight(CephContext *cct, int id, int weight)
1206 {
1207   ldout(cct, 5) << "adjust_item_weight " << id << " weight " << weight << dendl;
1208   int changed = 0;
1209   for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
1210     crush_bucket *b = crush->buckets[bidx];
1211     if (b == 0)
1212       continue;
1213     for (unsigned i = 0; i < b->size; i++) {
1214       if (b->items[i] == id) {
1215         int diff = bucket_adjust_item_weight(cct, b, id, weight);
1216         ldout(cct, 5) << "adjust_item_weight " << id << " diff " << diff
1217                       << " in bucket " << bidx << dendl;
1218         adjust_item_weight(cct, -1 - bidx, b->weight);
1219         changed++;
1220       }
1221     }
1222   }
1223   if (!changed)
1224     return -ENOENT;
1225   return changed;
1226 }
1227
1228 int CrushWrapper::adjust_item_weight_in_loc(CephContext *cct, int id, int weight, const map<string,string>& loc)
1229 {
1230   ldout(cct, 5) << "adjust_item_weight_in_loc " << id << " weight " << weight
1231                 << " in " << loc << dendl;
1232   int changed = 0;
1233
1234   for (auto l = loc.begin(); l != loc.end(); ++l) {
1235     int bid = get_item_id(l->second);
1236     if (!bucket_exists(bid))
1237       continue;
1238     crush_bucket *b = get_bucket(bid);
1239     for (unsigned int i = 0; i < b->size; i++) {
1240       if (b->items[i] == id) {
1241         int diff = bucket_adjust_item_weight(cct, b, id, weight);
1242         ldout(cct, 5) << "adjust_item_weight_in_loc " << id << " diff " << diff
1243                       << " in bucket " << bid << dendl;
1244         adjust_item_weight(cct, bid, b->weight);
1245         changed++;
1246       }
1247     }
1248   }
1249   if (!changed)
1250     return -ENOENT;
1251   return changed;
1252 }
1253
1254 int CrushWrapper::adjust_subtree_weight(CephContext *cct, int id, int weight)
1255 {
1256   ldout(cct, 5) << __func__ << " " << id << " weight " << weight << dendl;
1257   crush_bucket *b = get_bucket(id);
1258   if (IS_ERR(b))
1259     return PTR_ERR(b);
1260   int changed = 0;
1261   list<crush_bucket*> q;
1262   q.push_back(b);
1263   while (!q.empty()) {
1264     b = q.front();
1265     q.pop_front();
1266     int local_changed = 0;
1267     for (unsigned i=0; i<b->size; ++i) {
1268       int n = b->items[i];
1269       if (n >= 0) {
1270         bucket_adjust_item_weight(cct, b, n, weight);
1271         ++changed;
1272         ++local_changed;
1273       } else {
1274         crush_bucket *sub = get_bucket(n);
1275         if (IS_ERR(sub))
1276           continue;
1277         q.push_back(sub);
1278       }
1279     }
1280     if (local_changed) {
1281       adjust_item_weight(cct, b->id, b->weight);
1282     }
1283   }
1284   return changed;
1285 }
1286
1287 bool CrushWrapper::check_item_present(int id) const
1288 {
1289   bool found = false;
1290
1291   for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
1292     crush_bucket *b = crush->buckets[bidx];
1293     if (b == 0)
1294       continue;
1295     for (unsigned i = 0; i < b->size; i++)
1296       if (b->items[i] == id)
1297         found = true;
1298   }
1299   return found;
1300 }
1301
1302
1303 pair<string,string> CrushWrapper::get_immediate_parent(int id, int *_ret)
1304 {
1305
1306   for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
1307     crush_bucket *b = crush->buckets[bidx];
1308     if (b == 0)
1309       continue;
1310    if (is_shadow_item(b->id))
1311       continue;
1312     for (unsigned i = 0; i < b->size; i++)
1313       if (b->items[i] == id) {
1314         string parent_id = name_map[b->id];
1315         string parent_bucket_type = type_map[b->type];
1316         if (_ret)
1317           *_ret = 0;
1318         return make_pair(parent_bucket_type, parent_id);
1319       }
1320   }
1321
1322   if (_ret)
1323     *_ret = -ENOENT;
1324
1325   return pair<string, string>();
1326 }
1327
1328 int CrushWrapper::get_immediate_parent_id(int id, int *parent) const
1329 {
1330   for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
1331     crush_bucket *b = crush->buckets[bidx];
1332     if (b == 0)
1333       continue;
1334     if (is_shadow_item(b->id))
1335       continue;
1336     for (unsigned i = 0; i < b->size; i++) {
1337       if (b->items[i] == id) {
1338         *parent = b->id;
1339         return 0;
1340       }
1341     }
1342   }
1343   return -ENOENT;
1344 }
1345
1346 int CrushWrapper::get_parent_of_type(int item, int type) const
1347 {
1348   do {
1349     int r = get_immediate_parent_id(item, &item);
1350     if (r < 0) {
1351       return 0;
1352     }
1353   } while (get_bucket_type(item) != type);
1354   return item;
1355 }
1356
1357 int CrushWrapper::rename_class(const string& srcname, const string& dstname)
1358 {
1359   auto i = class_rname.find(srcname);
1360   if (i == class_rname.end())
1361     return -ENOENT;
1362   auto j = class_rname.find(dstname);
1363   if (j != class_rname.end())
1364     return -EEXIST;
1365
1366   int class_id = i->second;
1367   assert(class_name.count(class_id));
1368   // rename any shadow buckets of old class name
1369   for (auto &it: class_map) {
1370     if (it.first < 0 && it.second == class_id) {
1371         string old_name = get_item_name(it.first);
1372         size_t pos = old_name.find("~");
1373         assert(pos != string::npos);
1374         string name_no_class = old_name.substr(0, pos);
1375         string old_class_name = old_name.substr(pos + 1);
1376         assert(old_class_name == srcname);
1377         string new_name = name_no_class + "~" + dstname;
1378         // we do not use set_item_name
1379         // because the name is intentionally invalid
1380         name_map[it.first] = new_name;
1381         have_rmaps = false;
1382     }
1383   }
1384
1385   // rename class
1386   class_rname.erase(srcname);
1387   class_name.erase(class_id);
1388   class_rname[dstname] = class_id;
1389   class_name[class_id] = dstname;
1390   return 0;
1391 }
1392
1393 int CrushWrapper::populate_classes(
1394   const std::map<int32_t, map<int32_t, int32_t>>& old_class_bucket)
1395 {
1396   // build set of previous used shadow ids
1397   set<int32_t> used_ids;
1398   for (auto& p : old_class_bucket) {
1399     for (auto& q : p.second) {
1400       used_ids.insert(q.second);
1401     }
1402   }
1403   // accumulate weight values for each carg and bucket as we go. because it is
1404   // depth first, we will have the nested bucket weights we need when we
1405   // finish constructing the containing buckets.
1406   map<int,map<int,vector<int>>> cmap_item_weight; // cargs -> bno -> weights
1407   set<int> roots;
1408   find_nonshadow_roots(&roots);
1409   for (auto &r : roots) {
1410     if (r >= 0)
1411       continue;
1412     for (auto &c : class_name) {
1413       int clone;
1414       int res = device_class_clone(r, c.first, old_class_bucket, used_ids,
1415                                    &clone, &cmap_item_weight);
1416       if (res < 0)
1417         return res;
1418     }
1419   }
1420   return 0;
1421 }
1422
1423 int CrushWrapper::trim_roots_with_class()
1424 {
1425   set<int> roots;
1426   find_shadow_roots(&roots);
1427   for (auto &r : roots) {
1428     if (r >= 0)
1429       continue;
1430     int res = remove_root(r);
1431     if (res)
1432       return res;
1433   }
1434   // there is no need to reweight because we only remove from the
1435   // root and down
1436   return 0;
1437 }
1438
1439 int32_t CrushWrapper::_alloc_class_id() const {
1440   if (class_name.empty()) {
1441     return 0;
1442   }
1443   int32_t class_id = class_name.rbegin()->first + 1;
1444   if (class_id >= 0) {
1445     return class_id;
1446   }
1447   // wrapped, pick a random start and do exhaustive search
1448   uint32_t upperlimit = numeric_limits<int32_t>::max();
1449   upperlimit++;
1450   class_id = rand() % upperlimit;
1451   const auto start = class_id;
1452   do {
1453     if (!class_name.count(class_id)) {
1454       return class_id;
1455     } else {
1456       class_id++;
1457       if (class_id < 0) {
1458         class_id = 0;
1459       }
1460     }
1461   } while (class_id != start);
1462   assert(0 == "no available class id");
1463 }
1464
1465 void CrushWrapper::reweight(CephContext *cct)
1466 {
1467   set<int> roots;
1468   find_roots(&roots);
1469   for (set<int>::iterator p = roots.begin(); p != roots.end(); ++p) {
1470     if (*p >= 0)
1471       continue;
1472     crush_bucket *b = get_bucket(*p);
1473     ldout(cct, 5) << "reweight bucket " << *p << dendl;
1474     int r = crush_reweight_bucket(crush, b);
1475     assert(r == 0);
1476   }
1477 }
1478
1479 int CrushWrapper::add_simple_rule_at(
1480   string name, string root_name,
1481   string failure_domain_name,
1482   string device_class,
1483   string mode, int rule_type,
1484   int rno,
1485   ostream *err)
1486 {
1487   if (rule_exists(name)) {
1488     if (err)
1489       *err << "rule " << name << " exists";
1490     return -EEXIST;
1491   }
1492   if (rno >= 0) {
1493     if (rule_exists(rno)) {
1494       if (err)
1495         *err << "rule with ruleno " << rno << " exists";
1496       return -EEXIST;
1497     }
1498     if (ruleset_exists(rno)) {
1499       if (err)
1500         *err << "ruleset " << rno << " exists";
1501       return -EEXIST;
1502     }
1503   } else {
1504     for (rno = 0; rno < get_max_rules(); rno++) {
1505       if (!rule_exists(rno) && !ruleset_exists(rno))
1506         break;
1507     }
1508   }
1509   if (!name_exists(root_name)) {
1510     if (err)
1511       *err << "root item " << root_name << " does not exist";
1512     return -ENOENT;
1513   }
1514   int root = get_item_id(root_name);
1515   int type = 0;
1516   if (failure_domain_name.length()) {
1517     type = get_type_id(failure_domain_name);
1518     if (type < 0) {
1519       if (err)
1520         *err << "unknown type " << failure_domain_name;
1521       return -EINVAL;
1522     }
1523   }
1524   if (device_class.size()) {
1525     if (!class_exists(device_class)) {
1526       if (err)
1527         *err << "device class " << device_class << " does not exist";
1528       return -EINVAL;
1529     }
1530     int c = get_class_id(device_class);
1531     if (class_bucket.count(root) == 0 ||
1532         class_bucket[root].count(c) == 0) {
1533       if (err)
1534         *err << "root " << root_name << " has no devices with class "
1535              << device_class;
1536       return -EINVAL;
1537     }
1538     root = class_bucket[root][c];
1539   }
1540   if (mode != "firstn" && mode != "indep") {
1541     if (err)
1542       *err << "unknown mode " << mode;
1543     return -EINVAL;
1544   }
1545
1546   int steps = 3;
1547   if (mode == "indep")
1548     steps = 5;
1549   int min_rep = mode == "firstn" ? 1 : 3;
1550   int max_rep = mode == "firstn" ? 10 : 20;
1551   //set the ruleset the same as rule_id(rno)
1552   crush_rule *rule = crush_make_rule(steps, rno, rule_type, min_rep, max_rep);
1553   assert(rule);
1554   int step = 0;
1555   if (mode == "indep") {
1556     crush_rule_set_step(rule, step++, CRUSH_RULE_SET_CHOOSELEAF_TRIES, 5, 0);
1557     crush_rule_set_step(rule, step++, CRUSH_RULE_SET_CHOOSE_TRIES, 100, 0);
1558   }
1559   crush_rule_set_step(rule, step++, CRUSH_RULE_TAKE, root, 0);
1560   if (type)
1561     crush_rule_set_step(rule, step++,
1562                         mode == "firstn" ? CRUSH_RULE_CHOOSELEAF_FIRSTN :
1563                         CRUSH_RULE_CHOOSELEAF_INDEP,
1564                         CRUSH_CHOOSE_N,
1565                         type);
1566   else
1567     crush_rule_set_step(rule, step++,
1568                         mode == "firstn" ? CRUSH_RULE_CHOOSE_FIRSTN :
1569                         CRUSH_RULE_CHOOSE_INDEP,
1570                         CRUSH_CHOOSE_N,
1571                         0);
1572   crush_rule_set_step(rule, step++, CRUSH_RULE_EMIT, 0, 0);
1573
1574   int ret = crush_add_rule(crush, rule, rno);
1575   if(ret < 0) {
1576     *err << "failed to add rule " << rno << " because " << cpp_strerror(ret);
1577     return ret;
1578   }
1579   set_rule_name(rno, name);
1580   have_rmaps = false;
1581   return rno;
1582 }
1583
1584 int CrushWrapper::add_simple_rule(
1585   string name, string root_name,
1586   string failure_domain_name,
1587   string device_class,
1588   string mode, int rule_type,
1589   ostream *err)
1590 {
1591   return add_simple_rule_at(name, root_name, failure_domain_name, device_class,
1592                             mode,
1593                             rule_type, -1, err);
1594 }
1595
1596 float CrushWrapper::_get_take_weight_osd_map(int root,
1597                                              map<int,float> *pmap) const
1598 {
1599   float sum = 0.0;
1600   list<int> q;
1601   q.push_back(root);
1602   //breadth first iterate the OSD tree
1603   while (!q.empty()) {
1604     int bno = q.front();
1605     q.pop_front();
1606     crush_bucket *b = crush->buckets[-1-bno];
1607     assert(b);
1608     for (unsigned j=0; j<b->size; ++j) {
1609       int item_id = b->items[j];
1610       if (item_id >= 0) { //it's an OSD
1611         float w = crush_get_bucket_item_weight(b, j);
1612         (*pmap)[item_id] = w;
1613         sum += w;
1614       } else { //not an OSD, expand the child later
1615         q.push_back(item_id);
1616       }
1617     }
1618   }
1619   return sum;
1620 }
1621
1622 void CrushWrapper::_normalize_weight_map(float sum,
1623                                          const map<int,float>& m,
1624                                          map<int,float> *pmap) const
1625 {
1626   for (auto& p : m) {
1627     map<int,float>::iterator q = pmap->find(p.first);
1628     if (q == pmap->end()) {
1629       (*pmap)[p.first] = p.second / sum;
1630     } else {
1631       q->second += p.second / sum;
1632     }
1633   }
1634 }
1635
1636 int CrushWrapper::get_take_weight_osd_map(int root, map<int,float> *pmap) const
1637 {
1638   map<int,float> m;
1639   float sum = _get_take_weight_osd_map(root, &m);
1640   _normalize_weight_map(sum, m, pmap);
1641   return 0;
1642 }
1643
1644 int CrushWrapper::get_rule_weight_osd_map(unsigned ruleno,
1645                                           map<int,float> *pmap) const
1646 {
1647   if (ruleno >= crush->max_rules)
1648     return -ENOENT;
1649   if (crush->rules[ruleno] == NULL)
1650     return -ENOENT;
1651   crush_rule *rule = crush->rules[ruleno];
1652
1653   // build a weight map for each TAKE in the rule, and then merge them
1654
1655   // FIXME: if there are multiple takes that place a different number of
1656   // objects we do not take that into account.  (Also, note that doing this
1657   // right is also a function of the pool, since the crush rule
1658   // might choose 2 + choose 2 but pool size may only be 3.)
1659   for (unsigned i=0; i<rule->len; ++i) {
1660     map<int,float> m;
1661     float sum = 0;
1662     if (rule->steps[i].op == CRUSH_RULE_TAKE) {
1663       int n = rule->steps[i].arg1;
1664       if (n >= 0) {
1665         m[n] = 1.0;
1666         sum = 1.0;
1667       } else {
1668         sum += _get_take_weight_osd_map(n, &m);
1669       }
1670     }
1671     _normalize_weight_map(sum, m, pmap);
1672   }
1673
1674   return 0;
1675 }
1676
1677 int CrushWrapper::remove_rule(int ruleno)
1678 {
1679   if (ruleno >= (int)crush->max_rules)
1680     return -ENOENT;
1681   if (crush->rules[ruleno] == NULL)
1682     return -ENOENT;
1683   crush_destroy_rule(crush->rules[ruleno]);
1684   crush->rules[ruleno] = NULL;
1685   rule_name_map.erase(ruleno);
1686   have_rmaps = false;
1687   return rebuild_roots_with_classes();
1688 }
1689
1690 int CrushWrapper::bucket_adjust_item_weight(CephContext *cct, crush_bucket *bucket, int item, int weight)
1691 {
1692   if (cct->_conf->osd_crush_update_weight_set) {
1693     unsigned position;
1694     for (position = 0; position < bucket->size; position++)
1695       if (bucket->items[position] == item)
1696         break;
1697     assert(position != bucket->size);
1698     for (auto &w : choose_args) {
1699       crush_choose_arg_map &arg_map = w.second;
1700       crush_choose_arg *arg = &arg_map.args[-1-bucket->id];
1701       for (__u32 j = 0; j < arg->weight_set_size; j++) {
1702         crush_weight_set *weight_set = &arg->weight_set[j];
1703         weight_set->weights[position] = weight;
1704       }
1705     }
1706   }
1707   return crush_bucket_adjust_item_weight(crush, bucket, item, weight);
1708 }
1709
1710 int CrushWrapper::add_bucket(
1711   int bucketno, int alg, int hash, int type, int size,
1712   int *items, int *weights, int *idout)
1713 {
1714   if (alg == 0) {
1715     alg = get_default_bucket_alg();
1716     if (alg == 0)
1717       return -EINVAL;
1718   }
1719   crush_bucket *b = crush_make_bucket(crush, alg, hash, type, size, items,
1720                                       weights);
1721   assert(b);
1722   assert(idout);
1723   int r = crush_add_bucket(crush, bucketno, b, idout);
1724   int pos = -1 - *idout;
1725   for (auto& p : choose_args) {
1726     crush_choose_arg_map& cmap = p.second;
1727     if (cmap.args) {
1728       if ((int)cmap.size <= pos) {
1729         cmap.args = (crush_choose_arg*)realloc(
1730           cmap.args,
1731           sizeof(crush_choose_arg) * (pos + 1));
1732         assert(cmap.args);
1733         memset(&cmap.args[cmap.size], 0,
1734                sizeof(crush_choose_arg) * (pos + 1 - cmap.size));
1735         cmap.size = pos + 1;
1736       }
1737     } else {
1738       cmap.args = (crush_choose_arg*)calloc(sizeof(crush_choose_arg),
1739                                             pos + 1);
1740       assert(cmap.args);
1741       cmap.size = pos + 1;
1742     }
1743     if (size > 0) {
1744       int positions = get_choose_args_positions(cmap);
1745       crush_choose_arg& carg = cmap.args[pos];
1746       carg.weight_set = (crush_weight_set*)calloc(sizeof(crush_weight_set),
1747                                                   size);
1748       carg.weight_set_size = positions;
1749       for (int ppos = 0; ppos < positions; ++ppos) {
1750         carg.weight_set[ppos].weights = (__u32*)calloc(sizeof(__u32), size);
1751         carg.weight_set[ppos].size = size;
1752         for (int bpos = 0; bpos < size; ++bpos) {
1753           carg.weight_set[ppos].weights[bpos] = weights[bpos];
1754         }
1755       }
1756     }
1757   }
1758   return r;
1759 }
1760
1761 int CrushWrapper::bucket_add_item(crush_bucket *bucket, int item, int weight)
1762 {
1763   __u32 new_size = bucket->size + 1;
1764   int r = crush_bucket_add_item(crush, bucket, item, weight);
1765   if (r < 0) {
1766     return r;
1767   }
1768   for (auto &w : choose_args) {
1769     crush_choose_arg_map &arg_map = w.second;
1770     crush_choose_arg *arg = &arg_map.args[-1-bucket->id];
1771     for (__u32 j = 0; j < arg->weight_set_size; j++) {
1772       crush_weight_set *weight_set = &arg->weight_set[j];
1773       weight_set->weights = (__u32*)realloc(weight_set->weights,
1774                                             new_size * sizeof(__u32));
1775       assert(weight_set->size + 1 == new_size);
1776       weight_set->weights[weight_set->size] = weight;
1777       weight_set->size = new_size;
1778     }
1779     if (arg->ids_size) {
1780       arg->ids = (__s32 *)realloc(arg->ids, new_size * sizeof(__s32));
1781       assert(arg->ids_size + 1 == new_size);
1782       arg->ids[arg->ids_size] = item;
1783       arg->ids_size = new_size;
1784     }
1785   }
1786   return 0;
1787 }
1788
1789 int CrushWrapper::bucket_remove_item(crush_bucket *bucket, int item)
1790 {
1791   __u32 new_size = bucket->size - 1;
1792   unsigned position;
1793   for (position = 0; position < bucket->size; position++)
1794     if (bucket->items[position] == item)
1795       break;
1796   assert(position != bucket->size);
1797   int r = crush_bucket_remove_item(crush, bucket, item);
1798   if (r < 0) {
1799     return r;
1800   }
1801   for (auto &w : choose_args) {
1802     crush_choose_arg_map &arg_map = w.second;
1803     crush_choose_arg *arg = &arg_map.args[-1-bucket->id];
1804     for (__u32 j = 0; j < arg->weight_set_size; j++) {
1805       crush_weight_set *weight_set = &arg->weight_set[j];
1806       assert(weight_set->size - 1 == new_size);
1807       for (__u32 k = position; k < new_size; k++)
1808         weight_set->weights[k] = weight_set->weights[k+1];
1809       if (new_size) {
1810         weight_set->weights = (__u32*)realloc(weight_set->weights,
1811                                               new_size * sizeof(__u32));
1812       } else {
1813         weight_set->weights = NULL;
1814       }
1815       weight_set->size = new_size;
1816     }
1817     if (arg->ids_size) {
1818       assert(arg->ids_size - 1 == new_size);
1819       for (__u32 k = position; k < new_size; k++)
1820         arg->ids[k] = arg->ids[k+1];
1821       if (new_size) {
1822         arg->ids = (__s32 *)realloc(arg->ids, new_size * sizeof(__s32));
1823       } else {
1824         arg->ids = NULL;
1825       }
1826       arg->ids_size = new_size;
1827     }
1828   }
1829   return 0;
1830 }
1831
1832 int CrushWrapper::bucket_set_alg(int bid, int alg)
1833 {
1834   crush_bucket *b = get_bucket(bid);
1835   if (!b) {
1836     return -ENOENT;
1837   }
1838   b->alg = alg;
1839   return 0;
1840 }
1841
1842 int CrushWrapper::update_device_class(int id,
1843                                       const string& class_name,
1844                                       const string& name,
1845                                       ostream *ss)
1846 {
1847   assert(item_exists(id));
1848   auto old_class_name = get_item_class(id);
1849   if (old_class_name && old_class_name != class_name) {
1850     *ss << "osd." << id << " has already bound to class '" << old_class_name
1851         << "', can not reset class to '" << class_name  << "'; "
1852         << "use 'ceph osd crush rm-device-class <osd>' to "
1853         << "remove old class first";
1854     return -EBUSY;
1855   }
1856
1857   int class_id = get_or_create_class_id(class_name);
1858   if (id < 0) {
1859     *ss << name << " id " << id << " is negative";
1860     return -EINVAL;
1861   }
1862
1863   if (class_map.count(id) != 0 && class_map[id] == class_id) {
1864     *ss << name << " already set to class " << class_name;
1865     return 0;
1866   }
1867
1868   set_item_class(id, class_id);
1869
1870   int r = rebuild_roots_with_classes();
1871   if (r < 0)
1872     return r;
1873   return 1;
1874 }
1875
1876 int CrushWrapper::remove_device_class(CephContext *cct, int id, ostream *ss)
1877 {
1878   assert(ss);
1879   const char *name = get_item_name(id);
1880   if (!name) {
1881     *ss << "osd." << id << " does not have a name";
1882     return -ENOENT;
1883   }
1884
1885   const char *class_name = get_item_class(id);
1886   if (!class_name) {
1887     *ss << "osd." << id << " has not been bound to a specific class yet";
1888     return 0;
1889   }
1890   class_remove_item(id);
1891
1892   int r = rebuild_roots_with_classes();
1893   if (r < 0) {
1894     *ss << "unable to rebuild roots with class '" << class_name << "' "
1895         << "of osd." << id << ": " << cpp_strerror(r);
1896     return r;
1897   }
1898   return 0;
1899 }
1900
1901 int CrushWrapper::device_class_clone(
1902   int original_id, int device_class,
1903   const std::map<int32_t, map<int32_t, int32_t>>& old_class_bucket,
1904   const std::set<int32_t>& used_ids,
1905   int *clone,
1906   map<int,map<int,vector<int>>> *cmap_item_weight)
1907 {
1908   const char *item_name = get_item_name(original_id);
1909   if (item_name == NULL)
1910     return -ECHILD;
1911   const char *class_name = get_class_name(device_class);
1912   if (class_name == NULL)
1913     return -EBADF;
1914   string copy_name = item_name + string("~") + class_name;
1915   if (name_exists(copy_name)) {
1916     *clone = get_item_id(copy_name);
1917     return 0;
1918   }
1919
1920   crush_bucket *original = get_bucket(original_id);
1921   assert(!IS_ERR(original));
1922   crush_bucket *copy = crush_make_bucket(crush,
1923                                          original->alg,
1924                                          original->hash,
1925                                          original->type,
1926                                          0, NULL, NULL);
1927   assert(copy);
1928
1929   vector<unsigned> item_orig_pos;  // new item pos -> orig item pos
1930   for (unsigned i = 0; i < original->size; i++) {
1931     int item = original->items[i];
1932     int weight = crush_get_bucket_item_weight(original, i);
1933     if (item >= 0) {
1934       if (class_map.count(item) != 0 && class_map[item] == device_class) {
1935         int res = crush_bucket_add_item(crush, copy, item, weight);
1936         if (res)
1937           return res;
1938       } else {
1939         continue;
1940       }
1941     } else {
1942       int child_copy_id;
1943       int res = device_class_clone(item, device_class, old_class_bucket,
1944                                    used_ids, &child_copy_id,
1945                                    cmap_item_weight);
1946       if (res < 0)
1947         return res;
1948       crush_bucket *child_copy = get_bucket(child_copy_id);
1949       assert(!IS_ERR(child_copy));
1950       res = crush_bucket_add_item(crush, copy, child_copy_id,
1951                                   child_copy->weight);
1952       if (res)
1953         return res;
1954     }
1955     item_orig_pos.push_back(i);
1956   }
1957   assert(item_orig_pos.size() == copy->size);
1958
1959   int bno = 0;
1960   if (old_class_bucket.count(original_id) &&
1961       old_class_bucket.at(original_id).count(device_class)) {
1962     bno = old_class_bucket.at(original_id).at(device_class);
1963   } else {
1964     // pick a new shadow bucket id that is not used by the current map
1965     // *or* any previous shadow buckets.
1966     bno = -1;
1967     while (((-1-bno) < crush->max_buckets && crush->buckets[-1-bno]) ||
1968            used_ids.count(bno)) {
1969       --bno;
1970     }
1971   }
1972   int res = crush_add_bucket(crush, bno, copy, clone);
1973   if (res)
1974     return res;
1975   assert(!bno || bno == *clone);
1976
1977   res = set_item_class(*clone, device_class);
1978   if (res < 0)
1979     return res;
1980
1981   // we do not use set_item_name because the name is intentionally invalid
1982   name_map[*clone] = copy_name;
1983   if (have_rmaps)
1984     name_rmap[copy_name] = *clone;
1985   class_bucket[original_id][device_class] = *clone;
1986
1987   // set up choose_args for the new bucket.
1988   for (auto& w : choose_args) {
1989     crush_choose_arg_map& cmap = w.second;
1990     if (-1-bno >= (int)cmap.size) {
1991       unsigned new_size = -1-bno + 1;
1992       cmap.args = (crush_choose_arg*)realloc(cmap.args,
1993                                              new_size * sizeof(cmap.args[0]));
1994       assert(cmap.args);
1995       memset(cmap.args + cmap.size, 0,
1996              (new_size - cmap.size) * sizeof(cmap.args[0]));
1997       cmap.size = new_size;
1998     }
1999     auto& o = cmap.args[-1-original_id];
2000     auto& n = cmap.args[-1-bno];
2001     n.ids_size = 0; // FIXME: implement me someday
2002     n.weight_set_size = o.weight_set_size;
2003     n.weight_set = (crush_weight_set*)calloc(
2004       n.weight_set_size, sizeof(crush_weight_set));
2005     for (size_t s = 0; s < n.weight_set_size; ++s) {
2006       n.weight_set[s].size = copy->size;
2007       n.weight_set[s].weights = (__u32*)calloc(copy->size, sizeof(__u32));
2008     }
2009     for (size_t s = 0; s < n.weight_set_size; ++s) {
2010       vector<int> bucket_weights(n.weight_set_size);
2011       for (size_t i = 0; i < copy->size; ++i) {
2012         int item = copy->items[i];
2013         if (item >= 0) {
2014           n.weight_set[s].weights[i] = o.weight_set[s].weights[item_orig_pos[i]];
2015         } else {
2016           n.weight_set[s].weights[i] = (*cmap_item_weight)[w.first][item][s];
2017         }
2018         bucket_weights[s] += n.weight_set[s].weights[i];
2019       }
2020       (*cmap_item_weight)[w.first][bno] = bucket_weights;
2021     }
2022   }
2023   return 0;
2024 }
2025
2026 int CrushWrapper::get_rules_by_class(const string &class_name, set<int> *rules)
2027 {
2028   assert(rules);
2029   rules->clear();
2030   if (!class_exists(class_name)) {
2031     return -ENOENT;
2032   }
2033   int class_id = get_class_id(class_name);
2034   for (unsigned i = 0; i < crush->max_rules; ++i) {
2035     crush_rule *r = crush->rules[i];
2036     if (!r)
2037       continue;
2038     for (unsigned j = 0; j < r->len; ++j) {
2039       if (r->steps[j].op == CRUSH_RULE_TAKE) {
2040         int step_item = r->steps[j].arg1;
2041         int original_item;
2042         int c;
2043         int res = split_id_class(step_item, &original_item, &c);
2044         if (res < 0) {
2045           return res;
2046         }
2047         if (c != -1 && c == class_id) {
2048           rules->insert(i);
2049           break;
2050         }
2051       }
2052     }
2053   }
2054   return 0;
2055 }
2056
2057 // return rules that might reference the given osd
2058 int CrushWrapper::get_rules_by_osd(int osd, set<int> *rules)
2059 {
2060   assert(rules);
2061   rules->clear();
2062   if (osd < 0) {
2063     return -EINVAL;
2064   }
2065   for (unsigned i = 0; i < crush->max_rules; ++i) {
2066     crush_rule *r = crush->rules[i];
2067     if (!r)
2068       continue;
2069     for (unsigned j = 0; j < r->len; ++j) {
2070       if (r->steps[j].op == CRUSH_RULE_TAKE) {
2071         int step_item = r->steps[j].arg1;
2072         list<int> unordered;
2073         int rc = _get_leaves(step_item, &unordered);
2074         if (rc < 0) {
2075           return rc; // propagate fatal errors!
2076         }
2077         bool match = false;
2078         for (auto &o: unordered) {
2079           assert(o >= 0);
2080           if (o == osd) {
2081             match = true;
2082             break;
2083           }
2084         }
2085         if (match) {
2086           rules->insert(i);
2087           break;
2088         }
2089       }
2090     }
2091   }
2092   return 0;
2093 }
2094
2095 bool CrushWrapper::_class_is_dead(int class_id)
2096 {
2097   for (auto &p: class_map) {
2098     if (p.first >= 0 && p.second == class_id) {
2099       return false;
2100     }
2101   }
2102   for (unsigned i = 0; i < crush->max_rules; ++i) {
2103     crush_rule *r = crush->rules[i];
2104     if (!r)
2105       continue;
2106     for (unsigned j = 0; j < r->len; ++j) {
2107       if (r->steps[j].op == CRUSH_RULE_TAKE) {
2108         int root = r->steps[j].arg1;
2109         for (auto &p : class_bucket) {
2110           auto& q = p.second;
2111           if (q.count(class_id) && q[class_id] == root) {
2112             return false;
2113           }
2114         }
2115       }
2116     }
2117   }
2118   // no more referenced by any devices or crush rules
2119   return true;
2120 }
2121
2122 void CrushWrapper::cleanup_dead_classes()
2123 {
2124   auto p = class_name.begin();
2125   while (p != class_name.end()) {
2126     if (_class_is_dead(p->first)) {
2127       string n = p->second;
2128       ++p;
2129       remove_class_name(n);
2130     } else {
2131       ++p;
2132     }
2133   }
2134 }
2135
2136 int CrushWrapper::rebuild_roots_with_classes()
2137 {
2138   std::map<int32_t, map<int32_t, int32_t> > old_class_bucket = class_bucket;
2139   cleanup_dead_classes();
2140   int r = trim_roots_with_class();
2141   if (r < 0)
2142     return r;
2143   class_bucket.clear();
2144   return populate_classes(old_class_bucket);
2145 }
2146
2147 void CrushWrapper::encode(bufferlist& bl, uint64_t features) const
2148 {
2149   assert(crush);
2150
2151   __u32 magic = CRUSH_MAGIC;
2152   ::encode(magic, bl);
2153
2154   ::encode(crush->max_buckets, bl);
2155   ::encode(crush->max_rules, bl);
2156   ::encode(crush->max_devices, bl);
2157
2158   bool encode_compat_choose_args = false;
2159   crush_choose_arg_map arg_map;
2160   memset(&arg_map, '\0', sizeof(arg_map));
2161   if (has_choose_args() &&
2162       !HAVE_FEATURE(features, CRUSH_CHOOSE_ARGS)) {
2163     assert(!has_incompat_choose_args());
2164     encode_compat_choose_args = true;
2165     arg_map = choose_args.begin()->second;
2166   }
2167
2168   // buckets
2169   for (int i=0; i<crush->max_buckets; i++) {
2170     __u32 alg = 0;
2171     if (crush->buckets[i]) alg = crush->buckets[i]->alg;
2172     ::encode(alg, bl);
2173     if (!alg)
2174       continue;
2175
2176     ::encode(crush->buckets[i]->id, bl);
2177     ::encode(crush->buckets[i]->type, bl);
2178     ::encode(crush->buckets[i]->alg, bl);
2179     ::encode(crush->buckets[i]->hash, bl);
2180     ::encode(crush->buckets[i]->weight, bl);
2181     ::encode(crush->buckets[i]->size, bl);
2182     for (unsigned j=0; j<crush->buckets[i]->size; j++)
2183       ::encode(crush->buckets[i]->items[j], bl);
2184
2185     switch (crush->buckets[i]->alg) {
2186     case CRUSH_BUCKET_UNIFORM:
2187       ::encode((reinterpret_cast<crush_bucket_uniform*>(crush->buckets[i]))->item_weight, bl);
2188       break;
2189
2190     case CRUSH_BUCKET_LIST:
2191       for (unsigned j=0; j<crush->buckets[i]->size; j++) {
2192         ::encode((reinterpret_cast<crush_bucket_list*>(crush->buckets[i]))->item_weights[j], bl);
2193         ::encode((reinterpret_cast<crush_bucket_list*>(crush->buckets[i]))->sum_weights[j], bl);
2194       }
2195       break;
2196
2197     case CRUSH_BUCKET_TREE:
2198       ::encode((reinterpret_cast<crush_bucket_tree*>(crush->buckets[i]))->num_nodes, bl);
2199       for (unsigned j=0; j<(reinterpret_cast<crush_bucket_tree*>(crush->buckets[i]))->num_nodes; j++)
2200         ::encode((reinterpret_cast<crush_bucket_tree*>(crush->buckets[i]))->node_weights[j], bl);
2201       break;
2202
2203     case CRUSH_BUCKET_STRAW:
2204       for (unsigned j=0; j<crush->buckets[i]->size; j++) {
2205         ::encode((reinterpret_cast<crush_bucket_straw*>(crush->buckets[i]))->item_weights[j], bl);
2206         ::encode((reinterpret_cast<crush_bucket_straw*>(crush->buckets[i]))->straws[j], bl);
2207       }
2208       break;
2209
2210     case CRUSH_BUCKET_STRAW2:
2211       {
2212         __u32 *weights;
2213         if (encode_compat_choose_args &&
2214             arg_map.args[i].weight_set_size > 0) {
2215           weights = arg_map.args[i].weight_set[0].weights;
2216         } else {
2217           weights = (reinterpret_cast<crush_bucket_straw2*>(crush->buckets[i]))->item_weights;
2218         }
2219         for (unsigned j=0; j<crush->buckets[i]->size; j++) {
2220           ::encode(weights[j], bl);
2221         }
2222       }
2223       break;
2224
2225     default:
2226       ceph_abort();
2227       break;
2228     }
2229   }
2230
2231   // rules
2232   for (unsigned i=0; i<crush->max_rules; i++) {
2233     __u32 yes = crush->rules[i] ? 1:0;
2234     ::encode(yes, bl);
2235     if (!yes)
2236       continue;
2237
2238     ::encode(crush->rules[i]->len, bl);
2239     ::encode(crush->rules[i]->mask, bl);
2240     for (unsigned j=0; j<crush->rules[i]->len; j++)
2241       ::encode(crush->rules[i]->steps[j], bl);
2242   }
2243
2244   // name info
2245   ::encode(type_map, bl);
2246   ::encode(name_map, bl);
2247   ::encode(rule_name_map, bl);
2248
2249   // tunables
2250   ::encode(crush->choose_local_tries, bl);
2251   ::encode(crush->choose_local_fallback_tries, bl);
2252   ::encode(crush->choose_total_tries, bl);
2253   ::encode(crush->chooseleaf_descend_once, bl);
2254   ::encode(crush->chooseleaf_vary_r, bl);
2255   ::encode(crush->straw_calc_version, bl);
2256   ::encode(crush->allowed_bucket_algs, bl);
2257   if (features & CEPH_FEATURE_CRUSH_TUNABLES5) {
2258     ::encode(crush->chooseleaf_stable, bl);
2259   }
2260
2261   if (HAVE_FEATURE(features, SERVER_LUMINOUS)) {
2262     // device classes
2263     ::encode(class_map, bl);
2264     ::encode(class_name, bl);
2265     ::encode(class_bucket, bl);
2266
2267     // choose args
2268     __u32 size = (__u32)choose_args.size();
2269     ::encode(size, bl);
2270     for (auto c : choose_args) {
2271       ::encode(c.first, bl);
2272       crush_choose_arg_map arg_map = c.second;
2273       size = 0;
2274       for (__u32 i = 0; i < arg_map.size; i++) {
2275         crush_choose_arg *arg = &arg_map.args[i];
2276         if (arg->weight_set_size == 0 &&
2277             arg->ids_size == 0)
2278           continue;
2279         size++;
2280       }
2281       ::encode(size, bl);
2282       for (__u32 i = 0; i < arg_map.size; i++) {
2283         crush_choose_arg *arg = &arg_map.args[i];
2284         if (arg->weight_set_size == 0 &&
2285             arg->ids_size == 0)
2286           continue;
2287         ::encode(i, bl);
2288         ::encode(arg->weight_set_size, bl);
2289         for (__u32 j = 0; j < arg->weight_set_size; j++) {
2290           crush_weight_set *weight_set = &arg->weight_set[j];
2291           ::encode(weight_set->size, bl);
2292           for (__u32 k = 0; k < weight_set->size; k++)
2293             ::encode(weight_set->weights[k], bl);
2294         }
2295         ::encode(arg->ids_size, bl);
2296         for (__u32 j = 0; j < arg->ids_size; j++)
2297           ::encode(arg->ids[j], bl);
2298       }
2299     }
2300   }
2301 }
2302
2303 static void decode_32_or_64_string_map(map<int32_t,string>& m, bufferlist::iterator& blp)
2304 {
2305   m.clear();
2306   __u32 n;
2307   ::decode(n, blp);
2308   while (n--) {
2309     __s32 key;
2310     ::decode(key, blp);
2311
2312     __u32 strlen;
2313     ::decode(strlen, blp);
2314     if (strlen == 0) {
2315       // der, key was actually 64-bits!
2316       ::decode(strlen, blp);
2317     }
2318     ::decode_nohead(strlen, m[key], blp);
2319   }
2320 }
2321
2322 void CrushWrapper::decode(bufferlist::iterator& blp)
2323 {
2324   create();
2325
2326   __u32 magic;
2327   ::decode(magic, blp);
2328   if (magic != CRUSH_MAGIC)
2329     throw buffer::malformed_input("bad magic number");
2330
2331   ::decode(crush->max_buckets, blp);
2332   ::decode(crush->max_rules, blp);
2333   ::decode(crush->max_devices, blp);
2334
2335   // legacy tunables, unless we decode something newer
2336   set_tunables_legacy();
2337
2338   try {
2339     // buckets
2340     crush->buckets = (crush_bucket**)calloc(1, crush->max_buckets * sizeof(crush_bucket*));
2341     for (int i=0; i<crush->max_buckets; i++) {
2342       decode_crush_bucket(&crush->buckets[i], blp);
2343     }
2344
2345     // rules
2346     crush->rules = (crush_rule**)calloc(1, crush->max_rules * sizeof(crush_rule*));
2347     for (unsigned i = 0; i < crush->max_rules; ++i) {
2348       __u32 yes;
2349       ::decode(yes, blp);
2350       if (!yes) {
2351         crush->rules[i] = NULL;
2352         continue;
2353       }
2354
2355       __u32 len;
2356       ::decode(len, blp);
2357       crush->rules[i] = reinterpret_cast<crush_rule*>(calloc(1, crush_rule_size(len)));
2358       crush->rules[i]->len = len;
2359       ::decode(crush->rules[i]->mask, blp);
2360       for (unsigned j=0; j<crush->rules[i]->len; j++)
2361         ::decode(crush->rules[i]->steps[j], blp);
2362     }
2363
2364     // name info
2365     // NOTE: we had a bug where we were incoding int instead of int32, which means the
2366     // 'key' field for these maps may be either 32 or 64 bits, depending.  tolerate
2367     // both by assuming the string is always non-empty.
2368     decode_32_or_64_string_map(type_map, blp);
2369     decode_32_or_64_string_map(name_map, blp);
2370     decode_32_or_64_string_map(rule_name_map, blp);
2371
2372     // tunables
2373     if (!blp.end()) {
2374       ::decode(crush->choose_local_tries, blp);
2375       ::decode(crush->choose_local_fallback_tries, blp);
2376       ::decode(crush->choose_total_tries, blp);
2377     }
2378     if (!blp.end()) {
2379       ::decode(crush->chooseleaf_descend_once, blp);
2380     }
2381     if (!blp.end()) {
2382       ::decode(crush->chooseleaf_vary_r, blp);
2383     }
2384     if (!blp.end()) {
2385       ::decode(crush->straw_calc_version, blp);
2386     }
2387     if (!blp.end()) {
2388       ::decode(crush->allowed_bucket_algs, blp);
2389     }
2390     if (!blp.end()) {
2391       ::decode(crush->chooseleaf_stable, blp);
2392     }
2393     if (!blp.end()) {
2394       ::decode(class_map, blp);
2395       ::decode(class_name, blp);
2396       for (auto &c : class_name)
2397         class_rname[c.second] = c.first;
2398       ::decode(class_bucket, blp);
2399     }
2400     if (!blp.end()) {
2401       __u32 choose_args_size;
2402       ::decode(choose_args_size, blp);
2403       for (__u32 i = 0; i < choose_args_size; i++) {
2404         typename decltype(choose_args)::key_type choose_args_index;
2405         ::decode(choose_args_index, blp);
2406         crush_choose_arg_map arg_map;
2407         arg_map.size = crush->max_buckets;
2408         arg_map.args = (crush_choose_arg*)calloc(
2409           arg_map.size, sizeof(crush_choose_arg));
2410         __u32 size;
2411         ::decode(size, blp);
2412         for (__u32 j = 0; j < size; j++) {
2413           __u32 bucket_index;
2414           ::decode(bucket_index, blp);
2415           assert(bucket_index < arg_map.size);
2416           crush_choose_arg *arg = &arg_map.args[bucket_index];
2417           ::decode(arg->weight_set_size, blp);
2418           if (arg->weight_set_size) {
2419             arg->weight_set = (crush_weight_set*)calloc(
2420               arg->weight_set_size, sizeof(crush_weight_set));
2421             for (__u32 k = 0; k < arg->weight_set_size; k++) {
2422               crush_weight_set *weight_set = &arg->weight_set[k];
2423               ::decode(weight_set->size, blp);
2424               weight_set->weights = (__u32*)calloc(
2425                 weight_set->size, sizeof(__u32));
2426               for (__u32 l = 0; l < weight_set->size; l++)
2427                 ::decode(weight_set->weights[l], blp);
2428             }
2429           }
2430           ::decode(arg->ids_size, blp);
2431           if (arg->ids_size) {
2432             assert(arg->ids_size == crush->buckets[bucket_index]->size);
2433             arg->ids = (__s32 *)calloc(arg->ids_size, sizeof(__s32));
2434             for (__u32 k = 0; k < arg->ids_size; k++)
2435               ::decode(arg->ids[k], blp);
2436           }
2437         }
2438         choose_args[choose_args_index] = arg_map;
2439       }
2440     }
2441     finalize();
2442   }
2443   catch (...) {
2444     crush_destroy(crush);
2445     throw;
2446   }
2447 }
2448
2449 void CrushWrapper::decode_crush_bucket(crush_bucket** bptr, bufferlist::iterator &blp)
2450 {
2451   __u32 alg;
2452   ::decode(alg, blp);
2453   if (!alg) {
2454     *bptr = NULL;
2455     return;
2456   }
2457
2458   int size = 0;
2459   switch (alg) {
2460   case CRUSH_BUCKET_UNIFORM:
2461     size = sizeof(crush_bucket_uniform);
2462     break;
2463   case CRUSH_BUCKET_LIST:
2464     size = sizeof(crush_bucket_list);
2465     break;
2466   case CRUSH_BUCKET_TREE:
2467     size = sizeof(crush_bucket_tree);
2468     break;
2469   case CRUSH_BUCKET_STRAW:
2470     size = sizeof(crush_bucket_straw);
2471     break;
2472   case CRUSH_BUCKET_STRAW2:
2473     size = sizeof(crush_bucket_straw2);
2474     break;
2475   default:
2476     {
2477       char str[128];
2478       snprintf(str, sizeof(str), "unsupported bucket algorithm: %d", alg);
2479       throw buffer::malformed_input(str);
2480     }
2481   }
2482   crush_bucket *bucket = reinterpret_cast<crush_bucket*>(calloc(1, size));
2483   *bptr = bucket;
2484     
2485   ::decode(bucket->id, blp);
2486   ::decode(bucket->type, blp);
2487   ::decode(bucket->alg, blp);
2488   ::decode(bucket->hash, blp);
2489   ::decode(bucket->weight, blp);
2490   ::decode(bucket->size, blp);
2491
2492   bucket->items = (__s32*)calloc(1, bucket->size * sizeof(__s32));
2493   for (unsigned j = 0; j < bucket->size; ++j) {
2494     ::decode(bucket->items[j], blp);
2495   }
2496
2497   switch (bucket->alg) {
2498   case CRUSH_BUCKET_UNIFORM:
2499     ::decode((reinterpret_cast<crush_bucket_uniform*>(bucket))->item_weight, blp);
2500     break;
2501
2502   case CRUSH_BUCKET_LIST: {
2503     crush_bucket_list* cbl = reinterpret_cast<crush_bucket_list*>(bucket);
2504     cbl->item_weights = (__u32*)calloc(1, bucket->size * sizeof(__u32));
2505     cbl->sum_weights = (__u32*)calloc(1, bucket->size * sizeof(__u32));
2506
2507     for (unsigned j = 0; j < bucket->size; ++j) {
2508       ::decode(cbl->item_weights[j], blp);
2509       ::decode(cbl->sum_weights[j], blp);
2510     }
2511     break;
2512   }
2513
2514   case CRUSH_BUCKET_TREE: {
2515     crush_bucket_tree* cbt = reinterpret_cast<crush_bucket_tree*>(bucket);
2516     ::decode(cbt->num_nodes, blp);
2517     cbt->node_weights = (__u32*)calloc(1, cbt->num_nodes * sizeof(__u32));
2518     for (unsigned j=0; j<cbt->num_nodes; j++) {
2519       ::decode(cbt->node_weights[j], blp);
2520     }
2521     break;
2522   }
2523
2524   case CRUSH_BUCKET_STRAW: {
2525     crush_bucket_straw* cbs = reinterpret_cast<crush_bucket_straw*>(bucket);
2526     cbs->straws = (__u32*)calloc(1, bucket->size * sizeof(__u32));
2527     cbs->item_weights = (__u32*)calloc(1, bucket->size * sizeof(__u32));
2528     for (unsigned j = 0; j < bucket->size; ++j) {
2529       ::decode(cbs->item_weights[j], blp);
2530       ::decode(cbs->straws[j], blp);
2531     }
2532     break;
2533   }
2534
2535   case CRUSH_BUCKET_STRAW2: {
2536     crush_bucket_straw2* cbs = reinterpret_cast<crush_bucket_straw2*>(bucket);
2537     cbs->item_weights = (__u32*)calloc(1, bucket->size * sizeof(__u32));
2538     for (unsigned j = 0; j < bucket->size; ++j) {
2539       ::decode(cbs->item_weights[j], blp);
2540     }
2541     break;
2542   }
2543
2544   default:
2545     // We should have handled this case in the first switch statement
2546     ceph_abort();
2547     break;
2548   }
2549 }
2550
2551   
2552 void CrushWrapper::dump(Formatter *f) const
2553 {
2554   f->open_array_section("devices");
2555   for (int i=0; i<get_max_devices(); i++) {
2556     f->open_object_section("device");
2557     f->dump_int("id", i);
2558     const char *n = get_item_name(i);
2559     if (n) {
2560       f->dump_string("name", n);
2561     } else {
2562       char name[20];
2563       sprintf(name, "device%d", i);
2564       f->dump_string("name", name);
2565     }
2566     const char *device_class = get_item_class(i);
2567     if (device_class != NULL)
2568       f->dump_string("class", device_class);
2569     f->close_section();
2570   }
2571   f->close_section();
2572
2573   f->open_array_section("types");
2574   int n = get_num_type_names();
2575   for (int i=0; n; i++) {
2576     const char *name = get_type_name(i);
2577     if (!name) {
2578       if (i == 0) {
2579         f->open_object_section("type");
2580         f->dump_int("type_id", 0);
2581         f->dump_string("name", "device");
2582         f->close_section();
2583       }
2584       continue;
2585     }
2586     n--;
2587     f->open_object_section("type");
2588     f->dump_int("type_id", i);
2589     f->dump_string("name", name);
2590     f->close_section();
2591   }
2592   f->close_section();
2593
2594   f->open_array_section("buckets");
2595   for (int bucket = -1; bucket > -1-get_max_buckets(); --bucket) {
2596     if (!bucket_exists(bucket))
2597       continue;
2598     f->open_object_section("bucket");
2599     f->dump_int("id", bucket);
2600     if (get_item_name(bucket))
2601       f->dump_string("name", get_item_name(bucket));
2602     f->dump_int("type_id", get_bucket_type(bucket));
2603     if (get_type_name(get_bucket_type(bucket)))
2604       f->dump_string("type_name", get_type_name(get_bucket_type(bucket)));
2605     f->dump_int("weight", get_bucket_weight(bucket));
2606     f->dump_string("alg", crush_bucket_alg_name(get_bucket_alg(bucket)));
2607     f->dump_string("hash", crush_hash_name(get_bucket_hash(bucket)));
2608     f->open_array_section("items");
2609     for (int j=0; j<get_bucket_size(bucket); j++) {
2610       f->open_object_section("item");
2611       f->dump_int("id", get_bucket_item(bucket, j));
2612       f->dump_int("weight", get_bucket_item_weight(bucket, j));
2613       f->dump_int("pos", j);
2614       f->close_section();
2615     }
2616     f->close_section();
2617     f->close_section();
2618   }
2619   f->close_section();
2620
2621   f->open_array_section("rules");
2622   dump_rules(f);
2623   f->close_section();
2624
2625   f->open_object_section("tunables");
2626   dump_tunables(f);
2627   f->close_section();
2628
2629   dump_choose_args(f);
2630 }
2631
2632 namespace {
2633   // depth first walker
2634   class TreeDumper {
2635     typedef CrushTreeDumper::Item Item;
2636     const CrushWrapper *crush;
2637     const CrushTreeDumper::name_map_t& weight_set_names;
2638   public:
2639     explicit TreeDumper(const CrushWrapper *crush,
2640                         const CrushTreeDumper::name_map_t& wsnames)
2641       : crush(crush), weight_set_names(wsnames) {}
2642
2643     void dump(Formatter *f) {
2644       set<int> roots;
2645       crush->find_roots(&roots);
2646       for (set<int>::iterator root = roots.begin(); root != roots.end(); ++root) {
2647         dump_item(Item(*root, 0, 0, crush->get_bucket_weightf(*root)), f);
2648       }
2649     }
2650
2651   private:
2652     void dump_item(const Item& qi, Formatter* f) {
2653       if (qi.is_bucket()) {
2654         f->open_object_section("bucket");
2655         CrushTreeDumper::dump_item_fields(crush, weight_set_names, qi, f);
2656         dump_bucket_children(qi, f);
2657         f->close_section();
2658       } else {
2659         f->open_object_section("device");
2660         CrushTreeDumper::dump_item_fields(crush, weight_set_names, qi, f);
2661         f->close_section();
2662       }
2663     }
2664
2665     void dump_bucket_children(const Item& parent, Formatter* f) {
2666       f->open_array_section("items");
2667       const int max_pos = crush->get_bucket_size(parent.id);
2668       for (int pos = 0; pos < max_pos; pos++) {
2669         int id = crush->get_bucket_item(parent.id, pos);
2670         float weight = crush->get_bucket_item_weightf(parent.id, pos);
2671         dump_item(Item(id, parent.id, parent.depth + 1, weight), f);
2672       }
2673       f->close_section();
2674     }
2675   };
2676 }
2677
2678 void CrushWrapper::dump_tree(
2679   Formatter *f,
2680   const CrushTreeDumper::name_map_t& weight_set_names) const
2681 {
2682   assert(f);
2683   TreeDumper(this, weight_set_names).dump(f);
2684 }
2685
2686 void CrushWrapper::dump_tunables(Formatter *f) const
2687 {
2688   f->dump_int("choose_local_tries", get_choose_local_tries());
2689   f->dump_int("choose_local_fallback_tries", get_choose_local_fallback_tries());
2690   f->dump_int("choose_total_tries", get_choose_total_tries());
2691   f->dump_int("chooseleaf_descend_once", get_chooseleaf_descend_once());
2692   f->dump_int("chooseleaf_vary_r", get_chooseleaf_vary_r());
2693   f->dump_int("chooseleaf_stable", get_chooseleaf_stable());
2694   f->dump_int("straw_calc_version", get_straw_calc_version());
2695   f->dump_int("allowed_bucket_algs", get_allowed_bucket_algs());
2696
2697   // be helpful about it
2698   if (has_jewel_tunables())
2699     f->dump_string("profile", "jewel");
2700   else if (has_hammer_tunables())
2701     f->dump_string("profile", "hammer");
2702   else if (has_firefly_tunables())
2703     f->dump_string("profile", "firefly");
2704   else if (has_bobtail_tunables())
2705     f->dump_string("profile", "bobtail");
2706   else if (has_argonaut_tunables())
2707     f->dump_string("profile", "argonaut");
2708   else
2709     f->dump_string("profile", "unknown");
2710   f->dump_int("optimal_tunables", (int)has_optimal_tunables());
2711   f->dump_int("legacy_tunables", (int)has_legacy_tunables());
2712
2713   // be helpful about minimum version required
2714   f->dump_string("minimum_required_version", get_min_required_version());
2715
2716   f->dump_int("require_feature_tunables", (int)has_nondefault_tunables());
2717   f->dump_int("require_feature_tunables2", (int)has_nondefault_tunables2());
2718   f->dump_int("has_v2_rules", (int)has_v2_rules());
2719   f->dump_int("require_feature_tunables3", (int)has_nondefault_tunables3());
2720   f->dump_int("has_v3_rules", (int)has_v3_rules());
2721   f->dump_int("has_v4_buckets", (int)has_v4_buckets());
2722   f->dump_int("require_feature_tunables5", (int)has_nondefault_tunables5());
2723   f->dump_int("has_v5_rules", (int)has_v5_rules());
2724 }
2725
2726 void CrushWrapper::dump_choose_args(Formatter *f) const
2727 {
2728   f->open_object_section("choose_args");
2729   for (auto c : choose_args) {
2730     crush_choose_arg_map arg_map = c.second;
2731     f->open_array_section(stringify(c.first).c_str());
2732     for (__u32 i = 0; i < arg_map.size; i++) {
2733       crush_choose_arg *arg = &arg_map.args[i];
2734       if (arg->weight_set_size == 0 &&
2735           arg->ids_size == 0)
2736         continue;
2737       f->open_object_section("choose_args");
2738       int bucket_index = i;
2739       f->dump_int("bucket_id", -1-bucket_index);
2740       if (arg->weight_set_size > 0) {
2741         f->open_array_section("weight_set");
2742         for (__u32 j = 0; j < arg->weight_set_size; j++) {
2743           f->open_array_section("weights");
2744           __u32 *weights = arg->weight_set[j].weights;
2745           __u32 size = arg->weight_set[j].size;
2746           for (__u32 k = 0; k < size; k++) {
2747             f->dump_float("weight", (float)weights[k]/(float)0x10000);
2748           }
2749           f->close_section();
2750         }
2751         f->close_section();
2752       }
2753       if (arg->ids_size > 0) {
2754         f->open_array_section("ids");
2755         for (__u32 j = 0; j < arg->ids_size; j++)
2756           f->dump_int("id", arg->ids[j]);
2757         f->close_section();
2758       }
2759       f->close_section();
2760     }
2761     f->close_section();
2762   }
2763   f->close_section();
2764 }
2765
2766 void CrushWrapper::dump_rules(Formatter *f) const
2767 {
2768   for (int i=0; i<get_max_rules(); i++) {
2769     if (!rule_exists(i))
2770       continue;
2771     dump_rule(i, f);
2772   }
2773 }
2774
2775 void CrushWrapper::dump_rule(int ruleset, Formatter *f) const
2776 {
2777   f->open_object_section("rule");
2778   f->dump_int("rule_id", ruleset);
2779   if (get_rule_name(ruleset))
2780     f->dump_string("rule_name", get_rule_name(ruleset));
2781   f->dump_int("ruleset", get_rule_mask_ruleset(ruleset));
2782   f->dump_int("type", get_rule_mask_type(ruleset));
2783   f->dump_int("min_size", get_rule_mask_min_size(ruleset));
2784   f->dump_int("max_size", get_rule_mask_max_size(ruleset));
2785   f->open_array_section("steps");
2786   for (int j=0; j<get_rule_len(ruleset); j++) {
2787     f->open_object_section("step");
2788     switch (get_rule_op(ruleset, j)) {
2789     case CRUSH_RULE_NOOP:
2790       f->dump_string("op", "noop");
2791       break;
2792     case CRUSH_RULE_TAKE:
2793       f->dump_string("op", "take");
2794       {
2795         int item = get_rule_arg1(ruleset, j);
2796         f->dump_int("item", item);
2797
2798         const char *name = get_item_name(item);
2799         f->dump_string("item_name", name ? name : "");
2800       }
2801       break;
2802     case CRUSH_RULE_EMIT:
2803       f->dump_string("op", "emit");
2804       break;
2805     case CRUSH_RULE_CHOOSE_FIRSTN:
2806       f->dump_string("op", "choose_firstn");
2807       f->dump_int("num", get_rule_arg1(ruleset, j));
2808       f->dump_string("type", get_type_name(get_rule_arg2(ruleset, j)));
2809       break;
2810     case CRUSH_RULE_CHOOSE_INDEP:
2811       f->dump_string("op", "choose_indep");
2812       f->dump_int("num", get_rule_arg1(ruleset, j));
2813       f->dump_string("type", get_type_name(get_rule_arg2(ruleset, j)));
2814       break;
2815     case CRUSH_RULE_CHOOSELEAF_FIRSTN:
2816       f->dump_string("op", "chooseleaf_firstn");
2817       f->dump_int("num", get_rule_arg1(ruleset, j));
2818       f->dump_string("type", get_type_name(get_rule_arg2(ruleset, j)));
2819       break;
2820     case CRUSH_RULE_CHOOSELEAF_INDEP:
2821       f->dump_string("op", "chooseleaf_indep");
2822       f->dump_int("num", get_rule_arg1(ruleset, j));
2823       f->dump_string("type", get_type_name(get_rule_arg2(ruleset, j)));
2824       break;
2825     case CRUSH_RULE_SET_CHOOSE_TRIES:
2826       f->dump_string("op", "set_choose_tries");
2827       f->dump_int("num", get_rule_arg1(ruleset, j));
2828       break;
2829     case CRUSH_RULE_SET_CHOOSELEAF_TRIES:
2830       f->dump_string("op", "set_chooseleaf_tries");
2831       f->dump_int("num", get_rule_arg1(ruleset, j));
2832       break;
2833     default:
2834       f->dump_int("opcode", get_rule_op(ruleset, j));
2835       f->dump_int("arg1", get_rule_arg1(ruleset, j));
2836       f->dump_int("arg2", get_rule_arg2(ruleset, j));
2837     }
2838     f->close_section();
2839   }
2840   f->close_section();
2841   f->close_section();
2842 }
2843
2844 void CrushWrapper::list_rules(Formatter *f) const
2845 {
2846   for (int rule = 0; rule < get_max_rules(); rule++) {
2847     if (!rule_exists(rule))
2848       continue;
2849     f->dump_string("name", get_rule_name(rule));
2850   }
2851 }
2852
2853 void CrushWrapper::list_rules(ostream *ss) const
2854 {
2855   for (int rule = 0; rule < get_max_rules(); rule++) {
2856     if (!rule_exists(rule))
2857       continue;
2858     *ss << get_rule_name(rule) << "\n";
2859   }
2860 }
2861
2862 class CrushTreePlainDumper : public CrushTreeDumper::Dumper<TextTable> {
2863 public:
2864   typedef CrushTreeDumper::Dumper<TextTable> Parent;
2865
2866   explicit CrushTreePlainDumper(const CrushWrapper *crush,
2867                                 const CrushTreeDumper::name_map_t& wsnames)
2868     : Parent(crush, wsnames) {}
2869   explicit CrushTreePlainDumper(const CrushWrapper *crush,
2870                                 const CrushTreeDumper::name_map_t& wsnames,
2871                                 bool show_shadow)
2872     : Parent(crush, wsnames, show_shadow) {}
2873
2874
2875   void dump(TextTable *tbl) {
2876     tbl->define_column("ID", TextTable::LEFT, TextTable::RIGHT);
2877     tbl->define_column("CLASS", TextTable::LEFT, TextTable::RIGHT);
2878     tbl->define_column("WEIGHT", TextTable::LEFT, TextTable::RIGHT);
2879     for (auto& p : crush->choose_args) {
2880       if (p.first == CrushWrapper::DEFAULT_CHOOSE_ARGS) {
2881         tbl->define_column("(compat)", TextTable::LEFT, TextTable::RIGHT);
2882       } else {
2883         string name;
2884         auto q = weight_set_names.find(p.first);
2885         name = q != weight_set_names.end() ? q->second :
2886           stringify(p.first);
2887         tbl->define_column(name.c_str(), TextTable::LEFT, TextTable::RIGHT);
2888       }
2889     }
2890     tbl->define_column("TYPE NAME", TextTable::LEFT, TextTable::LEFT);
2891     Parent::dump(tbl);
2892   }
2893
2894 protected:
2895   void dump_item(const CrushTreeDumper::Item &qi, TextTable *tbl) override {
2896     const char *c = crush->get_item_class(qi.id);
2897     if (!c)
2898       c = "";
2899     *tbl << qi.id
2900          << c
2901          << weightf_t(qi.weight);
2902     for (auto& p : crush->choose_args) {
2903       if (qi.parent < 0) {
2904         const crush_choose_arg_map cmap = crush->choose_args_get(p.first);
2905         int bidx = -1 - qi.parent;
2906         const crush_bucket *b = crush->get_bucket(qi.parent);
2907         if (b &&
2908             bidx < (int)cmap.size &&
2909             cmap.args[bidx].weight_set &&
2910             cmap.args[bidx].weight_set_size >= 1) {
2911           int pos;
2912           for (pos = 0;
2913                pos < (int)cmap.args[bidx].weight_set[0].size &&
2914                  b->items[pos] != qi.id;
2915                ++pos) ;
2916           *tbl << weightf_t((float)cmap.args[bidx].weight_set[0].weights[pos] /
2917                             (float)0x10000);
2918           continue;
2919         }
2920       }
2921       *tbl << "";
2922     }
2923     ostringstream ss;
2924     for (int k=0; k < qi.depth; k++) {
2925       ss << "    ";
2926     }
2927     if (qi.is_bucket()) {
2928       ss << crush->get_type_name(crush->get_bucket_type(qi.id)) << " "
2929          << crush->get_item_name(qi.id);
2930     } else {
2931       ss << "osd." << qi.id;
2932     }
2933     *tbl << ss.str();
2934     *tbl << TextTable::endrow;
2935   }
2936 };
2937
2938
2939 class CrushTreeFormattingDumper : public CrushTreeDumper::FormattingDumper {
2940 public:
2941   typedef CrushTreeDumper::FormattingDumper Parent;
2942
2943   explicit CrushTreeFormattingDumper(
2944     const CrushWrapper *crush,
2945     const CrushTreeDumper::name_map_t& wsnames)
2946     : Parent(crush, wsnames) {}
2947
2948   explicit CrushTreeFormattingDumper(
2949     const CrushWrapper *crush,
2950     const CrushTreeDumper::name_map_t& wsnames,
2951     bool show_shadow)
2952     : Parent(crush, wsnames, show_shadow) {}
2953
2954   void dump(Formatter *f) {
2955     f->open_array_section("nodes");
2956     Parent::dump(f);
2957     f->close_section();
2958     f->open_array_section("stray");
2959     f->close_section();
2960   }
2961 };
2962
2963
2964 void CrushWrapper::dump_tree(
2965   ostream *out,
2966   Formatter *f,
2967   const CrushTreeDumper::name_map_t& weight_set_names,
2968   bool show_shadow) const
2969 {
2970   if (out) {
2971     TextTable tbl;
2972     CrushTreePlainDumper(this, weight_set_names, show_shadow).dump(&tbl);
2973     *out << tbl;
2974   }
2975   if (f) {
2976     CrushTreeFormattingDumper(this, weight_set_names, show_shadow).dump(f);
2977   }
2978 }
2979
2980 void CrushWrapper::generate_test_instances(list<CrushWrapper*>& o)
2981 {
2982   o.push_back(new CrushWrapper);
2983   // fixme
2984 }
2985
2986 /**
2987  * Determine the default CRUSH ruleset ID to be used with
2988  * newly created replicated pools.
2989  *
2990  * @returns a ruleset ID (>=0) or -1 if no suitable ruleset found
2991  */
2992 int CrushWrapper::get_osd_pool_default_crush_replicated_ruleset(CephContext *cct)
2993 {
2994   int crush_ruleset = cct->_conf->osd_pool_default_crush_rule;
2995   if (crush_ruleset < 0) {
2996     crush_ruleset = find_first_ruleset(pg_pool_t::TYPE_REPLICATED);
2997   } else if (!ruleset_exists(crush_ruleset)) {
2998     crush_ruleset = -1; // match find_first_ruleset() retval
2999   }
3000   return crush_ruleset;
3001 }
3002
3003 bool CrushWrapper::is_valid_crush_name(const string& s)
3004 {
3005   if (s.empty())
3006     return false;
3007   for (string::const_iterator p = s.begin(); p != s.end(); ++p) {
3008     if (!(*p == '-') &&
3009         !(*p == '_') &&
3010         !(*p == '.') &&
3011         !(*p >= '0' && *p <= '9') &&
3012         !(*p >= 'A' && *p <= 'Z') &&
3013         !(*p >= 'a' && *p <= 'z'))
3014       return false;
3015   }
3016   return true;
3017 }
3018
3019 bool CrushWrapper::is_valid_crush_loc(CephContext *cct,
3020                                       const map<string,string>& loc)
3021 {
3022   for (map<string,string>::const_iterator l = loc.begin(); l != loc.end(); ++l) {
3023     if (!is_valid_crush_name(l->first) ||
3024         !is_valid_crush_name(l->second)) {
3025       ldout(cct, 1) << "loc["
3026                     << l->first << "] = '"
3027                     << l->second << "' not a valid crush name ([A-Za-z0-9_-.]+)"
3028                     << dendl;
3029       return false;
3030     }
3031   }
3032   return true;
3033 }
3034
3035 int CrushWrapper::_choose_type_stack(
3036   CephContext *cct,
3037   const vector<pair<int,int>>& stack,
3038   const set<int>& overfull,
3039   const vector<int>& underfull,
3040   const vector<int>& orig,
3041   vector<int>::const_iterator& i,
3042   set<int>& used,
3043   vector<int> *pw) const
3044 {
3045   vector<int> w = *pw;
3046   vector<int> o;
3047
3048   ldout(cct, 10) << __func__ << " stack " << stack
3049                  << " orig " << orig
3050                  << " at " << *i
3051                  << " pw " << *pw
3052                  << dendl;
3053
3054   vector<int> cumulative_fanout(stack.size());
3055   int f = 1;
3056   for (int j = (int)stack.size() - 1; j >= 0; --j) {
3057     cumulative_fanout[j] = f;
3058     f *= stack[j].second;
3059   }
3060   ldout(cct, 10) << __func__ << " cumulative_fanout " << cumulative_fanout
3061                  << dendl;
3062
3063   // identify underful targets for each intermediate level.
3064   // this serves two purposes:
3065   //   1. we can tell when we are selecting a bucket that does not have any underfull
3066   //      devices beneath it.  that means that if the current input includes an overfull
3067   //      device, we won't be able to find an underfull device with this parent to
3068   //      swap for it.
3069   //   2. when we decide we should reject a bucket due to the above, this list gives us
3070   //      a list of peers to consider that *do* have underfull devices available..  (we
3071   //      are careful to pick one that has the same parent.)
3072   vector<set<int>> underfull_buckets; // level -> set of buckets with >0 underfull item(s)
3073   underfull_buckets.resize(stack.size() - 1);
3074   for (auto osd : underfull) {
3075     int item = osd;
3076     for (int j = (int)stack.size() - 2; j >= 0; --j) {
3077       int type = stack[j].first;
3078       item = get_parent_of_type(item, type);
3079       ldout(cct, 10) << __func__ << " underfull " << osd << " type " << type
3080                      << " is " << item << dendl;
3081       underfull_buckets[j].insert(item);
3082     }
3083   }
3084   ldout(cct, 20) << __func__ << " underfull_buckets " << underfull_buckets << dendl;
3085
3086   for (unsigned j = 0; j < stack.size(); ++j) {
3087     int type = stack[j].first;
3088     int fanout = stack[j].second;
3089     int cum_fanout = cumulative_fanout[j];
3090     ldout(cct, 10) << " level " << j << ": type " << type << " fanout " << fanout
3091                    << " cumulative " << cum_fanout
3092                    << " w " << w << dendl;
3093     vector<int> o;
3094     auto tmpi = i;
3095     for (auto from : w) {
3096       ldout(cct, 10) << " from " << from << dendl;
3097       // identify leaves under each choice.  we use this to check whether any of these
3098       // leaves are overfull.  (if so, we need to make sure there are underfull candidates
3099       // to swap for them.)
3100       vector<set<int>> leaves;
3101       leaves.resize(fanout);
3102       for (int pos = 0; pos < fanout; ++pos) {
3103         if (type > 0) {
3104           // non-leaf
3105           int item = get_parent_of_type(*tmpi, type);
3106           o.push_back(item);
3107           int n = cum_fanout;
3108           while (n-- && tmpi != orig.end()) {
3109             leaves[pos].insert(*tmpi++);
3110           }
3111           ldout(cct, 10) << __func__ << "   from " << *tmpi << " got " << item
3112                          << " of type " << type << " over leaves " << leaves[pos] << dendl;
3113         } else {
3114           // leaf
3115           bool replaced = false;
3116           if (overfull.count(*i)) {
3117             for (auto item : underfull) {
3118               ldout(cct, 10) << __func__ << " pos " << pos
3119                              << " was " << *i << " considering " << item
3120                              << dendl;
3121               if (used.count(item)) {
3122                 ldout(cct, 20) << __func__ << "   in used " << used << dendl;
3123                 continue;
3124               }
3125               if (!subtree_contains(from, item)) {
3126                 ldout(cct, 20) << __func__ << "   not in subtree " << from << dendl;
3127                 continue;
3128               }
3129               if (std::find(orig.begin(), orig.end(), item) != orig.end()) {
3130                 ldout(cct, 20) << __func__ << "   in orig " << orig << dendl;
3131                 continue;
3132               }
3133               o.push_back(item);
3134               used.insert(item);
3135               ldout(cct, 10) << __func__ << " pos " << pos << " replace "
3136                              << *i << " -> " << item << dendl;
3137               replaced = true;
3138               ++i;
3139               break;
3140             }
3141           }
3142           if (!replaced) {
3143             ldout(cct, 10) << __func__ << " pos " << pos << " keep " << *i
3144                            << dendl;
3145             o.push_back(*i);
3146             ++i;
3147           }
3148           if (i == orig.end()) {
3149             ldout(cct, 10) << __func__ << " end of orig, break 1" << dendl;
3150             break;
3151           }
3152         }
3153       }
3154       if (j + 1 < stack.size()) {
3155         // check if any buckets have overfull leaves but no underfull candidates
3156         for (int pos = 0; pos < fanout; ++pos) {
3157           if (underfull_buckets[j].count(o[pos]) == 0) {
3158             // are any leaves overfull?
3159             bool any_overfull = false;
3160             for (auto osd : leaves[pos]) {
3161               if (overfull.count(osd)) {
3162                 any_overfull = true;
3163               }
3164             }
3165             if (any_overfull) {
3166               ldout(cct, 10) << " bucket " << o[pos] << " has no underfull targets and "
3167                              << ">0 leaves " << leaves[pos] << " is overfull; alts "
3168                              << underfull_buckets[j]
3169                              << dendl;
3170               for (auto alt : underfull_buckets[j]) {
3171                 if (std::find(o.begin(), o.end(), alt) == o.end()) {
3172                   // see if alt has the same parent
3173                   if (j == 0 ||
3174                       get_parent_of_type(o[pos], stack[j-1].first) ==
3175                       get_parent_of_type(alt, stack[j-1].first)) {
3176                     if (j)
3177                       ldout(cct, 10) << "  replacing " << o[pos]
3178                                      << " (which has no underfull leaves) with " << alt
3179                                      << " (same parent "
3180                                      << get_parent_of_type(alt, stack[j-1].first) << " type "
3181                                      << type << ")" << dendl;
3182                     else
3183                       ldout(cct, 10) << "  replacing " << o[pos]
3184                                      << " (which has no underfull leaves) with " << alt
3185                                      << " (first level)" << dendl;
3186                     o[pos] = alt;
3187                     break;
3188                   } else {
3189                     ldout(cct, 30) << "  alt " << alt << " for " << o[pos]
3190                                    << " has different parent, skipping" << dendl;
3191                   }
3192                 }
3193               }
3194             }
3195           }
3196         }
3197       }
3198       if (i == orig.end()) {
3199         ldout(cct, 10) << __func__ << " end of orig, break 2" << dendl;
3200         break;
3201       }
3202     }
3203     ldout(cct, 10) << __func__ << "  w <- " << o << " was " << w << dendl;
3204     w.swap(o);
3205   }
3206   *pw = w;
3207   return 0;
3208 }
3209
3210 int CrushWrapper::try_remap_rule(
3211   CephContext *cct,
3212   int ruleno,
3213   int maxout,
3214   const set<int>& overfull,
3215   const vector<int>& underfull,
3216   const vector<int>& orig,
3217   vector<int> *out) const
3218 {
3219   const crush_map *map = crush;
3220   const crush_rule *rule = get_rule(ruleno);
3221   assert(rule);
3222
3223   ldout(cct, 10) << __func__ << " ruleno " << ruleno
3224                 << " numrep " << maxout << " overfull " << overfull
3225                 << " underfull " << underfull << " orig " << orig
3226                 << dendl;
3227   vector<int> w; // working set
3228   out->clear();
3229
3230   auto i = orig.begin();
3231   set<int> used;
3232
3233   vector<pair<int,int>> type_stack;  // (type, fan-out)
3234
3235   for (unsigned step = 0; step < rule->len; ++step) {
3236     const crush_rule_step *curstep = &rule->steps[step];
3237     ldout(cct, 10) << __func__ << " step " << step << " w " << w << dendl;
3238     switch (curstep->op) {
3239     case CRUSH_RULE_TAKE:
3240       if ((curstep->arg1 >= 0 && curstep->arg1 < map->max_devices) ||
3241           (-1-curstep->arg1 >= 0 && -1-curstep->arg1 < map->max_buckets &&
3242            map->buckets[-1-curstep->arg1])) {
3243         w.clear();
3244         w.push_back(curstep->arg1);
3245         ldout(cct, 10) << __func__ << " take " << w << dendl;
3246       } else {
3247         ldout(cct, 1) << " bad take value " << curstep->arg1 << dendl;
3248       }
3249       break;
3250
3251     case CRUSH_RULE_CHOOSELEAF_FIRSTN:
3252     case CRUSH_RULE_CHOOSELEAF_INDEP:
3253       {
3254         int numrep = curstep->arg1;
3255         int type = curstep->arg2;
3256         if (numrep <= 0)
3257           numrep += maxout;
3258         type_stack.push_back(make_pair(type, numrep));
3259         type_stack.push_back(make_pair(0, 1));
3260         int r = _choose_type_stack(cct, type_stack, overfull, underfull, orig,
3261                                    i, used, &w);
3262         if (r < 0)
3263           return r;
3264         type_stack.clear();
3265       }
3266       break;
3267
3268     case CRUSH_RULE_CHOOSE_FIRSTN:
3269     case CRUSH_RULE_CHOOSE_INDEP:
3270       {
3271         int numrep = curstep->arg1;
3272         int type = curstep->arg2;
3273         if (numrep <= 0)
3274           numrep += maxout;
3275         type_stack.push_back(make_pair(type, numrep));
3276       }
3277       break;
3278
3279     case CRUSH_RULE_EMIT:
3280       ldout(cct, 10) << " emit " << w << dendl;
3281       if (!type_stack.empty()) {
3282         int r = _choose_type_stack(cct, type_stack, overfull, underfull, orig,
3283                                    i, used, &w);
3284         if (r < 0)
3285           return r;
3286         type_stack.clear();
3287       }
3288       for (auto item : w) {
3289         out->push_back(item);
3290       }
3291       w.clear();
3292       break;
3293
3294     default:
3295       // ignore
3296       break;
3297     }
3298   }
3299
3300   return 0;
3301 }
3302
3303
3304 int CrushWrapper::_choose_args_adjust_item_weight_in_bucket(
3305   CephContext *cct,
3306   crush_choose_arg_map cmap,
3307   int bucketid,
3308   int id,
3309   const vector<int>& weight,
3310   ostream *ss)
3311 {
3312   int changed = 0;
3313   int bidx = -1 - bucketid;
3314   crush_bucket *b = crush->buckets[bidx];
3315   if (bidx >= (int)cmap.size) {
3316     if (ss)
3317       *ss << "no weight-set for bucket " << b->id;
3318     ldout(cct, 10) << __func__ << "  no crush_choose_arg for bucket " << b->id
3319                    << dendl;
3320     return 0;
3321   }
3322   crush_choose_arg *carg = &cmap.args[bidx];
3323   if (carg->weight_set == NULL) {
3324     if (ss)
3325       *ss << "no weight-set for bucket " << b->id;
3326     ldout(cct, 10) << __func__ << "  no weight_set for bucket " << b->id
3327                    << dendl;
3328     return 0;
3329   }
3330   if (carg->weight_set_size != weight.size()) {
3331     if (ss)
3332       *ss << "weight_set_size != " << weight.size() << " for bucket " << b->id;
3333     ldout(cct, 10) << __func__ << "  weight_set_size != " << weight.size()
3334                    << " for bucket " << b->id << dendl;
3335     return 0;
3336   }
3337   for (unsigned i = 0; i < b->size; i++) {
3338     if (b->items[i] == id) {
3339       for (unsigned j = 0; j < weight.size(); ++j) {
3340         carg->weight_set[j].weights[i] = weight[j];
3341       }
3342       ldout(cct, 5) << __func__ << "  set " << id << " to " << weight
3343                     << " in bucket " << b->id << dendl;
3344       changed++;
3345     }
3346   }
3347   if (changed) {
3348     vector<int> bucket_weight(weight.size(), 0);
3349     for (unsigned i = 0; i < b->size; i++) {
3350       for (unsigned j = 0; j < weight.size(); ++j) {
3351         bucket_weight[j] += carg->weight_set[j].weights[i];
3352       }
3353     }
3354     choose_args_adjust_item_weight(cct, cmap, b->id, bucket_weight, nullptr);
3355   }
3356   return changed;
3357 }
3358
3359 int CrushWrapper::choose_args_adjust_item_weight(
3360   CephContext *cct,
3361   crush_choose_arg_map cmap,
3362   int id,
3363   const vector<int>& weight,
3364   ostream *ss)
3365 {
3366   ldout(cct, 5) << __func__ << " " << id << " weight " << weight << dendl;
3367   int changed = 0;
3368   for (int bidx = 0; bidx < crush->max_buckets; bidx++) {
3369     crush_bucket *b = crush->buckets[bidx];
3370     if (b == nullptr) {
3371       continue;
3372     }
3373     changed += _choose_args_adjust_item_weight_in_bucket(
3374       cct, cmap, b->id, id, weight, ss);
3375   }
3376   if (!changed) {
3377     if (ss)
3378       *ss << "item " << id << " not found in crush map";
3379     return -ENOENT;
3380   }
3381   return changed;
3382 }