Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / OSDMap.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  * Copyright (C) 2013,2014 Cloudwatt <libre.licensing@cloudwatt.com>
8  *
9  * Author: Loic Dachary <loic@dachary.org>
10  *
11  * This is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Lesser General Public
13  * License version 2.1, as published by the Free Software
14  * Foundation.  See file COPYING.
15  *
16  */
17
18 #include <boost/algorithm/string.hpp>
19
20 #include "OSDMap.h"
21 #include <algorithm>
22 #include "common/config.h"
23 #include "common/errno.h"
24 #include "common/Formatter.h"
25 #include "common/TextTable.h"
26 #include "include/ceph_features.h"
27 #include "include/str_map.h"
28
29 #include "common/code_environment.h"
30 #include "mon/health_check.h"
31
32 #include "crush/CrushTreeDumper.h"
33 #include "common/Clock.h"
34 #include "mon/PGStatService.h"
35  
36 #define dout_subsys ceph_subsys_osd
37
38 MEMPOOL_DEFINE_OBJECT_FACTORY(OSDMap, osdmap, osdmap);
39 MEMPOOL_DEFINE_OBJECT_FACTORY(OSDMap::Incremental, osdmap_inc, osdmap);
40
41
42 // ----------------------------------
43 // osd_info_t
44
45 void osd_info_t::dump(Formatter *f) const
46 {
47   f->dump_int("last_clean_begin", last_clean_begin);
48   f->dump_int("last_clean_end", last_clean_end);
49   f->dump_int("up_from", up_from);
50   f->dump_int("up_thru", up_thru);
51   f->dump_int("down_at", down_at);
52   f->dump_int("lost_at", lost_at);
53 }
54
55 void osd_info_t::encode(bufferlist& bl) const
56 {
57   __u8 struct_v = 1;
58   ::encode(struct_v, bl);
59   ::encode(last_clean_begin, bl);
60   ::encode(last_clean_end, bl);
61   ::encode(up_from, bl);
62   ::encode(up_thru, bl);
63   ::encode(down_at, bl);
64   ::encode(lost_at, bl);
65 }
66
67 void osd_info_t::decode(bufferlist::iterator& bl)
68 {
69   __u8 struct_v;
70   ::decode(struct_v, bl);
71   ::decode(last_clean_begin, bl);
72   ::decode(last_clean_end, bl);
73   ::decode(up_from, bl);
74   ::decode(up_thru, bl);
75   ::decode(down_at, bl);
76   ::decode(lost_at, bl);
77 }
78
79 void osd_info_t::generate_test_instances(list<osd_info_t*>& o)
80 {
81   o.push_back(new osd_info_t);
82   o.push_back(new osd_info_t);
83   o.back()->last_clean_begin = 1;
84   o.back()->last_clean_end = 2;
85   o.back()->up_from = 30;
86   o.back()->up_thru = 40;
87   o.back()->down_at = 5;
88   o.back()->lost_at = 6;
89 }
90
91 ostream& operator<<(ostream& out, const osd_info_t& info)
92 {
93   out << "up_from " << info.up_from
94       << " up_thru " << info.up_thru
95       << " down_at " << info.down_at
96       << " last_clean_interval [" << info.last_clean_begin << "," << info.last_clean_end << ")";
97   if (info.lost_at)
98     out << " lost_at " << info.lost_at;
99   return out;
100 }
101
102 // ----------------------------------
103 // osd_xinfo_t
104
105 void osd_xinfo_t::dump(Formatter *f) const
106 {
107   f->dump_stream("down_stamp") << down_stamp;
108   f->dump_float("laggy_probability", laggy_probability);
109   f->dump_int("laggy_interval", laggy_interval);
110   f->dump_int("features", features);
111   f->dump_unsigned("old_weight", old_weight);
112 }
113
114 void osd_xinfo_t::encode(bufferlist& bl) const
115 {
116   ENCODE_START(3, 1, bl);
117   ::encode(down_stamp, bl);
118   __u32 lp = laggy_probability * 0xfffffffful;
119   ::encode(lp, bl);
120   ::encode(laggy_interval, bl);
121   ::encode(features, bl);
122   ::encode(old_weight, bl);
123   ENCODE_FINISH(bl);
124 }
125
126 void osd_xinfo_t::decode(bufferlist::iterator& bl)
127 {
128   DECODE_START(3, bl);
129   ::decode(down_stamp, bl);
130   __u32 lp;
131   ::decode(lp, bl);
132   laggy_probability = (float)lp / (float)0xffffffff;
133   ::decode(laggy_interval, bl);
134   if (struct_v >= 2)
135     ::decode(features, bl);
136   else
137     features = 0;
138   if (struct_v >= 3)
139     ::decode(old_weight, bl);
140   else
141     old_weight = 0;
142   DECODE_FINISH(bl);
143 }
144
145 void osd_xinfo_t::generate_test_instances(list<osd_xinfo_t*>& o)
146 {
147   o.push_back(new osd_xinfo_t);
148   o.push_back(new osd_xinfo_t);
149   o.back()->down_stamp = utime_t(2, 3);
150   o.back()->laggy_probability = .123;
151   o.back()->laggy_interval = 123456;
152   o.back()->old_weight = 0x7fff;
153 }
154
155 ostream& operator<<(ostream& out, const osd_xinfo_t& xi)
156 {
157   return out << "down_stamp " << xi.down_stamp
158              << " laggy_probability " << xi.laggy_probability
159              << " laggy_interval " << xi.laggy_interval
160              << " old_weight " << xi.old_weight;
161 }
162
163 // ----------------------------------
164 // OSDMap::Incremental
165
166 int OSDMap::Incremental::get_net_marked_out(const OSDMap *previous) const
167 {
168   int n = 0;
169   for (auto &weight : new_weight) {
170     if (weight.second == CEPH_OSD_OUT && !previous->is_out(weight.first))
171       n++;  // marked out
172     else if (weight.second != CEPH_OSD_OUT && previous->is_out(weight.first))
173       n--;  // marked in
174   }
175   return n;
176 }
177
178 int OSDMap::Incremental::get_net_marked_down(const OSDMap *previous) const
179 {
180   int n = 0;
181   for (auto &state : new_state) { // 
182     if (state.second & CEPH_OSD_UP) {
183       if (previous->is_up(state.first))
184         n++;  // marked down
185       else
186         n--;  // marked up
187     }
188   }
189   return n;
190 }
191
192 int OSDMap::Incremental::identify_osd(uuid_d u) const
193 {
194   for (auto &uuid : new_uuid)
195     if (uuid.second == u)
196       return uuid.first;
197   return -1;
198 }
199
200 int OSDMap::Incremental::propagate_snaps_to_tiers(CephContext *cct,
201                                                   const OSDMap& osdmap)
202 {
203   assert(epoch == osdmap.get_epoch() + 1);
204
205   for (auto &new_pool : new_pools) {
206     if (!new_pool.second.tiers.empty()) {
207       pg_pool_t& base = new_pool.second;
208
209       for (const auto &tier_pool : base.tiers) {
210         const auto &r = new_pools.find(tier_pool);
211         pg_pool_t *tier = 0;
212         if (r == new_pools.end()) {
213           const pg_pool_t *orig = osdmap.get_pg_pool(tier_pool);
214           if (!orig) {
215             lderr(cct) << __func__ << " no pool " << tier_pool << dendl;
216             return -EIO;
217           }
218           tier = get_new_pool(tier_pool, orig);
219         } else {
220           tier = &r->second;
221         }
222         if (tier->tier_of != new_pool.first) {
223           lderr(cct) << __func__ << " " << r->first << " tier_of != " << new_pool.first << dendl;
224           return -EIO;
225         }
226
227         ldout(cct, 10) << __func__ << " from " << new_pool.first << " to "
228                        << tier_pool << dendl;
229         tier->snap_seq = base.snap_seq;
230         tier->snap_epoch = base.snap_epoch;
231         tier->snaps = base.snaps;
232         tier->removed_snaps = base.removed_snaps;
233       }
234     }
235   }
236   return 0;
237 }
238
239
240 bool OSDMap::subtree_is_down(int id, set<int> *down_cache) const
241 {
242   if (id >= 0)
243     return is_down(id);
244
245   if (down_cache &&
246       down_cache->count(id)) {
247     return true;
248   }
249
250   list<int> children;
251   crush->get_children(id, &children);
252   for (const auto &child : children) {
253     if (!subtree_is_down(child, down_cache)) {
254       return false;
255     }
256   }
257   if (down_cache) {
258     down_cache->insert(id);
259   }
260   return true;
261 }
262
263 bool OSDMap::containing_subtree_is_down(CephContext *cct, int id, int subtree_type, set<int> *down_cache) const
264 {
265   // use a stack-local down_cache if we didn't get one from the
266   // caller.  then at least this particular call will avoid duplicated
267   // work.
268   set<int> local_down_cache;
269   if (!down_cache) {
270     down_cache = &local_down_cache;
271   }
272
273   int current = id;
274   while (true) {
275     int type;
276     if (current >= 0) {
277       type = 0;
278     } else {
279       type = crush->get_bucket_type(current);
280     }
281     assert(type >= 0);
282
283     if (!subtree_is_down(current, down_cache)) {
284       ldout(cct, 30) << "containing_subtree_is_down(" << id << ") = false" << dendl;
285       return false;
286     }
287
288     // is this a big enough subtree to be marked as down?
289     if (type >= subtree_type) {
290       ldout(cct, 30) << "containing_subtree_is_down(" << id << ") = true ... " << type << " >= " << subtree_type << dendl;
291       return true;
292     }
293
294     int r = crush->get_immediate_parent_id(current, &current);
295     if (r < 0) {
296       return false;
297     }
298   }
299 }
300
301 bool OSDMap::subtree_type_is_down(
302   CephContext *cct,
303   int id,
304   int subtree_type,
305   set<int> *down_in_osds,
306   set<int> *up_in_osds,
307   set<int> *subtree_up,
308   unordered_map<int, set<int> > *subtree_type_down) const
309 {
310   if (id >= 0) {
311     bool is_down_ret = is_down(id);
312     if (!is_out(id)) {
313       if (is_down_ret) {
314         down_in_osds->insert(id);
315       } else {
316         up_in_osds->insert(id);
317       }
318     }
319     return is_down_ret;
320   }
321
322   if (subtree_type_down &&
323       (*subtree_type_down)[subtree_type].count(id)) {
324     return true;
325   }
326
327   list<int> children;
328   crush->get_children(id, &children);
329   for (const auto &child : children) {
330     if (!subtree_type_is_down(
331           cct, child, crush->get_bucket_type(child),
332           down_in_osds, up_in_osds, subtree_up, subtree_type_down)) {
333       subtree_up->insert(id);
334       return false;
335     }
336   }
337   if (subtree_type_down) {
338     (*subtree_type_down)[subtree_type].insert(id);
339   }
340   return true;
341 }
342
343 void OSDMap::Incremental::encode_client_old(bufferlist& bl) const
344 {
345   __u16 v = 5;
346   ::encode(v, bl);
347   ::encode(fsid, bl);
348   ::encode(epoch, bl);
349   ::encode(modified, bl);
350   int32_t new_t = new_pool_max;
351   ::encode(new_t, bl);
352   ::encode(new_flags, bl);
353   ::encode(fullmap, bl);
354   ::encode(crush, bl);
355
356   ::encode(new_max_osd, bl);
357   // for ::encode(new_pools, bl);
358   __u32 n = new_pools.size();
359   ::encode(n, bl);
360   for (const auto &new_pool : new_pools) {
361     n = new_pool.first;
362     ::encode(n, bl);
363     ::encode(new_pool.second, bl, 0);
364   }
365   // for ::encode(new_pool_names, bl);
366   n = new_pool_names.size();
367   ::encode(n, bl);
368
369   for (const auto &new_pool_name : new_pool_names) {
370     n = new_pool_name.first;
371     ::encode(n, bl);
372     ::encode(new_pool_name.second, bl);
373   }
374   // for ::encode(old_pools, bl);
375   n = old_pools.size();
376   ::encode(n, bl);
377   for (auto &old_pool : old_pools) {
378     n = old_pool;
379     ::encode(n, bl);
380   }
381   ::encode(new_up_client, bl, 0);
382   {
383     // legacy is map<int32_t,uint8_t>
384     uint32_t n = new_state.size();
385     ::encode(n, bl);
386     for (auto p : new_state) {
387       ::encode(p.first, bl);
388       ::encode((uint8_t)p.second, bl);
389     }
390   }
391   ::encode(new_weight, bl);
392   // for ::encode(new_pg_temp, bl);
393   n = new_pg_temp.size();
394   ::encode(n, bl);
395
396   for (const auto &pg_temp : new_pg_temp) {
397     old_pg_t opg = pg_temp.first.get_old_pg();
398     ::encode(opg, bl);
399     ::encode(pg_temp.second, bl);
400   }
401 }
402
403 void OSDMap::Incremental::encode_classic(bufferlist& bl, uint64_t features) const
404 {
405   if ((features & CEPH_FEATURE_PGID64) == 0) {
406     encode_client_old(bl);
407     return;
408   }
409
410   // base
411   __u16 v = 6;
412   ::encode(v, bl);
413   ::encode(fsid, bl);
414   ::encode(epoch, bl);
415   ::encode(modified, bl);
416   ::encode(new_pool_max, bl);
417   ::encode(new_flags, bl);
418   ::encode(fullmap, bl);
419   ::encode(crush, bl);
420
421   ::encode(new_max_osd, bl);
422   ::encode(new_pools, bl, features);
423   ::encode(new_pool_names, bl);
424   ::encode(old_pools, bl);
425   ::encode(new_up_client, bl, features);
426   {
427     uint32_t n = new_state.size();
428     ::encode(n, bl);
429     for (auto p : new_state) {
430       ::encode(p.first, bl);
431       ::encode((uint8_t)p.second, bl);
432     }
433   }
434   ::encode(new_weight, bl);
435   ::encode(new_pg_temp, bl);
436
437   // extended
438   __u16 ev = 10;
439   ::encode(ev, bl);
440   ::encode(new_hb_back_up, bl, features);
441   ::encode(new_up_thru, bl);
442   ::encode(new_last_clean_interval, bl);
443   ::encode(new_lost, bl);
444   ::encode(new_blacklist, bl, features);
445   ::encode(old_blacklist, bl, features);
446   ::encode(new_up_cluster, bl, features);
447   ::encode(cluster_snapshot, bl);
448   ::encode(new_uuid, bl);
449   ::encode(new_xinfo, bl);
450   ::encode(new_hb_front_up, bl, features);
451 }
452
453 void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
454 {
455   if ((features & CEPH_FEATURE_OSDMAP_ENC) == 0) {
456     encode_classic(bl, features);
457     return;
458   }
459
460   // only a select set of callers should *ever* be encoding new
461   // OSDMaps.  others should be passing around the canonical encoded
462   // buffers from on high.  select out those callers by passing in an
463   // "impossible" feature bit.
464   assert(features & CEPH_FEATURE_RESERVED);
465   features &= ~CEPH_FEATURE_RESERVED;
466
467   size_t start_offset = bl.length();
468   size_t tail_offset;
469   buffer::list::iterator crc_it;
470
471   // meta-encoding: how we include client-used and osd-specific data
472   ENCODE_START(8, 7, bl);
473
474   {
475     uint8_t v = 5;
476     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
477       v = 3;
478     }
479     ENCODE_START(v, 1, bl); // client-usable data
480     ::encode(fsid, bl);
481     ::encode(epoch, bl);
482     ::encode(modified, bl);
483     ::encode(new_pool_max, bl);
484     ::encode(new_flags, bl);
485     ::encode(fullmap, bl);
486     ::encode(crush, bl);
487
488     ::encode(new_max_osd, bl);
489     ::encode(new_pools, bl, features);
490     ::encode(new_pool_names, bl);
491     ::encode(old_pools, bl);
492     ::encode(new_up_client, bl, features);
493     if (v >= 5) {
494       ::encode(new_state, bl);
495     } else {
496       uint32_t n = new_state.size();
497       ::encode(n, bl);
498       for (auto p : new_state) {
499         ::encode(p.first, bl);
500         ::encode((uint8_t)p.second, bl);
501       }
502     }
503     ::encode(new_weight, bl);
504     ::encode(new_pg_temp, bl);
505     ::encode(new_primary_temp, bl);
506     ::encode(new_primary_affinity, bl);
507     ::encode(new_erasure_code_profiles, bl);
508     ::encode(old_erasure_code_profiles, bl);
509     if (v >= 4) {
510       ::encode(new_pg_upmap, bl);
511       ::encode(old_pg_upmap, bl);
512       ::encode(new_pg_upmap_items, bl);
513       ::encode(old_pg_upmap_items, bl);
514     }
515     ENCODE_FINISH(bl); // client-usable data
516   }
517
518   {
519     uint8_t target_v = 6;
520     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
521       target_v = 2;
522     }
523     ENCODE_START(target_v, 1, bl); // extended, osd-only data
524     ::encode(new_hb_back_up, bl, features);
525     ::encode(new_up_thru, bl);
526     ::encode(new_last_clean_interval, bl);
527     ::encode(new_lost, bl);
528     ::encode(new_blacklist, bl, features);
529     ::encode(old_blacklist, bl, features);
530     ::encode(new_up_cluster, bl, features);
531     ::encode(cluster_snapshot, bl);
532     ::encode(new_uuid, bl);
533     ::encode(new_xinfo, bl);
534     ::encode(new_hb_front_up, bl, features);
535     ::encode(features, bl);         // NOTE: features arg, not the member
536     if (target_v >= 3) {
537       ::encode(new_nearfull_ratio, bl);
538       ::encode(new_full_ratio, bl);
539       ::encode(new_backfillfull_ratio, bl);
540     }
541     // 5 was string-based new_require_min_compat_client
542     if (target_v >= 6) {
543       ::encode(new_require_min_compat_client, bl);
544       ::encode(new_require_osd_release, bl);
545     }
546     ENCODE_FINISH(bl); // osd-only data
547   }
548
549   ::encode((uint32_t)0, bl); // dummy inc_crc
550   crc_it = bl.end();
551   crc_it.advance(-4);
552   tail_offset = bl.length();
553
554   ::encode(full_crc, bl);
555
556   ENCODE_FINISH(bl); // meta-encoding wrapper
557
558   // fill in crc
559   bufferlist front;
560   front.substr_of(bl, start_offset, crc_it.get_off() - start_offset);
561   inc_crc = front.crc32c(-1);
562   bufferlist tail;
563   tail.substr_of(bl, tail_offset, bl.length() - tail_offset);
564   inc_crc = tail.crc32c(inc_crc);
565   ceph_le32 crc_le;
566   crc_le = inc_crc;
567   crc_it.copy_in(4, (char*)&crc_le);
568   have_crc = true;
569 }
570
571 void OSDMap::Incremental::decode_classic(bufferlist::iterator &p)
572 {
573   __u32 n, t;
574   // base
575   __u16 v;
576   ::decode(v, p);
577   ::decode(fsid, p);
578   ::decode(epoch, p);
579   ::decode(modified, p);
580   if (v == 4 || v == 5) {
581     ::decode(n, p);
582     new_pool_max = n;
583   } else if (v >= 6)
584     ::decode(new_pool_max, p);
585   ::decode(new_flags, p);
586   ::decode(fullmap, p);
587   ::decode(crush, p);
588
589   ::decode(new_max_osd, p);
590   if (v < 6) {
591     new_pools.clear();
592     ::decode(n, p);
593     while (n--) {
594       ::decode(t, p);
595       ::decode(new_pools[t], p);
596     }
597   } else {
598     ::decode(new_pools, p);
599   }
600   if (v == 5) {
601     new_pool_names.clear();
602     ::decode(n, p);
603     while (n--) {
604       ::decode(t, p);
605       ::decode(new_pool_names[t], p);
606     }
607   } else if (v >= 6) {
608     ::decode(new_pool_names, p);
609   }
610   if (v < 6) {
611     old_pools.clear();
612     ::decode(n, p);
613     while (n--) {
614       ::decode(t, p);
615       old_pools.insert(t);
616     }
617   } else {
618     ::decode(old_pools, p);
619   }
620   ::decode(new_up_client, p);
621   {
622     map<int32_t,uint8_t> ns;
623     ::decode(ns, p);
624     for (auto q : ns) {
625       new_state[q.first] = q.second;
626     }
627   }
628   ::decode(new_weight, p);
629
630   if (v < 6) {
631     new_pg_temp.clear();
632     ::decode(n, p);
633     while (n--) {
634       old_pg_t opg;
635       ::decode_raw(opg, p);
636       ::decode(new_pg_temp[pg_t(opg)], p);
637     }
638   } else {
639     ::decode(new_pg_temp, p);
640   }
641
642   // decode short map, too.
643   if (v == 5 && p.end())
644     return;
645
646   // extended
647   __u16 ev = 0;
648   if (v >= 5)
649     ::decode(ev, p);
650   ::decode(new_hb_back_up, p);
651   if (v < 5)
652     ::decode(new_pool_names, p);
653   ::decode(new_up_thru, p);
654   ::decode(new_last_clean_interval, p);
655   ::decode(new_lost, p);
656   ::decode(new_blacklist, p);
657   ::decode(old_blacklist, p);
658   if (ev >= 6)
659     ::decode(new_up_cluster, p);
660   if (ev >= 7)
661     ::decode(cluster_snapshot, p);
662   if (ev >= 8)
663     ::decode(new_uuid, p);
664   if (ev >= 9)
665     ::decode(new_xinfo, p);
666   if (ev >= 10)
667     ::decode(new_hb_front_up, p);
668 }
669
670 void OSDMap::Incremental::decode(bufferlist::iterator& bl)
671 {
672   /**
673    * Older encodings of the Incremental had a single struct_v which
674    * covered the whole encoding, and was prior to our modern
675    * stuff which includes a compatv and a size. So if we see
676    * a struct_v < 7, we must rewind to the beginning and use our
677    * classic decoder.
678    */
679   size_t start_offset = bl.get_off();
680   size_t tail_offset = 0;
681   bufferlist crc_front, crc_tail;
682
683   DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
684   if (struct_v < 7) {
685     int struct_v_size = sizeof(struct_v);
686     bl.advance(-struct_v_size);
687     decode_classic(bl);
688     encode_features = 0;
689     if (struct_v >= 6)
690       encode_features = CEPH_FEATURE_PGID64;
691     else
692       encode_features = 0;
693     return;
694   }
695   {
696     DECODE_START(5, bl); // client-usable data
697     ::decode(fsid, bl);
698     ::decode(epoch, bl);
699     ::decode(modified, bl);
700     ::decode(new_pool_max, bl);
701     ::decode(new_flags, bl);
702     ::decode(fullmap, bl);
703     ::decode(crush, bl);
704
705     ::decode(new_max_osd, bl);
706     ::decode(new_pools, bl);
707     ::decode(new_pool_names, bl);
708     ::decode(old_pools, bl);
709     ::decode(new_up_client, bl);
710     if (struct_v >= 5) {
711       ::decode(new_state, bl);
712     } else {
713       map<int32_t,uint8_t> ns;
714       ::decode(ns, bl);
715       for (auto q : ns) {
716         new_state[q.first] = q.second;
717       }
718     }
719     ::decode(new_weight, bl);
720     ::decode(new_pg_temp, bl);
721     ::decode(new_primary_temp, bl);
722     if (struct_v >= 2)
723       ::decode(new_primary_affinity, bl);
724     else
725       new_primary_affinity.clear();
726     if (struct_v >= 3) {
727       ::decode(new_erasure_code_profiles, bl);
728       ::decode(old_erasure_code_profiles, bl);
729     } else {
730       new_erasure_code_profiles.clear();
731       old_erasure_code_profiles.clear();
732     }
733     if (struct_v >= 4) {
734       ::decode(new_pg_upmap, bl);
735       ::decode(old_pg_upmap, bl);
736       ::decode(new_pg_upmap_items, bl);
737       ::decode(old_pg_upmap_items, bl);
738     }
739     DECODE_FINISH(bl); // client-usable data
740   }
741
742   {
743     DECODE_START(6, bl); // extended, osd-only data
744     ::decode(new_hb_back_up, bl);
745     ::decode(new_up_thru, bl);
746     ::decode(new_last_clean_interval, bl);
747     ::decode(new_lost, bl);
748     ::decode(new_blacklist, bl);
749     ::decode(old_blacklist, bl);
750     ::decode(new_up_cluster, bl);
751     ::decode(cluster_snapshot, bl);
752     ::decode(new_uuid, bl);
753     ::decode(new_xinfo, bl);
754     ::decode(new_hb_front_up, bl);
755     if (struct_v >= 2)
756       ::decode(encode_features, bl);
757     else
758       encode_features = CEPH_FEATURE_PGID64 | CEPH_FEATURE_OSDMAP_ENC;
759     if (struct_v >= 3) {
760       ::decode(new_nearfull_ratio, bl);
761       ::decode(new_full_ratio, bl);
762     } else {
763       new_nearfull_ratio = -1;
764       new_full_ratio = -1;
765     }
766     if (struct_v >= 4) {
767       ::decode(new_backfillfull_ratio, bl);
768     } else {
769       new_backfillfull_ratio = -1;
770     }
771     if (struct_v == 5) {
772       string r;
773       ::decode(r, bl);
774       if (r.length()) {
775         new_require_min_compat_client = ceph_release_from_name(r.c_str());
776       }
777     }
778     if (struct_v >= 6) {
779       ::decode(new_require_min_compat_client, bl);
780       ::decode(new_require_osd_release, bl);
781     } else {
782       if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
783         // only for compat with post-kraken pre-luminous test clusters
784         new_require_osd_release = CEPH_RELEASE_LUMINOUS;
785         new_flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
786       } else if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_KRAKEN)) {
787         new_require_osd_release = CEPH_RELEASE_KRAKEN;
788       } else if (new_flags >= 0 && (new_flags & CEPH_OSDMAP_REQUIRE_JEWEL)) {
789         new_require_osd_release = CEPH_RELEASE_JEWEL;
790       } else {
791         new_require_osd_release = -1;
792       }
793     }
794     DECODE_FINISH(bl); // osd-only data
795   }
796
797   if (struct_v >= 8) {
798     have_crc = true;
799     crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset);
800     ::decode(inc_crc, bl);
801     tail_offset = bl.get_off();
802     ::decode(full_crc, bl);
803   } else {
804     have_crc = false;
805     full_crc = 0;
806     inc_crc = 0;
807   }
808
809   DECODE_FINISH(bl); // wrapper
810
811   if (have_crc) {
812     // verify crc
813     uint32_t actual = crc_front.crc32c(-1);
814     if (tail_offset < bl.get_off()) {
815       bufferlist tail;
816       tail.substr_of(bl.get_bl(), tail_offset, bl.get_off() - tail_offset);
817       actual = tail.crc32c(actual);
818     }
819     if (inc_crc != actual) {
820       ostringstream ss;
821       ss << "bad crc, actual " << actual << " != expected " << inc_crc;
822       string s = ss.str();
823       throw buffer::malformed_input(s.c_str());
824     }
825   }
826 }
827
828 void OSDMap::Incremental::dump(Formatter *f) const
829 {
830   f->dump_int("epoch", epoch);
831   f->dump_stream("fsid") << fsid;
832   f->dump_stream("modified") << modified;
833   f->dump_int("new_pool_max", new_pool_max);
834   f->dump_int("new_flags", new_flags);
835   f->dump_float("new_full_ratio", new_full_ratio);
836   f->dump_float("new_nearfull_ratio", new_nearfull_ratio);
837   f->dump_float("new_backfillfull_ratio", new_backfillfull_ratio);
838   f->dump_int("new_require_min_compat_client", new_require_min_compat_client);
839   f->dump_int("new_require_osd_release", new_require_osd_release);
840
841   if (fullmap.length()) {
842     f->open_object_section("full_map");
843     OSDMap full;
844     bufferlist fbl = fullmap;  // kludge around constness.
845     auto p = fbl.begin();
846     full.decode(p);
847     full.dump(f);
848     f->close_section();
849   }
850   if (crush.length()) {
851     f->open_object_section("crush");
852     CrushWrapper c;
853     bufferlist tbl = crush;  // kludge around constness.
854     auto p = tbl.begin();
855     c.decode(p);
856     c.dump(f);
857     f->close_section();
858   }
859
860   f->dump_int("new_max_osd", new_max_osd);
861
862   f->open_array_section("new_pools");
863
864   for (const auto &new_pool : new_pools) {
865     f->open_object_section("pool");
866     f->dump_int("pool", new_pool.first);
867     new_pool.second.dump(f);
868     f->close_section();
869   }
870   f->close_section();
871   f->open_array_section("new_pool_names");
872
873   for (const auto &new_pool_name : new_pool_names) {
874     f->open_object_section("pool_name");
875     f->dump_int("pool", new_pool_name.first);
876     f->dump_string("name", new_pool_name.second);
877     f->close_section();
878   }
879   f->close_section();
880   f->open_array_section("old_pools");
881
882   for (const auto &old_pool : old_pools)
883     f->dump_int("pool", old_pool);
884   f->close_section();
885
886   f->open_array_section("new_up_osds");
887
888   for (const auto &upclient : new_up_client) {
889     f->open_object_section("osd");
890     f->dump_int("osd", upclient.first);
891     f->dump_stream("public_addr") << upclient.second;
892     f->dump_stream("cluster_addr") << new_up_cluster.find(upclient.first)->second;
893     f->dump_stream("heartbeat_back_addr") << new_hb_back_up.find(upclient.first)->second;
894     map<int32_t, entity_addr_t>::const_iterator q;
895     if ((q = new_hb_front_up.find(upclient.first)) != new_hb_front_up.end())
896       f->dump_stream("heartbeat_front_addr") << q->second;
897     f->close_section();
898   }
899   f->close_section();
900
901   f->open_array_section("new_weight");
902
903   for (const auto &weight : new_weight) {
904     f->open_object_section("osd");
905     f->dump_int("osd", weight.first);
906     f->dump_int("weight", weight.second);
907     f->close_section();
908   }
909   f->close_section();
910
911   f->open_array_section("osd_state_xor");
912   for (const auto &ns : new_state) {
913     f->open_object_section("osd");
914     f->dump_int("osd", ns.first);
915     set<string> st;
916     calc_state_set(new_state.find(ns.first)->second, st);
917     f->open_array_section("state_xor");
918     for (auto &state : st)
919       f->dump_string("state", state);
920     f->close_section();
921     f->close_section();
922   }
923   f->close_section();
924
925   f->open_array_section("new_pg_temp");
926
927   for (const auto &pg_temp : new_pg_temp) {
928     f->open_object_section("pg");
929     f->dump_stream("pgid") << pg_temp.first;
930     f->open_array_section("osds");
931
932     for (const auto &osd : pg_temp.second)
933       f->dump_int("osd", osd);
934     f->close_section();
935     f->close_section();    
936   }
937   f->close_section();
938
939   f->open_array_section("primary_temp");
940
941   for (const auto &primary_temp : new_primary_temp) {
942     f->dump_stream("pgid") << primary_temp.first;
943     f->dump_int("osd", primary_temp.second);
944   }
945   f->close_section(); // primary_temp
946
947   f->open_array_section("new_pg_upmap");
948   for (auto& i : new_pg_upmap) {
949     f->open_object_section("mapping");
950     f->dump_stream("pgid") << i.first;
951     f->open_array_section("osds");
952     for (auto osd : i.second) {
953       f->dump_int("osd", osd);
954     }
955     f->close_section();
956     f->close_section();
957   }
958   f->close_section();
959   f->open_array_section("old_pg_upmap");
960   for (auto& i : old_pg_upmap) {
961     f->dump_stream("pgid") << i;
962   }
963   f->close_section();
964
965   f->open_array_section("new_pg_upmap_items");
966   for (auto& i : new_pg_upmap_items) {
967     f->open_object_section("mapping");
968     f->dump_stream("pgid") << i.first;
969     f->open_array_section("mappings");
970     for (auto& p : i.second) {
971       f->open_object_section("mapping");
972       f->dump_int("from", p.first);
973       f->dump_int("to", p.second);
974       f->close_section();
975     }
976     f->close_section();
977     f->close_section();
978   }
979   f->close_section();
980   f->open_array_section("old_pg_upmap_items");
981   for (auto& i : old_pg_upmap_items) {
982     f->dump_stream("pgid") << i;
983   }
984   f->close_section();
985
986   f->open_array_section("new_up_thru");
987
988   for (const auto &up_thru : new_up_thru) {
989     f->open_object_section("osd");
990     f->dump_int("osd", up_thru.first);
991     f->dump_int("up_thru", up_thru.second);
992     f->close_section();
993   }
994   f->close_section();
995
996   f->open_array_section("new_lost");
997
998   for (const auto &lost : new_lost) {
999     f->open_object_section("osd");
1000     f->dump_int("osd", lost.first);
1001     f->dump_int("epoch_lost", lost.second);
1002     f->close_section();
1003   }
1004   f->close_section();
1005
1006   f->open_array_section("new_last_clean_interval");
1007
1008   for (const auto &last_clean_interval : new_last_clean_interval) {
1009     f->open_object_section("osd");
1010     f->dump_int("osd", last_clean_interval.first);
1011     f->dump_int("first", last_clean_interval.second.first);
1012     f->dump_int("last", last_clean_interval.second.second);
1013     f->close_section();
1014   }
1015   f->close_section();
1016
1017   f->open_array_section("new_blacklist");
1018   for (const auto &blist : new_blacklist) {
1019     stringstream ss;
1020     ss << blist.first;
1021     f->dump_stream(ss.str().c_str()) << blist.second;
1022   }
1023   f->close_section();
1024   f->open_array_section("old_blacklist");
1025   for (const auto &blist : old_blacklist)
1026     f->dump_stream("addr") << blist;
1027   f->close_section();
1028
1029   f->open_array_section("new_xinfo");
1030   for (const auto &xinfo : new_xinfo) {
1031     f->open_object_section("xinfo");
1032     f->dump_int("osd", xinfo.first);
1033     xinfo.second.dump(f);
1034     f->close_section();
1035   }
1036   f->close_section();
1037
1038   if (cluster_snapshot.size())
1039     f->dump_string("cluster_snapshot", cluster_snapshot);
1040
1041   f->open_array_section("new_uuid");
1042   for (const auto &uuid : new_uuid) {
1043     f->open_object_section("osd");
1044     f->dump_int("osd", uuid.first);
1045     f->dump_stream("uuid") << uuid.second;
1046     f->close_section();
1047   }
1048   f->close_section();
1049
1050   OSDMap::dump_erasure_code_profiles(new_erasure_code_profiles, f);
1051   f->open_array_section("old_erasure_code_profiles");
1052   for (const auto &erasure_code_profile : old_erasure_code_profiles) {
1053     f->dump_string("old", erasure_code_profile.c_str());
1054   }
1055   f->close_section();
1056 }
1057
1058 void OSDMap::Incremental::generate_test_instances(list<Incremental*>& o)
1059 {
1060   o.push_back(new Incremental);
1061 }
1062
1063 // ----------------------------------
1064 // OSDMap
1065
1066 void OSDMap::set_epoch(epoch_t e)
1067 {
1068   epoch = e;
1069   for (auto &pool : pools)
1070     pool.second.last_change = e;
1071 }
1072
1073 bool OSDMap::is_blacklisted(const entity_addr_t& a) const
1074 {
1075   if (blacklist.empty())
1076     return false;
1077
1078   // this specific instance?
1079   if (blacklist.count(a))
1080     return true;
1081
1082   // is entire ip blacklisted?
1083   if (a.is_ip()) {
1084     entity_addr_t b = a;
1085     b.set_port(0);
1086     b.set_nonce(0);
1087     if (blacklist.count(b)) {
1088       return true;
1089     }
1090   }
1091
1092   return false;
1093 }
1094
1095 void OSDMap::get_blacklist(list<pair<entity_addr_t,utime_t> > *bl) const
1096 {
1097    std::copy(blacklist.begin(), blacklist.end(), std::back_inserter(*bl));
1098 }
1099
1100 void OSDMap::get_blacklist(std::set<entity_addr_t> *bl) const
1101 {
1102   for (const auto &i : blacklist) {
1103     bl->insert(i.first);
1104   }
1105 }
1106
1107 void OSDMap::set_max_osd(int m)
1108 {
1109   int o = max_osd;
1110   max_osd = m;
1111   osd_state.resize(m);
1112   osd_weight.resize(m);
1113   for (; o<max_osd; o++) {
1114     osd_state[o] = 0;
1115     osd_weight[o] = CEPH_OSD_OUT;
1116   }
1117   osd_info.resize(m);
1118   osd_xinfo.resize(m);
1119   osd_addrs->client_addr.resize(m);
1120   osd_addrs->cluster_addr.resize(m);
1121   osd_addrs->hb_back_addr.resize(m);
1122   osd_addrs->hb_front_addr.resize(m);
1123   osd_uuid->resize(m);
1124   if (osd_primary_affinity)
1125     osd_primary_affinity->resize(m, CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
1126
1127   calc_num_osds();
1128 }
1129
1130 int OSDMap::calc_num_osds()
1131 {
1132   num_osd = 0;
1133   num_up_osd = 0;
1134   num_in_osd = 0;
1135   for (int i=0; i<max_osd; i++) {
1136     if (osd_state[i] & CEPH_OSD_EXISTS) {
1137       ++num_osd;
1138       if (osd_state[i] & CEPH_OSD_UP) {
1139         ++num_up_osd;
1140       }
1141       if (get_weight(i) != CEPH_OSD_OUT) {
1142         ++num_in_osd;
1143       }
1144     }
1145   }
1146   return num_osd;
1147 }
1148
1149 void OSDMap::get_full_pools(CephContext *cct,
1150                             set<int64_t> *full,
1151                             set<int64_t> *backfillfull,
1152                             set<int64_t> *nearfull) const
1153 {
1154   assert(full);
1155   assert(backfillfull);
1156   assert(nearfull);
1157   full->clear();
1158   backfillfull->clear();
1159   nearfull->clear();
1160
1161   vector<int> full_osds;
1162   vector<int> backfillfull_osds;
1163   vector<int> nearfull_osds;
1164   for (int i = 0; i < max_osd; ++i) {
1165     if (exists(i) && is_up(i) && is_in(i)) {
1166       if (osd_state[i] & CEPH_OSD_FULL)
1167         full_osds.push_back(i);
1168       else if (osd_state[i] & CEPH_OSD_BACKFILLFULL)
1169         backfillfull_osds.push_back(i);
1170       else if (osd_state[i] & CEPH_OSD_NEARFULL)
1171         nearfull_osds.push_back(i);
1172     }
1173   }
1174
1175   for (auto i: full_osds) {
1176     get_pool_ids_by_osd(cct, i, full);
1177   }
1178   for (auto i: backfillfull_osds) {
1179     get_pool_ids_by_osd(cct, i, backfillfull);
1180   }
1181   for (auto i: nearfull_osds) {
1182     get_pool_ids_by_osd(cct, i, nearfull);
1183   }
1184 }
1185
1186 static bool get_osd_utilization(
1187   const mempool::pgmap::unordered_map<int32_t,osd_stat_t> &osd_stat,
1188   int id, int64_t* kb, int64_t* kb_used, int64_t* kb_avail)
1189 {
1190   auto p = osd_stat.find(id);
1191   if (p == osd_stat.end())
1192     return false;
1193   *kb = p->second.kb;
1194   *kb_used = p->second.kb_used;
1195   *kb_avail = p->second.kb_avail;
1196   return *kb > 0;
1197 }
1198
1199 void OSDMap::get_full_osd_util(
1200   const mempool::pgmap::unordered_map<int32_t,osd_stat_t> &osd_stat,
1201   map<int, float> *full, map<int, float> *backfill, map<int, float> *nearfull) const
1202 {
1203   full->clear();
1204   backfill->clear();
1205   nearfull->clear();
1206   for (int i = 0; i < max_osd; ++i) {
1207     if (exists(i) && is_up(i) && is_in(i)) {
1208       int64_t kb, kb_used, kb_avail;
1209       if (osd_state[i] & CEPH_OSD_FULL) {
1210         if (get_osd_utilization(osd_stat, i, &kb, &kb_used, &kb_avail))
1211           full->emplace(i, (float)kb_used / (float)kb);
1212       } else if (osd_state[i] & CEPH_OSD_BACKFILLFULL) {
1213         if (get_osd_utilization(osd_stat, i, &kb, &kb_used, &kb_avail))
1214           backfill->emplace(i, (float)kb_used / (float)kb);
1215       } else if (osd_state[i] & CEPH_OSD_NEARFULL) {
1216         if (get_osd_utilization(osd_stat, i, &kb, &kb_used, &kb_avail))
1217           nearfull->emplace(i, (float)kb_used / (float)kb);
1218       }
1219     }
1220   }
1221 }
1222
1223 void OSDMap::get_full_osd_counts(set<int> *full, set<int> *backfill,
1224                                  set<int> *nearfull) const
1225 {
1226   full->clear();
1227   backfill->clear();
1228   nearfull->clear();
1229   for (int i = 0; i < max_osd; ++i) {
1230     if (exists(i) && is_up(i) && is_in(i)) {
1231       if (osd_state[i] & CEPH_OSD_FULL)
1232         full->emplace(i);
1233       else if (osd_state[i] & CEPH_OSD_BACKFILLFULL)
1234         backfill->emplace(i);
1235       else if (osd_state[i] & CEPH_OSD_NEARFULL)
1236         nearfull->emplace(i);
1237     }
1238   }
1239 }
1240
1241 void OSDMap::get_all_osds(set<int32_t>& ls) const
1242 {
1243   for (int i=0; i<max_osd; i++)
1244     if (exists(i))
1245       ls.insert(i);
1246 }
1247
1248 void OSDMap::get_up_osds(set<int32_t>& ls) const
1249 {
1250   for (int i = 0; i < max_osd; i++) {
1251     if (is_up(i))
1252       ls.insert(i);
1253   }
1254 }
1255
1256 void OSDMap::get_out_osds(set<int32_t>& ls) const
1257 {
1258   for (int i = 0; i < max_osd; i++) {
1259     if (is_out(i))
1260       ls.insert(i);
1261   }
1262 }
1263
1264 void OSDMap::calc_state_set(int state, set<string>& st)
1265 {
1266   unsigned t = state;
1267   for (unsigned s = 1; t; s <<= 1) {
1268     if (t & s) {
1269       t &= ~s;
1270       st.insert(ceph_osd_state_name(s));
1271     }
1272   }
1273 }
1274
1275 void OSDMap::adjust_osd_weights(const map<int,double>& weights, Incremental& inc) const
1276 {
1277   float max = 0;
1278   for (const auto &weight : weights) {
1279     if (weight.second > max)
1280       max = weight.second;
1281   }
1282
1283   for (const auto &weight : weights) {
1284     inc.new_weight[weight.first] = (unsigned)((weight.second / max) * CEPH_OSD_IN);
1285   }
1286 }
1287
1288 int OSDMap::identify_osd(const entity_addr_t& addr) const
1289 {
1290   for (int i=0; i<max_osd; i++)
1291     if (exists(i) && (get_addr(i) == addr || get_cluster_addr(i) == addr))
1292       return i;
1293   return -1;
1294 }
1295
1296 int OSDMap::identify_osd(const uuid_d& u) const
1297 {
1298   for (int i=0; i<max_osd; i++)
1299     if (exists(i) && get_uuid(i) == u)
1300       return i;
1301   return -1;
1302 }
1303
1304 int OSDMap::identify_osd_on_all_channels(const entity_addr_t& addr) const
1305 {
1306   for (int i=0; i<max_osd; i++)
1307     if (exists(i) && (get_addr(i) == addr || get_cluster_addr(i) == addr ||
1308         get_hb_back_addr(i) == addr || get_hb_front_addr(i) == addr))
1309       return i;
1310   return -1;
1311 }
1312
1313 int OSDMap::find_osd_on_ip(const entity_addr_t& ip) const
1314 {
1315   for (int i=0; i<max_osd; i++)
1316     if (exists(i) && (get_addr(i).is_same_host(ip) || get_cluster_addr(i).is_same_host(ip)))
1317       return i;
1318   return -1;
1319 }
1320
1321
1322 uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
1323 {
1324   uint64_t features = 0;  // things we actually have
1325   uint64_t mask = 0;      // things we could have
1326
1327   if (crush->has_nondefault_tunables())
1328     features |= CEPH_FEATURE_CRUSH_TUNABLES;
1329   if (crush->has_nondefault_tunables2())
1330     features |= CEPH_FEATURE_CRUSH_TUNABLES2;
1331   if (crush->has_nondefault_tunables3())
1332     features |= CEPH_FEATURE_CRUSH_TUNABLES3;
1333   if (crush->has_v4_buckets())
1334     features |= CEPH_FEATURE_CRUSH_V4;
1335   if (crush->has_nondefault_tunables5())
1336     features |= CEPH_FEATURE_CRUSH_TUNABLES5;
1337   if (crush->has_incompat_choose_args()) {
1338     features |= CEPH_FEATUREMASK_CRUSH_CHOOSE_ARGS;
1339   }
1340   mask |= CEPH_FEATURES_CRUSH;
1341
1342   if (!pg_upmap.empty() || !pg_upmap_items.empty())
1343     features |= CEPH_FEATUREMASK_OSDMAP_PG_UPMAP;
1344   mask |= CEPH_FEATUREMASK_OSDMAP_PG_UPMAP;
1345
1346   for (auto &pool: pools) {
1347     if (pool.second.has_flag(pg_pool_t::FLAG_HASHPSPOOL)) {
1348       features |= CEPH_FEATURE_OSDHASHPSPOOL;
1349     }
1350     if (pool.second.is_erasure() &&
1351         entity_type != CEPH_ENTITY_TYPE_CLIENT) { // not for clients
1352       features |= CEPH_FEATURE_OSD_ERASURE_CODES;
1353     }
1354     if (!pool.second.tiers.empty() ||
1355         pool.second.is_tier()) {
1356       features |= CEPH_FEATURE_OSD_CACHEPOOL;
1357     }
1358     int ruleid = crush->find_rule(pool.second.get_crush_rule(),
1359                                   pool.second.get_type(),
1360                                   pool.second.get_size());
1361     if (ruleid >= 0) {
1362       if (crush->is_v2_rule(ruleid))
1363         features |= CEPH_FEATURE_CRUSH_V2;
1364       if (crush->is_v3_rule(ruleid))
1365         features |= CEPH_FEATURE_CRUSH_TUNABLES3;
1366       if (crush->is_v5_rule(ruleid))
1367         features |= CEPH_FEATURE_CRUSH_TUNABLES5;
1368     }
1369   }
1370   if (entity_type == CEPH_ENTITY_TYPE_OSD) {
1371     for (auto &erasure_code_profile : erasure_code_profiles) {
1372       auto& profile = erasure_code_profile.second;
1373       const auto& plugin = profile.find("plugin");
1374       if (plugin != profile.end()) {
1375         if (plugin->second == "isa" || plugin->second == "lrc")
1376           features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
1377         if (plugin->second == "shec")
1378           features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
1379       }
1380     }
1381   }
1382   mask |= CEPH_FEATURE_OSDHASHPSPOOL | CEPH_FEATURE_OSD_CACHEPOOL;
1383   if (entity_type != CEPH_ENTITY_TYPE_CLIENT)
1384     mask |= CEPH_FEATURE_OSD_ERASURE_CODES;
1385
1386   if (osd_primary_affinity) {
1387     for (int i = 0; i < max_osd; ++i) {
1388       if ((*osd_primary_affinity)[i] != CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
1389         features |= CEPH_FEATURE_OSD_PRIMARY_AFFINITY;
1390         break;
1391       }
1392     }
1393   }
1394   mask |= CEPH_FEATURE_OSD_PRIMARY_AFFINITY;
1395
1396   if (entity_type == CEPH_ENTITY_TYPE_OSD) {
1397     const uint64_t jewel_features = CEPH_FEATURE_SERVER_JEWEL;
1398     if (require_osd_release >= CEPH_RELEASE_JEWEL) {
1399       features |= jewel_features;
1400     }
1401     mask |= jewel_features;
1402
1403     const uint64_t kraken_features = CEPH_FEATUREMASK_SERVER_KRAKEN
1404       | CEPH_FEATURE_MSG_ADDR2;
1405     if (require_osd_release >= CEPH_RELEASE_KRAKEN) {
1406       features |= kraken_features;
1407     }
1408     mask |= kraken_features;
1409   }
1410
1411   if (pmask)
1412     *pmask = mask;
1413   return features;
1414 }
1415
1416 uint8_t OSDMap::get_min_compat_client() const
1417 {
1418   uint64_t f = get_features(CEPH_ENTITY_TYPE_CLIENT, nullptr);
1419
1420   if (HAVE_FEATURE(f, OSDMAP_PG_UPMAP) ||      // v12.0.0-1733-g27d6f43
1421       HAVE_FEATURE(f, CRUSH_CHOOSE_ARGS)) {    // v12.0.1-2172-gef1ef28
1422     return CEPH_RELEASE_LUMINOUS;  // v12.2.0
1423   }
1424   if (HAVE_FEATURE(f, CRUSH_TUNABLES5)) {      // v10.0.0-612-g043a737
1425     return CEPH_RELEASE_JEWEL;     // v10.2.0
1426   }
1427   if (HAVE_FEATURE(f, CRUSH_V4)) {             // v0.91-678-g325fc56
1428     return CEPH_RELEASE_HAMMER;    // v0.94.0
1429   }
1430   if (HAVE_FEATURE(f, OSD_PRIMARY_AFFINITY) || // v0.76-553-gf825624
1431       HAVE_FEATURE(f, CRUSH_TUNABLES3) ||      // v0.76-395-ge20a55d
1432       HAVE_FEATURE(f, OSD_ERASURE_CODES) ||    // v0.73-498-gbfc86a8
1433       HAVE_FEATURE(f, OSD_CACHEPOOL)) {        // v0.67-401-gb91c1c5
1434     return CEPH_RELEASE_FIREFLY;   // v0.80.0
1435   }
1436   if (HAVE_FEATURE(f, CRUSH_TUNABLES2) ||      // v0.54-684-g0cc47ff
1437       HAVE_FEATURE(f, OSDHASHPSPOOL)) {        // v0.57-398-g8cc2b0f
1438     return CEPH_RELEASE_DUMPLING;  // v0.67.0
1439   }
1440   if (HAVE_FEATURE(f, CRUSH_TUNABLES)) {       // v0.48argonaut-206-g6f381af
1441     return CEPH_RELEASE_ARGONAUT;  // v0.48argonaut-206-g6f381af
1442   }
1443   return CEPH_RELEASE_ARGONAUT;    // v0.48argonaut-206-g6f381af
1444 }
1445
1446 void OSDMap::_calc_up_osd_features()
1447 {
1448   bool first = true;
1449   cached_up_osd_features = 0;
1450   for (int osd = 0; osd < max_osd; ++osd) {
1451     if (!is_up(osd))
1452       continue;
1453     const osd_xinfo_t &xi = get_xinfo(osd);
1454     if (xi.features == 0)
1455       continue;  // bogus xinfo, maybe #20751 or similar, skipping
1456     if (first) {
1457       cached_up_osd_features = xi.features;
1458       first = false;
1459     } else {
1460       cached_up_osd_features &= xi.features;
1461     }
1462   }
1463 }
1464
1465 uint64_t OSDMap::get_up_osd_features() const
1466 {
1467   return cached_up_osd_features;
1468 }
1469
1470 void OSDMap::dedup(const OSDMap *o, OSDMap *n)
1471 {
1472   if (o->epoch == n->epoch)
1473     return;
1474
1475   int diff = 0;
1476
1477   // do addrs match?
1478   if (o->max_osd != n->max_osd)
1479     diff++;
1480   for (int i = 0; i < o->max_osd && i < n->max_osd; i++) {
1481     if ( n->osd_addrs->client_addr[i] &&  o->osd_addrs->client_addr[i] &&
1482         *n->osd_addrs->client_addr[i] == *o->osd_addrs->client_addr[i])
1483       n->osd_addrs->client_addr[i] = o->osd_addrs->client_addr[i];
1484     else
1485       diff++;
1486     if ( n->osd_addrs->cluster_addr[i] &&  o->osd_addrs->cluster_addr[i] &&
1487         *n->osd_addrs->cluster_addr[i] == *o->osd_addrs->cluster_addr[i])
1488       n->osd_addrs->cluster_addr[i] = o->osd_addrs->cluster_addr[i];
1489     else
1490       diff++;
1491     if ( n->osd_addrs->hb_back_addr[i] &&  o->osd_addrs->hb_back_addr[i] &&
1492         *n->osd_addrs->hb_back_addr[i] == *o->osd_addrs->hb_back_addr[i])
1493       n->osd_addrs->hb_back_addr[i] = o->osd_addrs->hb_back_addr[i];
1494     else
1495       diff++;
1496     if ( n->osd_addrs->hb_front_addr[i] &&  o->osd_addrs->hb_front_addr[i] &&
1497         *n->osd_addrs->hb_front_addr[i] == *o->osd_addrs->hb_front_addr[i])
1498       n->osd_addrs->hb_front_addr[i] = o->osd_addrs->hb_front_addr[i];
1499     else
1500       diff++;
1501   }
1502   if (diff == 0) {
1503     // zoinks, no differences at all!
1504     n->osd_addrs = o->osd_addrs;
1505   }
1506
1507   // does crush match?
1508   bufferlist oc, nc;
1509   ::encode(*o->crush, oc, CEPH_FEATURES_SUPPORTED_DEFAULT);
1510   ::encode(*n->crush, nc, CEPH_FEATURES_SUPPORTED_DEFAULT);
1511   if (oc.contents_equal(nc)) {
1512     n->crush = o->crush;
1513   }
1514
1515   // does pg_temp match?
1516   if (*o->pg_temp == *n->pg_temp)
1517     n->pg_temp = o->pg_temp;
1518
1519   // does primary_temp match?
1520   if (o->primary_temp->size() == n->primary_temp->size()) {
1521     if (*o->primary_temp == *n->primary_temp)
1522       n->primary_temp = o->primary_temp;
1523   }
1524
1525   // do uuids match?
1526   if (o->osd_uuid->size() == n->osd_uuid->size() &&
1527       *o->osd_uuid == *n->osd_uuid)
1528     n->osd_uuid = o->osd_uuid;
1529 }
1530
1531 void OSDMap::clean_temps(CephContext *cct,
1532                          const OSDMap& osdmap, Incremental *pending_inc)
1533 {
1534   ldout(cct, 10) << __func__ << dendl;
1535   OSDMap tmpmap;
1536   tmpmap.deepish_copy_from(osdmap);
1537   tmpmap.apply_incremental(*pending_inc);
1538
1539   for (auto pg : *tmpmap.pg_temp) {
1540     // if pool does not exist, remove any existing pg_temps associated with
1541     // it.  we don't care about pg_temps on the pending_inc either; if there
1542     // are new_pg_temp entries on the pending, clear them out just as well.
1543     if (!osdmap.have_pg_pool(pg.first.pool())) {
1544       ldout(cct, 10) << __func__ << " removing pg_temp " << pg.first
1545                      << " for nonexistent pool " << pg.first.pool() << dendl;
1546       pending_inc->new_pg_temp[pg.first].clear();
1547       continue;
1548     }
1549     // all osds down?
1550     unsigned num_up = 0;
1551     for (auto o : pg.second) {
1552       if (!tmpmap.is_down(o)) {
1553         ++num_up;
1554         break;
1555       }
1556     }
1557     if (num_up == 0) {
1558       ldout(cct, 10) << __func__ << "  removing pg_temp " << pg.first
1559                      << " with all down osds" << pg.second << dendl;
1560       pending_inc->new_pg_temp[pg.first].clear();
1561       continue;
1562     }
1563     // redundant pg_temp?
1564     vector<int> raw_up;
1565     int primary;
1566     tmpmap.pg_to_raw_up(pg.first, &raw_up, &primary);
1567     if (vectors_equal(raw_up, pg.second)) {
1568       ldout(cct, 10) << __func__ << "  removing pg_temp " << pg.first << " "
1569                      << pg.second << " that matches raw_up mapping" << dendl;
1570       if (osdmap.pg_temp->count(pg.first))
1571         pending_inc->new_pg_temp[pg.first].clear();
1572       else
1573         pending_inc->new_pg_temp.erase(pg.first);
1574     }
1575   }
1576   
1577   for (auto &pg : *tmpmap.primary_temp) {
1578     // primary down?
1579     if (tmpmap.is_down(pg.second)) {
1580       ldout(cct, 10) << __func__ << "  removing primary_temp " << pg.first
1581                      << " to down " << pg.second << dendl;
1582       pending_inc->new_primary_temp[pg.first] = -1;
1583       continue;
1584     }
1585     // redundant primary_temp?
1586     vector<int> real_up, templess_up;
1587     int real_primary, templess_primary;
1588     pg_t pgid = pg.first;
1589     tmpmap.pg_to_acting_osds(pgid, &real_up, &real_primary);
1590     tmpmap.pg_to_raw_up(pgid, &templess_up, &templess_primary);
1591     if (real_primary == templess_primary){
1592       ldout(cct, 10) << __func__ << "  removing primary_temp "
1593                      << pgid << " -> " << real_primary
1594                      << " (unnecessary/redundant)" << dendl;
1595       if (osdmap.primary_temp->count(pgid))
1596         pending_inc->new_primary_temp[pgid] = -1;
1597       else
1598         pending_inc->new_primary_temp.erase(pgid);
1599     }
1600   }
1601 }
1602
1603 int OSDMap::apply_incremental(const Incremental &inc)
1604 {
1605   new_blacklist_entries = false;
1606   if (inc.epoch == 1)
1607     fsid = inc.fsid;
1608   else if (inc.fsid != fsid)
1609     return -EINVAL;
1610   
1611   assert(inc.epoch == epoch+1);
1612
1613   epoch++;
1614   modified = inc.modified;
1615
1616   // full map?
1617   if (inc.fullmap.length()) {
1618     bufferlist bl(inc.fullmap);
1619     decode(bl);
1620     return 0;
1621   }
1622
1623   // nope, incremental.
1624   if (inc.new_flags >= 0) {
1625     flags = inc.new_flags;
1626     // the below is just to cover a newly-upgraded luminous mon
1627     // cluster that has to set require_jewel_osds or
1628     // require_kraken_osds before the osds can be upgraded to
1629     // luminous.
1630     if (flags & CEPH_OSDMAP_REQUIRE_KRAKEN) {
1631       if (require_osd_release < CEPH_RELEASE_KRAKEN) {
1632         require_osd_release = CEPH_RELEASE_KRAKEN;
1633       }
1634     } else if (flags & CEPH_OSDMAP_REQUIRE_JEWEL) {
1635       if (require_osd_release < CEPH_RELEASE_JEWEL) {
1636         require_osd_release = CEPH_RELEASE_JEWEL;
1637       }
1638     }
1639   }
1640
1641   if (inc.new_max_osd >= 0)
1642     set_max_osd(inc.new_max_osd);
1643
1644   if (inc.new_pool_max != -1)
1645     pool_max = inc.new_pool_max;
1646
1647   for (const auto &pool : inc.new_pools) {
1648     pools[pool.first] = pool.second;
1649     pools[pool.first].last_change = epoch;
1650   }
1651
1652   for (const auto &pname : inc.new_pool_names) {
1653     auto pool_name_entry = pool_name.find(pname.first);
1654     if (pool_name_entry != pool_name.end()) {
1655       name_pool.erase(pool_name_entry->second);
1656       pool_name_entry->second = pname.second;
1657     } else {
1658       pool_name[pname.first] = pname.second;
1659     }
1660     name_pool[pname.second] = pname.first;
1661   }
1662   
1663   for (const auto &pool : inc.old_pools) {
1664     pools.erase(pool);
1665     name_pool.erase(pool_name[pool]);
1666     pool_name.erase(pool);
1667   }
1668
1669   for (const auto &weight : inc.new_weight) {
1670     set_weight(weight.first, weight.second);
1671
1672     // if we are marking in, clear the AUTOOUT and NEW bits, and clear
1673     // xinfo old_weight.
1674     if (weight.second) {
1675       osd_state[weight.first] &= ~(CEPH_OSD_AUTOOUT | CEPH_OSD_NEW);
1676       osd_xinfo[weight.first].old_weight = 0;
1677     }
1678   }
1679
1680   for (const auto &primary_affinity : inc.new_primary_affinity) {
1681     set_primary_affinity(primary_affinity.first, primary_affinity.second);
1682   }
1683
1684   // erasure_code_profiles
1685   for (const auto &profile : inc.old_erasure_code_profiles)
1686     erasure_code_profiles.erase(profile);
1687   
1688   for (const auto &profile : inc.new_erasure_code_profiles) {
1689     set_erasure_code_profile(profile.first, profile.second);
1690   }
1691   
1692   // up/down
1693   for (const auto &state : inc.new_state) {
1694     const auto osd = state.first;
1695     int s = state.second ? state.second : CEPH_OSD_UP;
1696     if ((osd_state[osd] & CEPH_OSD_UP) &&
1697         (s & CEPH_OSD_UP)) {
1698       osd_info[osd].down_at = epoch;
1699       osd_xinfo[osd].down_stamp = modified;
1700     }
1701     if ((osd_state[osd] & CEPH_OSD_EXISTS) &&
1702         (s & CEPH_OSD_EXISTS)) {
1703       // osd is destroyed; clear out anything interesting.
1704       (*osd_uuid)[osd] = uuid_d();
1705       osd_info[osd] = osd_info_t();
1706       osd_xinfo[osd] = osd_xinfo_t();
1707       set_primary_affinity(osd, CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
1708       osd_addrs->client_addr[osd].reset(new entity_addr_t());
1709       osd_addrs->cluster_addr[osd].reset(new entity_addr_t());
1710       osd_addrs->hb_front_addr[osd].reset(new entity_addr_t());
1711       osd_addrs->hb_back_addr[osd].reset(new entity_addr_t());
1712       osd_state[osd] = 0;
1713     } else {
1714       osd_state[osd] ^= s;
1715     }
1716   }
1717
1718   for (const auto &client : inc.new_up_client) {
1719     osd_state[client.first] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
1720     osd_addrs->client_addr[client.first].reset(new entity_addr_t(client.second));
1721     if (inc.new_hb_back_up.empty())
1722       osd_addrs->hb_back_addr[client.first].reset(new entity_addr_t(client.second)); //this is a backward-compatibility hack
1723     else
1724       osd_addrs->hb_back_addr[client.first].reset(
1725         new entity_addr_t(inc.new_hb_back_up.find(client.first)->second));
1726     const auto j = inc.new_hb_front_up.find(client.first);
1727     if (j != inc.new_hb_front_up.end())
1728       osd_addrs->hb_front_addr[client.first].reset(new entity_addr_t(j->second));
1729     else
1730       osd_addrs->hb_front_addr[client.first].reset();
1731
1732     osd_info[client.first].up_from = epoch;
1733   }
1734
1735   for (const auto &cluster : inc.new_up_cluster)
1736     osd_addrs->cluster_addr[cluster.first].reset(new entity_addr_t(cluster.second));
1737
1738   // info
1739   for (const auto &thru : inc.new_up_thru)
1740     osd_info[thru.first].up_thru = thru.second;
1741   
1742   for (const auto &interval : inc.new_last_clean_interval) {
1743     osd_info[interval.first].last_clean_begin = interval.second.first;
1744     osd_info[interval.first].last_clean_end = interval.second.second;
1745   }
1746   
1747   for (const auto &lost : inc.new_lost)
1748     osd_info[lost.first].lost_at = lost.second;
1749
1750   // xinfo
1751   for (const auto &xinfo : inc.new_xinfo)
1752     osd_xinfo[xinfo.first] = xinfo.second;
1753
1754   // uuid
1755   for (const auto &uuid : inc.new_uuid)
1756     (*osd_uuid)[uuid.first] = uuid.second;
1757
1758   // pg rebuild
1759   for (const auto &pg : inc.new_pg_temp) {
1760     if (pg.second.empty())
1761       pg_temp->erase(pg.first);
1762     else
1763       pg_temp->set(pg.first, pg.second);
1764   }
1765   if (!inc.new_pg_temp.empty()) {
1766     // make sure pg_temp is efficiently stored
1767     pg_temp->rebuild();
1768   }
1769
1770   for (const auto &pg : inc.new_primary_temp) {
1771     if (pg.second == -1)
1772       primary_temp->erase(pg.first);
1773     else
1774       (*primary_temp)[pg.first] = pg.second;
1775   }
1776
1777   for (auto& p : inc.new_pg_upmap) {
1778     pg_upmap[p.first] = p.second;
1779   }
1780   for (auto& pg : inc.old_pg_upmap) {
1781     pg_upmap.erase(pg);
1782   }
1783   for (auto& p : inc.new_pg_upmap_items) {
1784     pg_upmap_items[p.first] = p.second;
1785   }
1786   for (auto& pg : inc.old_pg_upmap_items) {
1787     pg_upmap_items.erase(pg);
1788   }
1789
1790   // blacklist
1791   if (!inc.new_blacklist.empty()) {
1792     blacklist.insert(inc.new_blacklist.begin(),inc.new_blacklist.end());
1793     new_blacklist_entries = true;
1794   }
1795   for (const auto &addr : inc.old_blacklist)
1796     blacklist.erase(addr);
1797
1798   // cluster snapshot?
1799   if (inc.cluster_snapshot.length()) {
1800     cluster_snapshot = inc.cluster_snapshot;
1801     cluster_snapshot_epoch = inc.epoch;
1802   } else {
1803     cluster_snapshot.clear();
1804     cluster_snapshot_epoch = 0;
1805   }
1806
1807   if (inc.new_nearfull_ratio >= 0) {
1808     nearfull_ratio = inc.new_nearfull_ratio;
1809   }
1810   if (inc.new_backfillfull_ratio >= 0) {
1811     backfillfull_ratio = inc.new_backfillfull_ratio;
1812   }
1813   if (inc.new_full_ratio >= 0) {
1814     full_ratio = inc.new_full_ratio;
1815   }
1816   if (inc.new_require_min_compat_client > 0) {
1817     require_min_compat_client = inc.new_require_min_compat_client;
1818   }
1819   if (inc.new_require_osd_release >= 0) {
1820     require_osd_release = inc.new_require_osd_release;
1821     if (require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1822       flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
1823       flags |= CEPH_OSDMAP_RECOVERY_DELETES;
1824     }
1825   }
1826
1827   // do new crush map last (after up/down stuff)
1828   if (inc.crush.length()) {
1829     bufferlist bl(inc.crush);
1830     auto blp = bl.begin();
1831     crush.reset(new CrushWrapper);
1832     crush->decode(blp);
1833     if (require_osd_release >= CEPH_RELEASE_LUMINOUS) {
1834       // only increment if this is a luminous-encoded osdmap, lest
1835       // the mon's crush_version diverge from what the osds or others
1836       // are decoding and applying on their end.  if we won't encode
1837       // it in the canonical version, don't change it.
1838       ++crush_version;
1839     }
1840   }
1841
1842   calc_num_osds();
1843   _calc_up_osd_features();
1844   return 0;
1845 }
1846
1847 // mapping
1848 int OSDMap::map_to_pg(
1849   int64_t poolid,
1850   const string& name,
1851   const string& key,
1852   const string& nspace,
1853   pg_t *pg) const
1854 {
1855   // calculate ps (placement seed)
1856   const pg_pool_t *pool = get_pg_pool(poolid);
1857   if (!pool)
1858     return -ENOENT;
1859   ps_t ps;
1860   if (!key.empty())
1861     ps = pool->hash_key(key, nspace);
1862   else
1863     ps = pool->hash_key(name, nspace);
1864   *pg = pg_t(ps, poolid);
1865   return 0;
1866 }
1867
1868 int OSDMap::object_locator_to_pg(
1869   const object_t& oid, const object_locator_t& loc, pg_t &pg) const
1870 {
1871   if (loc.hash >= 0) {
1872     if (!get_pg_pool(loc.get_pool())) {
1873       return -ENOENT;
1874     }
1875     pg = pg_t(loc.hash, loc.get_pool());
1876     return 0;
1877   }
1878   return map_to_pg(loc.get_pool(), oid.name, loc.key, loc.nspace, &pg);
1879 }
1880
1881 ceph_object_layout OSDMap::make_object_layout(
1882   object_t oid, int pg_pool, string nspace) const
1883 {
1884   object_locator_t loc(pg_pool, nspace);
1885
1886   ceph_object_layout ol;
1887   pg_t pgid = object_locator_to_pg(oid, loc);
1888   ol.ol_pgid = pgid.get_old_pg().v;
1889   ol.ol_stripe_unit = 0;
1890   return ol;
1891 }
1892
1893 void OSDMap::_remove_nonexistent_osds(const pg_pool_t& pool,
1894                                       vector<int>& osds) const
1895 {
1896   if (pool.can_shift_osds()) {
1897     unsigned removed = 0;
1898     for (unsigned i = 0; i < osds.size(); i++) {
1899       if (!exists(osds[i])) {
1900         removed++;
1901         continue;
1902       }
1903       if (removed) {
1904         osds[i - removed] = osds[i];
1905       }
1906     }
1907     if (removed)
1908       osds.resize(osds.size() - removed);
1909   } else {
1910     for (auto& osd : osds) {
1911       if (!exists(osd))
1912         osd = CRUSH_ITEM_NONE;
1913     }
1914   }
1915 }
1916
1917 void OSDMap::_pg_to_raw_osds(
1918   const pg_pool_t& pool, pg_t pg,
1919   vector<int> *osds,
1920   ps_t *ppps) const
1921 {
1922   // map to osds[]
1923   ps_t pps = pool.raw_pg_to_pps(pg);  // placement ps
1924   unsigned size = pool.get_size();
1925
1926   // what crush rule?
1927   int ruleno = crush->find_rule(pool.get_crush_rule(), pool.get_type(), size);
1928   if (ruleno >= 0)
1929     crush->do_rule(ruleno, pps, *osds, size, osd_weight, pg.pool());
1930
1931   _remove_nonexistent_osds(pool, *osds);
1932
1933   if (ppps)
1934     *ppps = pps;
1935 }
1936
1937 int OSDMap::_pick_primary(const vector<int>& osds) const
1938 {
1939   for (auto osd : osds) {
1940     if (osd != CRUSH_ITEM_NONE) {
1941       return osd;
1942     }
1943   }
1944   return -1;
1945 }
1946
1947 void OSDMap::_apply_upmap(const pg_pool_t& pi, pg_t raw_pg, vector<int> *raw) const
1948 {
1949   pg_t pg = pi.raw_pg_to_pg(raw_pg);
1950   auto p = pg_upmap.find(pg);
1951   if (p != pg_upmap.end()) {
1952     // make sure targets aren't marked out
1953     for (auto osd : p->second) {
1954       if (osd != CRUSH_ITEM_NONE && osd < max_osd && osd_weight[osd] == 0) {
1955         // reject/ignore the explicit mapping
1956         return;
1957       }
1958     }
1959     *raw = vector<int>(p->second.begin(), p->second.end());
1960     // continue to check and apply pg_upmap_items if any
1961   }
1962
1963   auto q = pg_upmap_items.find(pg);
1964   if (q != pg_upmap_items.end()) {
1965     // NOTE: this approach does not allow a bidirectional swap,
1966     // e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1].
1967     for (auto& r : q->second) {
1968       // make sure the replacement value doesn't already appear
1969       bool exists = false;
1970       ssize_t pos = -1;
1971       for (unsigned i = 0; i < raw->size(); ++i) {
1972         int osd = (*raw)[i];
1973         if (osd == r.second) {
1974           exists = true;
1975           break;
1976         }
1977         // ignore mapping if target is marked out (or invalid osd id)
1978         if (osd == r.first &&
1979             pos < 0 &&
1980             !(r.second != CRUSH_ITEM_NONE && r.second < max_osd &&
1981               osd_weight[r.second] == 0)) {
1982           pos = i;
1983         }
1984       }
1985       if (!exists && pos >= 0) {
1986         (*raw)[pos] = r.second;
1987       }
1988     }
1989   }
1990 }
1991
1992 // pg -> (up osd list)
1993 void OSDMap::_raw_to_up_osds(const pg_pool_t& pool, const vector<int>& raw,
1994                              vector<int> *up) const
1995 {
1996   if (pool.can_shift_osds()) {
1997     // shift left
1998     up->clear();
1999     up->reserve(raw.size());
2000     for (unsigned i=0; i<raw.size(); i++) {
2001       if (!exists(raw[i]) || is_down(raw[i]))
2002         continue;
2003       up->push_back(raw[i]);
2004     }
2005   } else {
2006     // set down/dne devices to NONE
2007     up->resize(raw.size());
2008     for (int i = raw.size() - 1; i >= 0; --i) {
2009       if (!exists(raw[i]) || is_down(raw[i])) {
2010         (*up)[i] = CRUSH_ITEM_NONE;
2011       } else {
2012         (*up)[i] = raw[i];
2013       }
2014     }
2015   }
2016 }
2017
2018 void OSDMap::_apply_primary_affinity(ps_t seed,
2019                                      const pg_pool_t& pool,
2020                                      vector<int> *osds,
2021                                      int *primary) const
2022 {
2023   // do we have any non-default primary_affinity values for these osds?
2024   if (!osd_primary_affinity)
2025     return;
2026
2027   bool any = false;
2028   for (const auto osd : *osds) {
2029     if (osd != CRUSH_ITEM_NONE &&
2030         (*osd_primary_affinity)[osd] != CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
2031       any = true;
2032       break;
2033     }
2034   }
2035   if (!any)
2036     return;
2037
2038   // pick the primary.  feed both the seed (for the pg) and the osd
2039   // into the hash/rng so that a proportional fraction of an osd's pgs
2040   // get rejected as primary.
2041   int pos = -1;
2042   for (unsigned i = 0; i < osds->size(); ++i) {
2043     int o = (*osds)[i];
2044     if (o == CRUSH_ITEM_NONE)
2045       continue;
2046     unsigned a = (*osd_primary_affinity)[o];
2047     if (a < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
2048         (crush_hash32_2(CRUSH_HASH_RJENKINS1,
2049                         seed, o) >> 16) >= a) {
2050       // we chose not to use this primary.  note it anyway as a
2051       // fallback in case we don't pick anyone else, but keep looking.
2052       if (pos < 0)
2053         pos = i;
2054     } else {
2055       pos = i;
2056       break;
2057     }
2058   }
2059   if (pos < 0)
2060     return;
2061
2062   *primary = (*osds)[pos];
2063
2064   if (pool.can_shift_osds() && pos > 0) {
2065     // move the new primary to the front.
2066     for (int i = pos; i > 0; --i) {
2067       (*osds)[i] = (*osds)[i-1];
2068     }
2069     (*osds)[0] = *primary;
2070   }
2071 }
2072
2073 void OSDMap::_get_temp_osds(const pg_pool_t& pool, pg_t pg,
2074                             vector<int> *temp_pg, int *temp_primary) const
2075 {
2076   pg = pool.raw_pg_to_pg(pg);
2077   const auto p = pg_temp->find(pg);
2078   temp_pg->clear();
2079   if (p != pg_temp->end()) {
2080     for (unsigned i=0; i<p->second.size(); i++) {
2081       if (!exists(p->second[i]) || is_down(p->second[i])) {
2082         if (pool.can_shift_osds()) {
2083           continue;
2084         } else {
2085           temp_pg->push_back(CRUSH_ITEM_NONE);
2086         }
2087       } else {
2088         temp_pg->push_back(p->second[i]);
2089       }
2090     }
2091   }
2092   const auto &pp = primary_temp->find(pg);
2093   *temp_primary = -1;
2094   if (pp != primary_temp->end()) {
2095     *temp_primary = pp->second;
2096   } else if (!temp_pg->empty()) { // apply pg_temp's primary
2097     for (unsigned i = 0; i < temp_pg->size(); ++i) {
2098       if ((*temp_pg)[i] != CRUSH_ITEM_NONE) {
2099         *temp_primary = (*temp_pg)[i];
2100         break;
2101       }
2102     }
2103   }
2104 }
2105
2106 void OSDMap::pg_to_raw_osds(pg_t pg, vector<int> *raw, int *primary) const
2107 {
2108   *primary = -1;
2109   raw->clear();
2110   const pg_pool_t *pool = get_pg_pool(pg.pool());
2111   if (!pool)
2112     return;
2113   _pg_to_raw_osds(*pool, pg, raw, NULL);
2114   if (primary)
2115     *primary = _pick_primary(*raw);
2116 }
2117
2118 void OSDMap::pg_to_raw_up(pg_t pg, vector<int> *up, int *primary) const
2119 {
2120   const pg_pool_t *pool = get_pg_pool(pg.pool());
2121   if (!pool) {
2122     if (primary)
2123       *primary = -1;
2124     if (up)
2125       up->clear();
2126     return;
2127   }
2128   vector<int> raw;
2129   ps_t pps;
2130   _pg_to_raw_osds(*pool, pg, &raw, &pps);
2131   _apply_upmap(*pool, pg, &raw);
2132   _raw_to_up_osds(*pool, raw, up);
2133   *primary = _pick_primary(raw);
2134   _apply_primary_affinity(pps, *pool, up, primary);
2135 }
2136
2137 void OSDMap::_pg_to_up_acting_osds(
2138   const pg_t& pg, vector<int> *up, int *up_primary,
2139   vector<int> *acting, int *acting_primary,
2140   bool raw_pg_to_pg) const
2141 {
2142   const pg_pool_t *pool = get_pg_pool(pg.pool());
2143   if (!pool ||
2144       (!raw_pg_to_pg && pg.ps() >= pool->get_pg_num())) {
2145     if (up)
2146       up->clear();
2147     if (up_primary)
2148       *up_primary = -1;
2149     if (acting)
2150       acting->clear();
2151     if (acting_primary)
2152       *acting_primary = -1;
2153     return;
2154   }
2155   vector<int> raw;
2156   vector<int> _up;
2157   vector<int> _acting;
2158   int _up_primary;
2159   int _acting_primary;
2160   ps_t pps;
2161   _get_temp_osds(*pool, pg, &_acting, &_acting_primary);
2162   if (_acting.empty() || up || up_primary) {
2163     _pg_to_raw_osds(*pool, pg, &raw, &pps);
2164     _apply_upmap(*pool, pg, &raw);
2165     _raw_to_up_osds(*pool, raw, &_up);
2166     _up_primary = _pick_primary(_up);
2167     _apply_primary_affinity(pps, *pool, &_up, &_up_primary);
2168     if (_acting.empty()) {
2169       _acting = _up;
2170       if (_acting_primary == -1) {
2171         _acting_primary = _up_primary;
2172       }
2173     }
2174   
2175     if (up)
2176       up->swap(_up);
2177     if (up_primary)
2178       *up_primary = _up_primary;
2179   }
2180
2181   if (acting)
2182     acting->swap(_acting);
2183   if (acting_primary)
2184     *acting_primary = _acting_primary;
2185 }
2186
2187 int OSDMap::calc_pg_rank(int osd, const vector<int>& acting, int nrep)
2188 {
2189   if (!nrep)
2190     nrep = acting.size();
2191   for (int i=0; i<nrep; i++) 
2192     if (acting[i] == osd)
2193       return i;
2194   return -1;
2195 }
2196
2197 int OSDMap::calc_pg_role(int osd, const vector<int>& acting, int nrep)
2198 {
2199   return calc_pg_rank(osd, acting, nrep);
2200 }
2201
2202 bool OSDMap::primary_changed(
2203   int oldprimary,
2204   const vector<int> &oldacting,
2205   int newprimary,
2206   const vector<int> &newacting)
2207 {
2208   if (oldacting.empty() && newacting.empty())
2209     return false;    // both still empty
2210   if (oldacting.empty() ^ newacting.empty())
2211     return true;     // was empty, now not, or vice versa
2212   if (oldprimary != newprimary)
2213     return true;     // primary changed
2214   if (calc_pg_rank(oldprimary, oldacting) !=
2215       calc_pg_rank(newprimary, newacting))
2216     return true;
2217   return false;      // same primary (tho replicas may have changed)
2218 }
2219
2220
2221 // serialize, unserialize
2222 void OSDMap::encode_client_old(bufferlist& bl) const
2223 {
2224   __u16 v = 5;
2225   ::encode(v, bl);
2226
2227   // base
2228   ::encode(fsid, bl);
2229   ::encode(epoch, bl);
2230   ::encode(created, bl);
2231   ::encode(modified, bl);
2232
2233   // for ::encode(pools, bl);
2234   __u32 n = pools.size();
2235   ::encode(n, bl);
2236
2237   for (const auto &pool : pools) {
2238     n = pool.first;
2239     ::encode(n, bl);
2240     ::encode(pool.second, bl, 0);
2241   }
2242   // for ::encode(pool_name, bl);
2243   n = pool_name.size();
2244   ::encode(n, bl);
2245   for (const auto &pname : pool_name) {
2246     n = pname.first;
2247     ::encode(n, bl);
2248     ::encode(pname.second, bl);
2249   }
2250   // for ::encode(pool_max, bl);
2251   n = pool_max;
2252   ::encode(n, bl);
2253
2254   ::encode(flags, bl);
2255
2256   ::encode(max_osd, bl);
2257   {
2258     uint32_t n = osd_state.size();
2259     ::encode(n, bl);
2260     for (auto s : osd_state) {
2261       ::encode((uint8_t)s, bl);
2262     }
2263   }
2264   ::encode(osd_weight, bl);
2265   ::encode(osd_addrs->client_addr, bl, 0);
2266
2267   // for ::encode(pg_temp, bl);
2268   n = pg_temp->size();
2269   ::encode(n, bl);
2270   for (const auto pg : *pg_temp) {
2271     old_pg_t opg = pg.first.get_old_pg();
2272     ::encode(opg, bl);
2273     ::encode(pg.second, bl);
2274   }
2275
2276   // crush
2277   bufferlist cbl;
2278   crush->encode(cbl, 0 /* legacy (no) features */);
2279   ::encode(cbl, bl);
2280 }
2281
2282 void OSDMap::encode_classic(bufferlist& bl, uint64_t features) const
2283 {
2284   if ((features & CEPH_FEATURE_PGID64) == 0) {
2285     encode_client_old(bl);
2286     return;
2287   }
2288
2289   __u16 v = 6;
2290   ::encode(v, bl);
2291
2292   // base
2293   ::encode(fsid, bl);
2294   ::encode(epoch, bl);
2295   ::encode(created, bl);
2296   ::encode(modified, bl);
2297
2298   ::encode(pools, bl, features);
2299   ::encode(pool_name, bl);
2300   ::encode(pool_max, bl);
2301
2302   ::encode(flags, bl);
2303
2304   ::encode(max_osd, bl);
2305   {
2306     uint32_t n = osd_state.size();
2307     ::encode(n, bl);
2308     for (auto s : osd_state) {
2309       ::encode((uint8_t)s, bl);
2310     }
2311   }
2312   ::encode(osd_weight, bl);
2313   ::encode(osd_addrs->client_addr, bl, features);
2314
2315   ::encode(*pg_temp, bl);
2316
2317   // crush
2318   bufferlist cbl;
2319   crush->encode(cbl, 0 /* legacy (no) features */);
2320   ::encode(cbl, bl);
2321
2322   // extended
2323   __u16 ev = 10;
2324   ::encode(ev, bl);
2325   ::encode(osd_addrs->hb_back_addr, bl, features);
2326   ::encode(osd_info, bl);
2327   ::encode(blacklist, bl, features);
2328   ::encode(osd_addrs->cluster_addr, bl, features);
2329   ::encode(cluster_snapshot_epoch, bl);
2330   ::encode(cluster_snapshot, bl);
2331   ::encode(*osd_uuid, bl);
2332   ::encode(osd_xinfo, bl);
2333   ::encode(osd_addrs->hb_front_addr, bl, features);
2334 }
2335
2336 void OSDMap::encode(bufferlist& bl, uint64_t features) const
2337 {
2338   if ((features & CEPH_FEATURE_OSDMAP_ENC) == 0) {
2339     encode_classic(bl, features);
2340     return;
2341   }
2342
2343   // only a select set of callers should *ever* be encoding new
2344   // OSDMaps.  others should be passing around the canonical encoded
2345   // buffers from on high.  select out those callers by passing in an
2346   // "impossible" feature bit.
2347   assert(features & CEPH_FEATURE_RESERVED);
2348   features &= ~CEPH_FEATURE_RESERVED;
2349
2350   size_t start_offset = bl.length();
2351   size_t tail_offset;
2352   buffer::list::iterator crc_it;
2353
2354   // meta-encoding: how we include client-used and osd-specific data
2355   ENCODE_START(8, 7, bl);
2356
2357   {
2358     uint8_t v = 6;
2359     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
2360       v = 3;
2361     }
2362     ENCODE_START(v, 1, bl); // client-usable data
2363     // base
2364     ::encode(fsid, bl);
2365     ::encode(epoch, bl);
2366     ::encode(created, bl);
2367     ::encode(modified, bl);
2368
2369     ::encode(pools, bl, features);
2370     ::encode(pool_name, bl);
2371     ::encode(pool_max, bl);
2372
2373     if (v < 4) {
2374       decltype(flags) f = flags;
2375       if (require_osd_release >= CEPH_RELEASE_LUMINOUS)
2376         f |= CEPH_OSDMAP_REQUIRE_LUMINOUS | CEPH_OSDMAP_RECOVERY_DELETES;
2377       else if (require_osd_release == CEPH_RELEASE_KRAKEN)
2378         f |= CEPH_OSDMAP_REQUIRE_KRAKEN;
2379       else if (require_osd_release == CEPH_RELEASE_JEWEL)
2380         f |= CEPH_OSDMAP_REQUIRE_JEWEL;
2381       ::encode(f, bl);
2382     } else {
2383       ::encode(flags, bl);
2384     }
2385
2386     ::encode(max_osd, bl);
2387     if (v >= 5) {
2388       ::encode(osd_state, bl);
2389     } else {
2390       uint32_t n = osd_state.size();
2391       ::encode(n, bl);
2392       for (auto s : osd_state) {
2393         ::encode((uint8_t)s, bl);
2394       }
2395     }
2396     ::encode(osd_weight, bl);
2397     ::encode(osd_addrs->client_addr, bl, features);
2398
2399     ::encode(*pg_temp, bl);
2400     ::encode(*primary_temp, bl);
2401     if (osd_primary_affinity) {
2402       ::encode(*osd_primary_affinity, bl);
2403     } else {
2404       vector<__u32> v;
2405       ::encode(v, bl);
2406     }
2407
2408     // crush
2409     bufferlist cbl;
2410     crush->encode(cbl, features);
2411     ::encode(cbl, bl);
2412     ::encode(erasure_code_profiles, bl);
2413
2414     if (v >= 4) {
2415       ::encode(pg_upmap, bl);
2416       ::encode(pg_upmap_items, bl);
2417     } else {
2418       assert(pg_upmap.empty());
2419       assert(pg_upmap_items.empty());
2420     }
2421     if (v >= 6) {
2422       ::encode(crush_version, bl);
2423     }
2424     ENCODE_FINISH(bl); // client-usable data
2425   }
2426
2427   {
2428     uint8_t target_v = 5;
2429     if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
2430       target_v = 1;
2431     }
2432     ENCODE_START(target_v, 1, bl); // extended, osd-only data
2433     ::encode(osd_addrs->hb_back_addr, bl, features);
2434     ::encode(osd_info, bl);
2435     {
2436       // put this in a sorted, ordered map<> so that we encode in a
2437       // deterministic order.
2438       map<entity_addr_t,utime_t> blacklist_map;
2439       for (const auto &addr : blacklist)
2440         blacklist_map.insert(make_pair(addr.first, addr.second));
2441       ::encode(blacklist_map, bl, features);
2442     }
2443     ::encode(osd_addrs->cluster_addr, bl, features);
2444     ::encode(cluster_snapshot_epoch, bl);
2445     ::encode(cluster_snapshot, bl);
2446     ::encode(*osd_uuid, bl);
2447     ::encode(osd_xinfo, bl);
2448     ::encode(osd_addrs->hb_front_addr, bl, features);
2449     if (target_v >= 2) {
2450       ::encode(nearfull_ratio, bl);
2451       ::encode(full_ratio, bl);
2452       ::encode(backfillfull_ratio, bl);
2453     }
2454     // 4 was string-based new_require_min_compat_client
2455     if (target_v >= 5) {
2456       ::encode(require_min_compat_client, bl);
2457       ::encode(require_osd_release, bl);
2458     }
2459     ENCODE_FINISH(bl); // osd-only data
2460   }
2461
2462   ::encode((uint32_t)0, bl); // dummy crc
2463   crc_it = bl.end();
2464   crc_it.advance(-4);
2465   tail_offset = bl.length();
2466
2467   ENCODE_FINISH(bl); // meta-encoding wrapper
2468
2469   // fill in crc
2470   bufferlist front;
2471   front.substr_of(bl, start_offset, crc_it.get_off() - start_offset);
2472   crc = front.crc32c(-1);
2473   if (tail_offset < bl.length()) {
2474     bufferlist tail;
2475     tail.substr_of(bl, tail_offset, bl.length() - tail_offset);
2476     crc = tail.crc32c(crc);
2477   }
2478   ceph_le32 crc_le;
2479   crc_le = crc;
2480   crc_it.copy_in(4, (char*)&crc_le);
2481   crc_defined = true;
2482 }
2483
2484 void OSDMap::decode(bufferlist& bl)
2485 {
2486   auto p = bl.begin();
2487   decode(p);
2488 }
2489
2490 void OSDMap::decode_classic(bufferlist::iterator& p)
2491 {
2492   __u32 n, t;
2493   __u16 v;
2494   ::decode(v, p);
2495
2496   // base
2497   ::decode(fsid, p);
2498   ::decode(epoch, p);
2499   ::decode(created, p);
2500   ::decode(modified, p);
2501
2502   if (v < 6) {
2503     if (v < 4) {
2504       int32_t max_pools = 0;
2505       ::decode(max_pools, p);
2506       pool_max = max_pools;
2507     }
2508     pools.clear();
2509     ::decode(n, p);
2510     while (n--) {
2511       ::decode(t, p);
2512       ::decode(pools[t], p);
2513     }
2514     if (v == 4) {
2515       ::decode(n, p);
2516       pool_max = n;
2517     } else if (v == 5) {
2518       pool_name.clear();
2519       ::decode(n, p);
2520       while (n--) {
2521         ::decode(t, p);
2522         ::decode(pool_name[t], p);
2523       }
2524       ::decode(n, p);
2525       pool_max = n;
2526     }
2527   } else {
2528     ::decode(pools, p);
2529     ::decode(pool_name, p);
2530     ::decode(pool_max, p);
2531   }
2532   // kludge around some old bug that zeroed out pool_max (#2307)
2533   if (pools.size() && pool_max < pools.rbegin()->first) {
2534     pool_max = pools.rbegin()->first;
2535   }
2536
2537   ::decode(flags, p);
2538
2539   ::decode(max_osd, p);
2540   {
2541     vector<uint8_t> os;
2542     ::decode(os, p);
2543     osd_state.resize(os.size());
2544     for (unsigned i = 0; i < os.size(); ++i) {
2545       osd_state[i] = os[i];
2546     }
2547   }
2548   ::decode(osd_weight, p);
2549   ::decode(osd_addrs->client_addr, p);
2550   if (v <= 5) {
2551     pg_temp->clear();
2552     ::decode(n, p);
2553     while (n--) {
2554       old_pg_t opg;
2555       ::decode_raw(opg, p);
2556       mempool::osdmap::vector<int32_t> v;
2557       ::decode(v, p);
2558       pg_temp->set(pg_t(opg), v);
2559     }
2560   } else {
2561     ::decode(*pg_temp, p);
2562   }
2563
2564   // crush
2565   bufferlist cbl;
2566   ::decode(cbl, p);
2567   auto cblp = cbl.begin();
2568   crush->decode(cblp);
2569
2570   // extended
2571   __u16 ev = 0;
2572   if (v >= 5)
2573     ::decode(ev, p);
2574   ::decode(osd_addrs->hb_back_addr, p);
2575   ::decode(osd_info, p);
2576   if (v < 5)
2577     ::decode(pool_name, p);
2578
2579   ::decode(blacklist, p);
2580   if (ev >= 6)
2581     ::decode(osd_addrs->cluster_addr, p);
2582   else
2583     osd_addrs->cluster_addr.resize(osd_addrs->client_addr.size());
2584
2585   if (ev >= 7) {
2586     ::decode(cluster_snapshot_epoch, p);
2587     ::decode(cluster_snapshot, p);
2588   }
2589
2590   if (ev >= 8) {
2591     ::decode(*osd_uuid, p);
2592   } else {
2593     osd_uuid->resize(max_osd);
2594   }
2595   if (ev >= 9)
2596     ::decode(osd_xinfo, p);
2597   else
2598     osd_xinfo.resize(max_osd);
2599
2600   if (ev >= 10)
2601     ::decode(osd_addrs->hb_front_addr, p);
2602   else
2603     osd_addrs->hb_front_addr.resize(osd_addrs->hb_back_addr.size());
2604
2605   osd_primary_affinity.reset();
2606
2607   post_decode();
2608 }
2609
2610 void OSDMap::decode(bufferlist::iterator& bl)
2611 {
2612   /**
2613    * Older encodings of the OSDMap had a single struct_v which
2614    * covered the whole encoding, and was prior to our modern
2615    * stuff which includes a compatv and a size. So if we see
2616    * a struct_v < 7, we must rewind to the beginning and use our
2617    * classic decoder.
2618    */
2619   size_t start_offset = bl.get_off();
2620   size_t tail_offset = 0;
2621   bufferlist crc_front, crc_tail;
2622
2623   DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
2624   if (struct_v < 7) {
2625     int struct_v_size = sizeof(struct_v);
2626     bl.advance(-struct_v_size);
2627     decode_classic(bl);
2628     return;
2629   }
2630   /**
2631    * Since we made it past that hurdle, we can use our normal paths.
2632    */
2633   {
2634     DECODE_START(6, bl); // client-usable data
2635     // base
2636     ::decode(fsid, bl);
2637     ::decode(epoch, bl);
2638     ::decode(created, bl);
2639     ::decode(modified, bl);
2640
2641     ::decode(pools, bl);
2642     ::decode(pool_name, bl);
2643     ::decode(pool_max, bl);
2644
2645     ::decode(flags, bl);
2646
2647     ::decode(max_osd, bl);
2648     if (struct_v >= 5) {
2649       ::decode(osd_state, bl);
2650     } else {
2651       vector<uint8_t> os;
2652       ::decode(os, bl);
2653       osd_state.resize(os.size());
2654       for (unsigned i = 0; i < os.size(); ++i) {
2655         osd_state[i] = os[i];
2656       }
2657     }
2658     ::decode(osd_weight, bl);
2659     ::decode(osd_addrs->client_addr, bl);
2660
2661     ::decode(*pg_temp, bl);
2662     ::decode(*primary_temp, bl);
2663     if (struct_v >= 2) {
2664       osd_primary_affinity.reset(new mempool::osdmap::vector<__u32>);
2665       ::decode(*osd_primary_affinity, bl);
2666       if (osd_primary_affinity->empty())
2667         osd_primary_affinity.reset();
2668     } else {
2669       osd_primary_affinity.reset();
2670     }
2671
2672     // crush
2673     bufferlist cbl;
2674     ::decode(cbl, bl);
2675     auto cblp = cbl.begin();
2676     crush->decode(cblp);
2677     if (struct_v >= 3) {
2678       ::decode(erasure_code_profiles, bl);
2679     } else {
2680       erasure_code_profiles.clear();
2681     }
2682     if (struct_v >= 4) {
2683       ::decode(pg_upmap, bl);
2684       ::decode(pg_upmap_items, bl);
2685     } else {
2686       pg_upmap.clear();
2687       pg_upmap_items.clear();
2688     }
2689     if (struct_v >= 6) {
2690       ::decode(crush_version, bl);
2691     }
2692     DECODE_FINISH(bl); // client-usable data
2693   }
2694
2695   {
2696     DECODE_START(5, bl); // extended, osd-only data
2697     ::decode(osd_addrs->hb_back_addr, bl);
2698     ::decode(osd_info, bl);
2699     ::decode(blacklist, bl);
2700     ::decode(osd_addrs->cluster_addr, bl);
2701     ::decode(cluster_snapshot_epoch, bl);
2702     ::decode(cluster_snapshot, bl);
2703     ::decode(*osd_uuid, bl);
2704     ::decode(osd_xinfo, bl);
2705     ::decode(osd_addrs->hb_front_addr, bl);
2706     if (struct_v >= 2) {
2707       ::decode(nearfull_ratio, bl);
2708       ::decode(full_ratio, bl);
2709     } else {
2710       nearfull_ratio = 0;
2711       full_ratio = 0;
2712     }
2713     if (struct_v >= 3) {
2714       ::decode(backfillfull_ratio, bl);
2715     } else {
2716       backfillfull_ratio = 0;
2717     }
2718     if (struct_v == 4) {
2719       string r;
2720       ::decode(r, bl);
2721       if (r.length())
2722         require_min_compat_client = ceph_release_from_name(r.c_str());
2723     }
2724     if (struct_v >= 5) {
2725       ::decode(require_min_compat_client, bl);
2726       ::decode(require_osd_release, bl);
2727       if (require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2728         flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
2729         flags |= CEPH_OSDMAP_RECOVERY_DELETES;
2730       }
2731     } else {
2732       if (flags & CEPH_OSDMAP_REQUIRE_LUMINOUS) {
2733         // only for compat with post-kraken pre-luminous test clusters
2734         require_osd_release = CEPH_RELEASE_LUMINOUS;
2735         flags &= ~(CEPH_OSDMAP_LEGACY_REQUIRE_FLAGS);
2736         flags |= CEPH_OSDMAP_RECOVERY_DELETES;
2737       } else if (flags & CEPH_OSDMAP_REQUIRE_KRAKEN) {
2738         require_osd_release = CEPH_RELEASE_KRAKEN;
2739       } else if (flags & CEPH_OSDMAP_REQUIRE_JEWEL) {
2740         require_osd_release = CEPH_RELEASE_JEWEL;
2741       } else {
2742         require_osd_release = 0;
2743       }
2744     }
2745     DECODE_FINISH(bl); // osd-only data
2746   }
2747
2748   if (struct_v >= 8) {
2749     crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset);
2750     ::decode(crc, bl);
2751     tail_offset = bl.get_off();
2752     crc_defined = true;
2753   } else {
2754     crc_defined = false;
2755     crc = 0;
2756   }
2757
2758   DECODE_FINISH(bl); // wrapper
2759
2760   if (tail_offset) {
2761     // verify crc
2762     uint32_t actual = crc_front.crc32c(-1);
2763     if (tail_offset < bl.get_off()) {
2764       bufferlist tail;
2765       tail.substr_of(bl.get_bl(), tail_offset, bl.get_off() - tail_offset);
2766       actual = tail.crc32c(actual);
2767     }
2768     if (crc != actual) {
2769       ostringstream ss;
2770       ss << "bad crc, actual " << actual << " != expected " << crc;
2771       string s = ss.str();
2772       throw buffer::malformed_input(s.c_str());
2773     }
2774   }
2775
2776   post_decode();
2777 }
2778
2779 void OSDMap::post_decode()
2780 {
2781   // index pool names
2782   name_pool.clear();
2783   for (const auto &pname : pool_name) {
2784     name_pool[pname.second] = pname.first;
2785   }
2786
2787   calc_num_osds();
2788   _calc_up_osd_features();
2789 }
2790
2791 void OSDMap::dump_erasure_code_profiles(
2792   const mempool::osdmap::map<string,map<string,string>>& profiles,
2793   Formatter *f)
2794 {
2795   f->open_object_section("erasure_code_profiles");
2796   for (const auto &profile : profiles) {
2797     f->open_object_section(profile.first.c_str());
2798     for (const auto &profm : profile.second) {
2799       f->dump_string(profm.first.c_str(), profm.second.c_str());
2800     }
2801     f->close_section();
2802   }
2803   f->close_section();
2804 }
2805
2806 void OSDMap::dump(Formatter *f) const
2807 {
2808   f->dump_int("epoch", get_epoch());
2809   f->dump_stream("fsid") << get_fsid();
2810   f->dump_stream("created") << get_created();
2811   f->dump_stream("modified") << get_modified();
2812   f->dump_string("flags", get_flag_string());
2813   f->dump_unsigned("crush_version", get_crush_version());
2814   f->dump_float("full_ratio", full_ratio);
2815   f->dump_float("backfillfull_ratio", backfillfull_ratio);
2816   f->dump_float("nearfull_ratio", nearfull_ratio);
2817   f->dump_string("cluster_snapshot", get_cluster_snapshot());
2818   f->dump_int("pool_max", get_pool_max());
2819   f->dump_int("max_osd", get_max_osd());
2820   f->dump_string("require_min_compat_client",
2821                  ceph_release_name(require_min_compat_client));
2822   f->dump_string("min_compat_client",
2823                  ceph_release_name(get_min_compat_client()));
2824   f->dump_string("require_osd_release",
2825                  ceph_release_name(require_osd_release));
2826
2827   f->open_array_section("pools");
2828   for (const auto &pool : pools) {
2829     std::string name("<unknown>");
2830     const auto &pni = pool_name.find(pool.first);
2831     if (pni != pool_name.end())
2832       name = pni->second;
2833     f->open_object_section("pool");
2834     f->dump_int("pool", pool.first);
2835     f->dump_string("pool_name", name);
2836     pool.second.dump(f);
2837     f->close_section();
2838   }
2839   f->close_section();
2840
2841   f->open_array_section("osds");
2842   for (int i=0; i<get_max_osd(); i++)
2843     if (exists(i)) {
2844       f->open_object_section("osd_info");
2845       f->dump_int("osd", i);
2846       f->dump_stream("uuid") << get_uuid(i);
2847       f->dump_int("up", is_up(i));
2848       f->dump_int("in", is_in(i));
2849       f->dump_float("weight", get_weightf(i));
2850       f->dump_float("primary_affinity", get_primary_affinityf(i));
2851       get_info(i).dump(f);
2852       f->dump_stream("public_addr") << get_addr(i);
2853       f->dump_stream("cluster_addr") << get_cluster_addr(i);
2854       f->dump_stream("heartbeat_back_addr") << get_hb_back_addr(i);
2855       f->dump_stream("heartbeat_front_addr") << get_hb_front_addr(i);
2856
2857       set<string> st;
2858       get_state(i, st);
2859       f->open_array_section("state");
2860       for (const auto &state : st)
2861         f->dump_string("state", state);
2862       f->close_section();
2863
2864       f->close_section();
2865     }
2866   f->close_section();
2867
2868   f->open_array_section("osd_xinfo");
2869   for (int i=0; i<get_max_osd(); i++) {
2870     if (exists(i)) {
2871       f->open_object_section("xinfo");
2872       f->dump_int("osd", i);
2873       osd_xinfo[i].dump(f);
2874       f->close_section();
2875     }
2876   }
2877   f->close_section();
2878
2879   f->open_array_section("pg_upmap");
2880   for (auto& p : pg_upmap) {
2881     f->open_object_section("mapping");
2882     f->dump_stream("pgid") << p.first;
2883     f->open_array_section("osds");
2884     for (auto q : p.second) {
2885       f->dump_int("osd", q);
2886     }
2887     f->close_section();
2888     f->close_section();
2889   }
2890   f->close_section();
2891   f->open_array_section("pg_upmap_items");
2892   for (auto& p : pg_upmap_items) {
2893     f->open_object_section("mapping");
2894     f->dump_stream("pgid") << p.first;
2895     f->open_array_section("mappings");
2896     for (auto& q : p.second) {
2897       f->open_object_section("mapping");
2898       f->dump_int("from", q.first);
2899       f->dump_int("to", q.second);
2900       f->close_section();
2901     }
2902     f->close_section();
2903     f->close_section();
2904   }
2905   f->close_section();
2906   f->open_array_section("pg_temp");
2907   pg_temp->dump(f);
2908   f->close_section();
2909
2910   f->open_array_section("primary_temp");
2911   for (const auto &pg : *primary_temp) {
2912     f->dump_stream("pgid") << pg.first;
2913     f->dump_int("osd", pg.second);
2914   }
2915   f->close_section(); // primary_temp
2916
2917   f->open_object_section("blacklist");
2918   for (const auto &addr : blacklist) {
2919     stringstream ss;
2920     ss << addr.first;
2921     f->dump_stream(ss.str().c_str()) << addr.second;
2922   }
2923   f->close_section();
2924
2925   dump_erasure_code_profiles(erasure_code_profiles, f);
2926 }
2927
2928 void OSDMap::generate_test_instances(list<OSDMap*>& o)
2929 {
2930   o.push_back(new OSDMap);
2931
2932   CephContext *cct = new CephContext(CODE_ENVIRONMENT_UTILITY);
2933   o.push_back(new OSDMap);
2934   uuid_d fsid;
2935   o.back()->build_simple(cct, 1, fsid, 16);
2936   o.back()->created = o.back()->modified = utime_t(1, 2);  // fix timestamp
2937   o.back()->blacklist[entity_addr_t()] = utime_t(5, 6);
2938   cct->put();
2939 }
2940
2941 string OSDMap::get_flag_string(unsigned f)
2942 {
2943   string s;
2944   if ( f& CEPH_OSDMAP_NEARFULL)
2945     s += ",nearfull";
2946   if (f & CEPH_OSDMAP_FULL)
2947     s += ",full";
2948   if (f & CEPH_OSDMAP_PAUSERD)
2949     s += ",pauserd";
2950   if (f & CEPH_OSDMAP_PAUSEWR)
2951     s += ",pausewr";
2952   if (f & CEPH_OSDMAP_PAUSEREC)
2953     s += ",pauserec";
2954   if (f & CEPH_OSDMAP_NOUP)
2955     s += ",noup";
2956   if (f & CEPH_OSDMAP_NODOWN)
2957     s += ",nodown";
2958   if (f & CEPH_OSDMAP_NOOUT)
2959     s += ",noout";
2960   if (f & CEPH_OSDMAP_NOIN)
2961     s += ",noin";
2962   if (f & CEPH_OSDMAP_NOBACKFILL)
2963     s += ",nobackfill";
2964   if (f & CEPH_OSDMAP_NOREBALANCE)
2965     s += ",norebalance";
2966   if (f & CEPH_OSDMAP_NORECOVER)
2967     s += ",norecover";
2968   if (f & CEPH_OSDMAP_NOSCRUB)
2969     s += ",noscrub";
2970   if (f & CEPH_OSDMAP_NODEEP_SCRUB)
2971     s += ",nodeep-scrub";
2972   if (f & CEPH_OSDMAP_NOTIERAGENT)
2973     s += ",notieragent";
2974   if (f & CEPH_OSDMAP_SORTBITWISE)
2975     s += ",sortbitwise";
2976   if (f & CEPH_OSDMAP_REQUIRE_JEWEL)
2977     s += ",require_jewel_osds";
2978   if (f & CEPH_OSDMAP_REQUIRE_KRAKEN)
2979     s += ",require_kraken_osds";
2980   if (f & CEPH_OSDMAP_REQUIRE_LUMINOUS)
2981     s += ",require_luminous_osds";
2982   if (f & CEPH_OSDMAP_RECOVERY_DELETES)
2983     s += ",recovery_deletes";
2984   if (f & CEPH_OSDMAP_PURGED_SNAPDIRS)
2985     s += ",purged_snapdirs";
2986   if (s.length())
2987     s.erase(0, 1);
2988   return s;
2989 }
2990
2991 string OSDMap::get_flag_string() const
2992 {
2993   return get_flag_string(flags);
2994 }
2995
2996 void OSDMap::print_pools(ostream& out) const
2997 {
2998   for (const auto &pool : pools) {
2999     std::string name("<unknown>");
3000     const auto &pni = pool_name.find(pool.first);
3001     if (pni != pool_name.end())
3002       name = pni->second;
3003     out << "pool " << pool.first
3004         << " '" << name
3005         << "' " << pool.second << "\n";
3006
3007     for (const auto &snap : pool.second.snaps)
3008       out << "\tsnap " << snap.second.snapid << " '" << snap.second.name << "' " << snap.second.stamp << "\n";
3009
3010     if (!pool.second.removed_snaps.empty())
3011       out << "\tremoved_snaps " << pool.second.removed_snaps << "\n";
3012   }
3013   out << std::endl;
3014 }
3015
3016 void OSDMap::print(ostream& out) const
3017 {
3018   out << "epoch " << get_epoch() << "\n"
3019       << "fsid " << get_fsid() << "\n"
3020       << "created " << get_created() << "\n"
3021       << "modified " << get_modified() << "\n";
3022
3023   out << "flags " << get_flag_string() << "\n";
3024   out << "crush_version " << get_crush_version() << "\n";
3025   out << "full_ratio " << full_ratio << "\n";
3026   out << "backfillfull_ratio " << backfillfull_ratio << "\n";
3027   out << "nearfull_ratio " << nearfull_ratio << "\n";
3028   if (require_min_compat_client > 0) {
3029     out << "require_min_compat_client "
3030         << ceph_release_name(require_min_compat_client) << "\n";
3031   }
3032   out << "min_compat_client " << ceph_release_name(get_min_compat_client())
3033       << "\n";
3034   if (require_osd_release > 0) {
3035     out << "require_osd_release " << ceph_release_name(require_osd_release)
3036         << "\n";
3037   }
3038   if (get_cluster_snapshot().length())
3039     out << "cluster_snapshot " << get_cluster_snapshot() << "\n";
3040   out << "\n";
3041
3042   print_pools(out);
3043
3044   out << "max_osd " << get_max_osd() << "\n";
3045   for (int i=0; i<get_max_osd(); i++) {
3046     if (exists(i)) {
3047       out << "osd." << i;
3048       out << (is_up(i) ? " up  ":" down");
3049       out << (is_in(i) ? " in ":" out");
3050       out << " weight " << get_weightf(i);
3051       if (get_primary_affinity(i) != CEPH_OSD_DEFAULT_PRIMARY_AFFINITY)
3052         out << " primary_affinity " << get_primary_affinityf(i);
3053       const osd_info_t& info(get_info(i));
3054       out << " " << info;
3055       out << " " << get_addr(i) << " " << get_cluster_addr(i) << " " << get_hb_back_addr(i)
3056           << " " << get_hb_front_addr(i);
3057       set<string> st;
3058       get_state(i, st);
3059       out << " " << st;
3060       if (!get_uuid(i).is_zero())
3061         out << " " << get_uuid(i);
3062       out << "\n";
3063     }
3064   }
3065   out << std::endl;
3066
3067   for (auto& p : pg_upmap) {
3068     out << "pg_upmap " << p.first << " " << p.second << "\n";
3069   }
3070   for (auto& p : pg_upmap_items) {
3071     out << "pg_upmap_items " << p.first << " " << p.second << "\n";
3072   }
3073
3074   for (const auto pg : *pg_temp)
3075     out << "pg_temp " << pg.first << " " << pg.second << "\n";
3076
3077   for (const auto pg : *primary_temp)
3078     out << "primary_temp " << pg.first << " " << pg.second << "\n";
3079
3080   for (const auto &addr : blacklist)
3081     out << "blacklist " << addr.first << " expires " << addr.second << "\n";
3082
3083   // ignore pg_swap_primary
3084 }
3085
3086 class OSDTreePlainDumper : public CrushTreeDumper::Dumper<TextTable> {
3087 public:
3088   typedef CrushTreeDumper::Dumper<TextTable> Parent;
3089
3090   OSDTreePlainDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
3091                      unsigned f)
3092     : Parent(crush, osdmap_->get_pool_names()), osdmap(osdmap_), filter(f) { }
3093
3094   bool should_dump_leaf(int i) const override {
3095     if (!filter) {
3096       return true; // normal case
3097     }
3098     if (((filter & OSDMap::DUMP_UP) && osdmap->is_up(i)) ||
3099         ((filter & OSDMap::DUMP_DOWN) && osdmap->is_down(i)) ||
3100         ((filter & OSDMap::DUMP_IN) && osdmap->is_in(i)) ||
3101         ((filter & OSDMap::DUMP_OUT) && osdmap->is_out(i)) ||
3102         ((filter & OSDMap::DUMP_DESTROYED) && osdmap->is_destroyed(i))) {
3103       return true;
3104     }
3105     return false;
3106   }
3107
3108   bool should_dump_empty_bucket() const override {
3109     return !filter;
3110   }
3111
3112   void dump(TextTable *tbl) {
3113     tbl->define_column("ID", TextTable::LEFT, TextTable::RIGHT);
3114     tbl->define_column("CLASS", TextTable::LEFT, TextTable::RIGHT);
3115     tbl->define_column("WEIGHT", TextTable::LEFT, TextTable::RIGHT);
3116     tbl->define_column("TYPE NAME", TextTable::LEFT, TextTable::LEFT);
3117     tbl->define_column("STATUS", TextTable::LEFT, TextTable::RIGHT);
3118     tbl->define_column("REWEIGHT", TextTable::LEFT, TextTable::RIGHT);
3119     tbl->define_column("PRI-AFF", TextTable::LEFT, TextTable::RIGHT);
3120
3121     Parent::dump(tbl);
3122
3123     for (int i = 0; i < osdmap->get_max_osd(); i++) {
3124       if (osdmap->exists(i) && !is_touched(i) && should_dump_leaf(i)) {
3125         dump_item(CrushTreeDumper::Item(i, 0, 0, 0), tbl);
3126       }
3127     }
3128   }
3129
3130 protected:
3131   void dump_item(const CrushTreeDumper::Item &qi, TextTable *tbl) override {
3132     const char *c = crush->get_item_class(qi.id);
3133     if (!c)
3134       c = "";
3135     *tbl << qi.id
3136          << c
3137          << weightf_t(qi.weight);
3138
3139     ostringstream name;
3140     for (int k = 0; k < qi.depth; k++)
3141       name << "    ";
3142     if (qi.is_bucket()) {
3143       name << crush->get_type_name(crush->get_bucket_type(qi.id)) << " "
3144            << crush->get_item_name(qi.id);
3145     } else {
3146       name << "osd." << qi.id;
3147     }
3148     *tbl << name.str();
3149
3150     if (!qi.is_bucket()) {
3151       if (!osdmap->exists(qi.id)) {
3152         *tbl << "DNE"
3153              << 0;
3154       } else {
3155         string s;
3156         if (osdmap->is_up(qi.id)) {
3157           s = "up";
3158         } else if (osdmap->is_destroyed(qi.id)) {
3159           s = "destroyed";
3160         } else {
3161           s = "down";
3162         }
3163         *tbl << s
3164              << weightf_t(osdmap->get_weightf(qi.id))
3165              << weightf_t(osdmap->get_primary_affinityf(qi.id));
3166       }
3167     }
3168     *tbl << TextTable::endrow;
3169   }
3170
3171 private:
3172   const OSDMap *osdmap;
3173   const unsigned filter;
3174 };
3175
3176 class OSDTreeFormattingDumper : public CrushTreeDumper::FormattingDumper {
3177 public:
3178   typedef CrushTreeDumper::FormattingDumper Parent;
3179
3180   OSDTreeFormattingDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
3181                           unsigned f)
3182     : Parent(crush, osdmap_->get_pool_names()), osdmap(osdmap_), filter(f) { }
3183
3184   bool should_dump_leaf(int i) const override {
3185     if (!filter) {
3186       return true; // normal case
3187     }
3188     if (((filter & OSDMap::DUMP_UP) && osdmap->is_up(i)) ||
3189         ((filter & OSDMap::DUMP_DOWN) && osdmap->is_down(i)) ||
3190         ((filter & OSDMap::DUMP_IN) && osdmap->is_in(i)) ||
3191         ((filter & OSDMap::DUMP_OUT) && osdmap->is_out(i)) ||
3192         ((filter & OSDMap::DUMP_DESTROYED) && osdmap->is_destroyed(i))) {
3193       return true;
3194     }
3195     return false;
3196   }
3197
3198   bool should_dump_empty_bucket() const override {
3199     return !filter;
3200   }
3201
3202   void dump(Formatter *f) {
3203     f->open_array_section("nodes");
3204     Parent::dump(f);
3205     f->close_section();
3206     f->open_array_section("stray");
3207     for (int i = 0; i < osdmap->get_max_osd(); i++) {
3208       if (osdmap->exists(i) && !is_touched(i) && should_dump_leaf(i))
3209         dump_item(CrushTreeDumper::Item(i, 0, 0, 0), f);
3210     }
3211     f->close_section();
3212   }
3213
3214 protected:
3215   void dump_item_fields(const CrushTreeDumper::Item &qi, Formatter *f) override {
3216     Parent::dump_item_fields(qi, f);
3217     if (!qi.is_bucket())
3218     {
3219       string s;
3220       if (osdmap->is_up(qi.id)) {
3221         s = "up";
3222       } else if (osdmap->is_destroyed(qi.id)) {
3223         s = "destroyed";
3224       } else {
3225         s = "down";
3226       }
3227       f->dump_unsigned("exists", (int)osdmap->exists(qi.id));
3228       f->dump_string("status", s);
3229       f->dump_float("reweight", osdmap->get_weightf(qi.id));
3230       f->dump_float("primary_affinity", osdmap->get_primary_affinityf(qi.id));
3231     }
3232   }
3233
3234 private:
3235   const OSDMap *osdmap;
3236   const unsigned filter;
3237 };
3238
3239 void OSDMap::print_tree(Formatter *f, ostream *out, unsigned filter) const
3240 {
3241   if (f) {
3242     OSDTreeFormattingDumper(crush.get(), this, filter).dump(f);
3243   } else {
3244     assert(out);
3245     TextTable tbl;
3246     OSDTreePlainDumper(crush.get(), this, filter).dump(&tbl);
3247     *out << tbl;
3248   }
3249 }
3250
3251 void OSDMap::print_summary(Formatter *f, ostream& out,
3252                            const string& prefix) const
3253 {
3254   if (f) {
3255     f->open_object_section("osdmap");
3256     f->dump_int("epoch", get_epoch());
3257     f->dump_int("num_osds", get_num_osds());
3258     f->dump_int("num_up_osds", get_num_up_osds());
3259     f->dump_int("num_in_osds", get_num_in_osds());
3260     f->dump_bool("full", test_flag(CEPH_OSDMAP_FULL) ? true : false);
3261     f->dump_bool("nearfull", test_flag(CEPH_OSDMAP_NEARFULL) ? true : false);
3262     f->dump_unsigned("num_remapped_pgs", get_num_pg_temp());
3263     f->close_section();
3264   } else {
3265     out << get_num_osds() << " osds: "
3266         << get_num_up_osds() << " up, "
3267         << get_num_in_osds() << " in";
3268     if (get_num_pg_temp())
3269       out << "; " << get_num_pg_temp() << " remapped pgs";
3270     out << "\n";
3271     uint64_t important_flags = flags & ~CEPH_OSDMAP_SEMIHIDDEN_FLAGS;
3272     if (important_flags)
3273       out << prefix << "flags " << get_flag_string(important_flags) << "\n";
3274   }
3275 }
3276
3277 void OSDMap::print_oneline_summary(ostream& out) const
3278 {
3279   out << "e" << get_epoch() << ": "
3280       << get_num_osds() << " total, "
3281       << get_num_up_osds() << " up, "
3282       << get_num_in_osds() << " in";
3283   if (test_flag(CEPH_OSDMAP_FULL))
3284     out << " full";
3285   else if (test_flag(CEPH_OSDMAP_NEARFULL))
3286     out << " nearfull";
3287 }
3288
3289 bool OSDMap::crush_rule_in_use(int rule_id) const
3290 {
3291   for (const auto &pool : pools) {
3292     if (pool.second.crush_rule == rule_id)
3293       return true;
3294   }
3295   return false;
3296 }
3297
3298 int OSDMap::validate_crush_rules(CrushWrapper *newcrush,
3299                                  ostream *ss) const
3300 {
3301   for (auto& i : pools) {
3302     auto& pool = i.second;
3303     int ruleno = pool.get_crush_rule();
3304     if (!newcrush->rule_exists(ruleno)) {
3305       *ss << "pool " << i.first << " references crush_rule " << ruleno
3306           << " but it is not present";
3307       return -EINVAL;
3308     }
3309     if (newcrush->get_rule_mask_ruleset(ruleno) != ruleno) {
3310       *ss << "rule " << ruleno << " mask ruleset does not match rule id";
3311       return -EINVAL;
3312     }
3313     if (newcrush->get_rule_mask_type(ruleno) != (int)pool.get_type()) {
3314       *ss << "pool " << i.first << " type does not match rule " << ruleno;
3315       return -EINVAL;
3316     }
3317     if (pool.get_size() < (int)newcrush->get_rule_mask_min_size(ruleno) ||
3318         pool.get_size() > (int)newcrush->get_rule_mask_max_size(ruleno)) {
3319       *ss << "pool " << i.first << " size " << pool.get_size() << " does not"
3320           << " fall within rule " << ruleno
3321           << " min_size " << newcrush->get_rule_mask_min_size(ruleno)
3322           << " and max_size " << newcrush->get_rule_mask_max_size(ruleno);
3323       return -EINVAL;
3324     }
3325   }
3326   return 0;
3327 }
3328
3329 int OSDMap::build_simple_optioned(CephContext *cct, epoch_t e, uuid_d &fsid,
3330                                   int nosd, int pg_bits, int pgp_bits,
3331                                   bool default_pool)
3332 {
3333   ldout(cct, 10) << "build_simple on " << nosd
3334                  << " osds" << dendl;
3335   epoch = e;
3336   set_fsid(fsid);
3337   created = modified = ceph_clock_now();
3338
3339   if (nosd >=  0) {
3340     set_max_osd(nosd);
3341   } else {
3342     // count osds
3343     int maxosd = 0;
3344     const md_config_t *conf = cct->_conf;
3345     vector<string> sections;
3346     conf->get_all_sections(sections);
3347
3348     for (auto &section : sections) {
3349       if (section.find("osd.") != 0)
3350         continue;
3351
3352       const char *begin = section.c_str() + 4;
3353       char *end = (char*)begin;
3354       int o = strtol(begin, &end, 10);
3355       if (*end != '\0')
3356         continue;
3357
3358       if (o > cct->_conf->mon_max_osd) {
3359         lderr(cct) << "[osd." << o << "] in config has id > mon_max_osd " << cct->_conf->mon_max_osd << dendl;
3360         return -ERANGE;
3361       }
3362
3363       if (o > maxosd)
3364         maxosd = o;
3365     }
3366
3367     set_max_osd(maxosd + 1);
3368   }
3369
3370
3371   stringstream ss;
3372   int r;
3373   if (nosd >= 0)
3374     r = build_simple_crush_map(cct, *crush, nosd, &ss);
3375   else
3376     r = build_simple_crush_map_from_conf(cct, *crush, &ss);
3377   assert(r == 0);
3378
3379   int poolbase = get_max_osd() ? get_max_osd() : 1;
3380
3381   const int default_replicated_rule = crush->get_osd_pool_default_crush_replicated_ruleset(cct);
3382   assert(default_replicated_rule >= 0);
3383
3384   if (default_pool) {
3385     // pgp_num <= pg_num
3386     if (pgp_bits > pg_bits)
3387       pgp_bits = pg_bits;
3388
3389     vector<string> pool_names;
3390     pool_names.push_back("rbd");
3391     for (auto &plname : pool_names) {
3392       int64_t pool = ++pool_max;
3393       pools[pool].type = pg_pool_t::TYPE_REPLICATED;
3394       pools[pool].flags = cct->_conf->osd_pool_default_flags;
3395       if (cct->_conf->osd_pool_default_flag_hashpspool)
3396         pools[pool].set_flag(pg_pool_t::FLAG_HASHPSPOOL);
3397       if (cct->_conf->osd_pool_default_flag_nodelete)
3398         pools[pool].set_flag(pg_pool_t::FLAG_NODELETE);
3399       if (cct->_conf->osd_pool_default_flag_nopgchange)
3400         pools[pool].set_flag(pg_pool_t::FLAG_NOPGCHANGE);
3401       if (cct->_conf->osd_pool_default_flag_nosizechange)
3402         pools[pool].set_flag(pg_pool_t::FLAG_NOSIZECHANGE);
3403       pools[pool].size = cct->_conf->osd_pool_default_size;
3404       pools[pool].min_size = cct->_conf->get_osd_pool_default_min_size();
3405       pools[pool].crush_rule = default_replicated_rule;
3406       pools[pool].object_hash = CEPH_STR_HASH_RJENKINS;
3407       pools[pool].set_pg_num(poolbase << pg_bits);
3408       pools[pool].set_pgp_num(poolbase << pgp_bits);
3409       pools[pool].last_change = epoch;
3410       pools[pool].application_metadata.insert(
3411         {pg_pool_t::APPLICATION_NAME_RBD, {}});
3412       pool_name[pool] = plname;
3413       name_pool[plname] = pool;
3414     }
3415   }
3416
3417   for (int i=0; i<get_max_osd(); i++) {
3418     set_state(i, 0);
3419     set_weight(i, CEPH_OSD_OUT);
3420   }
3421
3422   map<string,string> profile_map;
3423   r = get_erasure_code_profile_default(cct, profile_map, &ss);
3424   if (r < 0) {
3425     lderr(cct) << ss.str() << dendl;
3426     return r;
3427   }
3428   set_erasure_code_profile("default", profile_map);
3429   return 0;
3430 }
3431
3432 int OSDMap::get_erasure_code_profile_default(CephContext *cct,
3433                                              map<string,string> &profile_map,
3434                                              ostream *ss)
3435 {
3436   int r = get_json_str_map(cct->_conf->osd_pool_default_erasure_code_profile,
3437                       *ss,
3438                       &profile_map);
3439   return r;
3440 }
3441
3442 int OSDMap::_build_crush_types(CrushWrapper& crush)
3443 {
3444   crush.set_type_name(0, "osd");
3445   crush.set_type_name(1, "host");
3446   crush.set_type_name(2, "chassis");
3447   crush.set_type_name(3, "rack");
3448   crush.set_type_name(4, "row");
3449   crush.set_type_name(5, "pdu");
3450   crush.set_type_name(6, "pod");
3451   crush.set_type_name(7, "room");
3452   crush.set_type_name(8, "datacenter");
3453   crush.set_type_name(9, "region");
3454   crush.set_type_name(10, "root");
3455   return 10;
3456 }
3457
3458 int OSDMap::build_simple_crush_map(CephContext *cct, CrushWrapper& crush,
3459                                    int nosd, ostream *ss)
3460 {
3461   crush.create();
3462
3463   // root
3464   int root_type = _build_crush_types(crush);
3465   int rootid;
3466   int r = crush.add_bucket(0, 0, CRUSH_HASH_DEFAULT,
3467                            root_type, 0, NULL, NULL, &rootid);
3468   assert(r == 0);
3469   crush.set_item_name(rootid, "default");
3470
3471   for (int o=0; o<nosd; o++) {
3472     map<string,string> loc;
3473     loc["host"] = "localhost";
3474     loc["rack"] = "localrack";
3475     loc["root"] = "default";
3476     ldout(cct, 10) << " adding osd." << o << " at " << loc << dendl;
3477     char name[32];
3478     snprintf(name, sizeof(name), "osd.%d", o);
3479     crush.insert_item(cct, o, 1.0, name, loc);
3480   }
3481
3482   build_simple_crush_rules(cct, crush, "default", ss);
3483
3484   crush.finalize();
3485
3486   return 0;
3487 }
3488
3489 int OSDMap::build_simple_crush_map_from_conf(CephContext *cct,
3490                                              CrushWrapper& crush,
3491                                              ostream *ss)
3492 {
3493   const md_config_t *conf = cct->_conf;
3494
3495   crush.create();
3496
3497   // root
3498   int root_type = _build_crush_types(crush);
3499   int rootid;
3500   int r = crush.add_bucket(0, 0,
3501                            CRUSH_HASH_DEFAULT,
3502                            root_type, 0, NULL, NULL, &rootid);
3503   assert(r == 0);
3504   crush.set_item_name(rootid, "default");
3505
3506   // add osds
3507   vector<string> sections;
3508   conf->get_all_sections(sections);
3509
3510   for (auto &section : sections) {
3511     if (section.find("osd.") != 0)
3512       continue;
3513
3514     const char *begin = section.c_str() + 4;
3515     char *end = (char*)begin;
3516     int o = strtol(begin, &end, 10);
3517     if (*end != '\0')
3518       continue;
3519
3520     string host, rack, row, room, dc, pool;
3521     vector<string> sectiontmp;
3522     sectiontmp.push_back("osd");
3523     sectiontmp.push_back(section);
3524     conf->get_val_from_conf_file(sectiontmp, "host", host, false);
3525     conf->get_val_from_conf_file(sectiontmp, "rack", rack, false);
3526     conf->get_val_from_conf_file(sectiontmp, "row", row, false);
3527     conf->get_val_from_conf_file(sectiontmp, "room", room, false);
3528     conf->get_val_from_conf_file(sectiontmp, "datacenter", dc, false);
3529     conf->get_val_from_conf_file(sectiontmp, "root", pool, false);
3530
3531     if (host.length() == 0)
3532       host = "unknownhost";
3533     if (rack.length() == 0)
3534       rack = "unknownrack";
3535
3536     map<string,string> loc;
3537     loc["host"] = host;
3538     loc["rack"] = rack;
3539     if (row.size())
3540       loc["row"] = row;
3541     if (room.size())
3542       loc["room"] = room;
3543     if (dc.size())
3544       loc["datacenter"] = dc;
3545     loc["root"] = "default";
3546
3547     ldout(cct, 5) << " adding osd." << o << " at " << loc << dendl;
3548     crush.insert_item(cct, o, 1.0, section, loc);
3549   }
3550
3551   build_simple_crush_rules(cct, crush, "default", ss);
3552
3553   crush.finalize();
3554
3555   return 0;
3556 }
3557
3558
3559 int OSDMap::build_simple_crush_rules(
3560   CephContext *cct,
3561   CrushWrapper& crush,
3562   const string& root,
3563   ostream *ss)
3564 {
3565   int crush_rule = crush.get_osd_pool_default_crush_replicated_ruleset(cct);
3566   string failure_domain =
3567     crush.get_type_name(cct->_conf->osd_crush_chooseleaf_type);
3568
3569   int r;
3570   r = crush.add_simple_rule_at(
3571     "replicated_rule", root, failure_domain, "",
3572     "firstn", pg_pool_t::TYPE_REPLICATED,
3573     crush_rule, ss);
3574   if (r < 0)
3575     return r;
3576   // do not add an erasure rule by default or else we will implicitly
3577   // require the crush_v2 feature of clients
3578   return 0;
3579 }
3580
3581 int OSDMap::summarize_mapping_stats(
3582   OSDMap *newmap,
3583   const set<int64_t> *pools,
3584   std::string *out,
3585   Formatter *f) const
3586 {
3587   set<int64_t> ls;
3588   if (pools) {
3589     ls = *pools;
3590   } else {
3591     for (auto &p : get_pools())
3592       ls.insert(p.first);
3593   }
3594
3595   unsigned total_pg = 0;
3596   unsigned moved_pg = 0;
3597   vector<unsigned> base_by_osd(get_max_osd(), 0);
3598   vector<unsigned> new_by_osd(get_max_osd(), 0);
3599   for (int64_t pool_id : ls) {
3600     const pg_pool_t *pi = get_pg_pool(pool_id);
3601     vector<int> up, up2;
3602     int up_primary;
3603     for (unsigned ps = 0; ps < pi->get_pg_num(); ++ps) {
3604       pg_t pgid(ps, pool_id, -1);
3605       total_pg += pi->get_size();
3606       pg_to_up_acting_osds(pgid, &up, &up_primary, nullptr, nullptr);
3607       for (int osd : up) {
3608         if (osd >= 0 && osd < get_max_osd())
3609           ++base_by_osd[osd];
3610       }
3611       if (newmap) {
3612         newmap->pg_to_up_acting_osds(pgid, &up2, &up_primary, nullptr, nullptr);
3613         for (int osd : up2) {
3614           if (osd >= 0 && osd < get_max_osd())
3615             ++new_by_osd[osd];
3616         }
3617         if (pi->type == pg_pool_t::TYPE_ERASURE) {
3618           for (unsigned i=0; i<up.size(); ++i) {
3619             if (up[i] != up2[i]) {
3620               ++moved_pg;
3621             }
3622           }
3623         } else if (pi->type == pg_pool_t::TYPE_REPLICATED) {
3624           for (int osd : up) {
3625             if (std::find(up2.begin(), up2.end(), osd) == up2.end()) {
3626               ++moved_pg;
3627             }
3628           }
3629         } else {
3630           assert(0 == "unhandled pool type");
3631         }
3632       }
3633     }
3634   }
3635
3636   unsigned num_up_in = 0;
3637   for (int osd = 0; osd < get_max_osd(); ++osd) {
3638     if (is_up(osd) && is_in(osd))
3639       ++num_up_in;
3640   }
3641   if (!num_up_in) {
3642     return -EINVAL;
3643   }
3644
3645   float avg_pg = (float)total_pg / (float)num_up_in;
3646   float base_stddev = 0, new_stddev = 0;
3647   int min = -1, max = -1;
3648   unsigned min_base_pg = 0, max_base_pg = 0;
3649   unsigned min_new_pg = 0, max_new_pg = 0;
3650   for (int osd = 0; osd < get_max_osd(); ++osd) {
3651     if (is_up(osd) && is_in(osd)) {
3652       float base_diff = (float)base_by_osd[osd] - avg_pg;
3653       base_stddev += base_diff * base_diff;
3654       float new_diff = (float)new_by_osd[osd] - avg_pg;
3655       new_stddev += new_diff * new_diff;
3656       if (min < 0 || base_by_osd[osd] < min_base_pg) {
3657         min = osd;
3658         min_base_pg = base_by_osd[osd];
3659         min_new_pg = new_by_osd[osd];
3660       }
3661       if (max < 0 || base_by_osd[osd] > max_base_pg) {
3662         max = osd;
3663         max_base_pg = base_by_osd[osd];
3664         max_new_pg = new_by_osd[osd];
3665       }
3666     }
3667   }
3668   base_stddev = sqrt(base_stddev / num_up_in);
3669   new_stddev = sqrt(new_stddev / num_up_in);
3670
3671   float edev = sqrt(avg_pg * (1.0 - (1.0 / (double)num_up_in)));
3672
3673   ostringstream ss;
3674   if (f)
3675     f->open_object_section("utilization");
3676   if (newmap) {
3677     if (f) {
3678       f->dump_unsigned("moved_pgs", moved_pg);
3679       f->dump_unsigned("total_pgs", total_pg);
3680     } else {
3681       float percent = 0;
3682       if (total_pg)
3683         percent = (float)moved_pg * 100.0 / (float)total_pg;
3684       ss << "moved " << moved_pg << " / " << total_pg
3685          << " (" << percent << "%)\n";
3686     }
3687   }
3688   if (f) {
3689     f->dump_float("avg_pgs", avg_pg);
3690     f->dump_float("std_dev", base_stddev);
3691     f->dump_float("expected_baseline_std_dev", edev);
3692     if (newmap)
3693       f->dump_float("new_std_dev", new_stddev);
3694   } else {
3695     ss << "avg " << avg_pg << "\n";
3696     ss << "stddev " << base_stddev;
3697     if (newmap)
3698       ss << " -> " << new_stddev;
3699     ss << " (expected baseline " << edev << ")\n";
3700   }
3701   if (min >= 0) {
3702     if (f) {
3703       f->dump_unsigned("min_osd", min);
3704       f->dump_unsigned("min_osd_pgs", min_base_pg);
3705       if (newmap)
3706         f->dump_unsigned("new_min_osd_pgs", min_new_pg);
3707     } else {
3708       ss << "min osd." << min << " with " << min_base_pg;
3709       if (newmap)
3710         ss << " -> " << min_new_pg;
3711       ss << " pgs (" << (float)min_base_pg / avg_pg;
3712       if (newmap)
3713         ss << " -> " << (float)min_new_pg / avg_pg;
3714       ss << " * mean)\n";
3715     }
3716   }
3717   if (max >= 0) {
3718     if (f) {
3719       f->dump_unsigned("max_osd", max);
3720       f->dump_unsigned("max_osd_pgs", max_base_pg);
3721       if (newmap)
3722         f->dump_unsigned("new_max_osd_pgs", max_new_pg);
3723     } else {
3724       ss << "max osd." << max << " with " << max_base_pg;
3725       if (newmap)
3726         ss << " -> " << max_new_pg;
3727       ss << " pgs (" << (float)max_base_pg / avg_pg;
3728       if (newmap)
3729         ss << " -> " << (float)max_new_pg / avg_pg;
3730       ss << " * mean)\n";
3731     }
3732   }
3733   if (f)
3734     f->close_section();
3735   if (out)
3736     *out = ss.str();
3737   return 0;
3738 }
3739
3740
3741 int OSDMap::clean_pg_upmaps(
3742   CephContext *cct,
3743   Incremental *pending_inc)
3744 {
3745   ldout(cct, 10) << __func__ << dendl;
3746   int changed = 0;
3747   for (auto& p : pg_upmap) {
3748     vector<int> raw;
3749     int primary;
3750     pg_to_raw_osds(p.first, &raw, &primary);
3751     if (vectors_equal(raw, p.second)) {
3752       ldout(cct, 10) << " removing redundant pg_upmap " << p.first << " "
3753                      << p.second << dendl;
3754       pending_inc->old_pg_upmap.insert(p.first);
3755       ++changed;
3756     }
3757   }
3758   for (auto& p : pg_upmap_items) {
3759     vector<int> raw;
3760     int primary;
3761     pg_to_raw_osds(p.first, &raw, &primary);
3762     mempool::osdmap::vector<pair<int,int>> newmap;
3763     for (auto& q : p.second) {
3764       if (std::find(raw.begin(), raw.end(), q.first) != raw.end()) {
3765         newmap.push_back(q);
3766       }
3767     }
3768     if (newmap.empty()) {
3769       ldout(cct, 10) << " removing no-op pg_upmap_items " << p.first << " "
3770                      << p.second << dendl;
3771       pending_inc->old_pg_upmap_items.insert(p.first);
3772       ++changed;
3773     } else if (newmap != p.second) {
3774       ldout(cct, 10) << " simplifying partially no-op pg_upmap_items "
3775                      << p.first << " " << p.second << " -> " << newmap << dendl;
3776       pending_inc->new_pg_upmap_items[p.first] = newmap;
3777       ++changed;
3778     }
3779   }
3780   return changed;
3781 }
3782
3783 bool OSDMap::try_pg_upmap(
3784   CephContext *cct,
3785   pg_t pg,                       ///< pg to potentially remap
3786   const set<int>& overfull,      ///< osds we'd want to evacuate
3787   const vector<int>& underfull,  ///< osds to move to, in order of preference
3788   vector<int> *orig,
3789   vector<int> *out)              ///< resulting alternative mapping
3790 {
3791   const pg_pool_t *pool = get_pg_pool(pg.pool());
3792   if (!pool)
3793     return false;
3794   int rule = crush->find_rule(pool->get_crush_rule(), pool->get_type(),
3795                               pool->get_size());
3796   if (rule < 0)
3797     return false;
3798
3799   // get original mapping
3800   _pg_to_raw_osds(*pool, pg, orig, NULL);
3801
3802   // make sure there is something there to remap
3803   bool any = false;
3804   for (auto osd : *orig) {
3805     if (overfull.count(osd)) {
3806       any = true;
3807       break;
3808     }
3809   }
3810   if (!any) {
3811     return false;
3812   }
3813
3814   int r = crush->try_remap_rule(
3815     cct,
3816     rule,
3817     pool->get_size(),
3818     overfull, underfull,
3819     *orig,
3820     out);
3821   if (r < 0)
3822     return false;
3823   if (*out == *orig)
3824     return false;
3825   return true;
3826 }
3827
3828 int OSDMap::calc_pg_upmaps(
3829   CephContext *cct,
3830   float max_deviation_ratio,
3831   int max,
3832   const set<int64_t>& only_pools_orig,
3833   OSDMap::Incremental *pending_inc)
3834 {
3835   set<int64_t> only_pools;
3836   if (only_pools_orig.empty()) {
3837     for (auto& i : pools) {
3838       only_pools.insert(i.first);
3839     }
3840   } else {
3841     only_pools = only_pools_orig;
3842   }
3843   OSDMap tmp;
3844   tmp.deepish_copy_from(*this);
3845   float start_deviation = 0;
3846   float end_deviation = 0;
3847   int num_changed = 0;
3848   while (true) {
3849     map<int,set<pg_t>> pgs_by_osd;
3850     int total_pgs = 0;
3851     float osd_weight_total = 0;
3852     map<int,float> osd_weight;
3853     for (auto& i : pools) {
3854       if (!only_pools.empty() && !only_pools.count(i.first))
3855         continue;
3856       for (unsigned ps = 0; ps < i.second.get_pg_num(); ++ps) {
3857         pg_t pg(ps, i.first);
3858         vector<int> up;
3859         tmp.pg_to_up_acting_osds(pg, &up, nullptr, nullptr, nullptr);
3860         for (auto osd : up) {
3861           if (osd != CRUSH_ITEM_NONE)
3862             pgs_by_osd[osd].insert(pg);
3863         }
3864       }
3865       total_pgs += i.second.get_size() * i.second.get_pg_num();
3866
3867       map<int,float> pmap;
3868       int ruleno = tmp.crush->find_rule(i.second.get_crush_rule(),
3869                                         i.second.get_type(),
3870                                         i.second.get_size());
3871       tmp.crush->get_rule_weight_osd_map(ruleno, &pmap);
3872       ldout(cct,30) << __func__ << " pool " << i.first << " ruleno " << ruleno << dendl;
3873       for (auto p : pmap) {
3874         auto adjusted_weight = tmp.get_weightf(p.first) * p.second;
3875         osd_weight[p.first] += adjusted_weight;
3876         osd_weight_total += adjusted_weight;
3877       }
3878     }
3879     for (auto& i : osd_weight) {
3880       int pgs = 0;
3881       auto p = pgs_by_osd.find(i.first);
3882       if (p != pgs_by_osd.end())
3883         pgs = p->second.size();
3884       else
3885         pgs_by_osd.emplace(i.first, set<pg_t>());
3886       ldout(cct, 20) << " osd." << i.first << " weight " << i.second
3887                      << " pgs " << pgs << dendl;
3888     }
3889
3890     if (osd_weight_total == 0) {
3891       lderr(cct) << __func__ << " abort due to osd_weight_total == 0" << dendl;
3892       break;
3893     }
3894     float pgs_per_weight = total_pgs / osd_weight_total;
3895     ldout(cct, 10) << " osd_weight_total " << osd_weight_total << dendl;
3896     ldout(cct, 10) << " pgs_per_weight " << pgs_per_weight << dendl;
3897
3898     // osd deviation
3899     float total_deviation = 0;
3900     map<int,float> osd_deviation;       // osd, deviation(pgs)
3901     multimap<float,int> deviation_osd;  // deviation(pgs), osd
3902     set<int> overfull;
3903     for (auto& i : pgs_by_osd) {
3904       float target = osd_weight[i.first] * pgs_per_weight;
3905       float deviation = (float)i.second.size() - target;
3906       ldout(cct, 20) << " osd." << i.first
3907                      << "\tpgs " << i.second.size()
3908                      << "\ttarget " << target
3909                      << "\tdeviation " << deviation
3910                      << dendl;
3911       osd_deviation[i.first] = deviation;
3912       deviation_osd.insert(make_pair(deviation, i.first));
3913       if (deviation >= 1.0)
3914         overfull.insert(i.first);
3915       total_deviation += abs(deviation);
3916     }
3917     if (num_changed == 0) {
3918       start_deviation = total_deviation;
3919     }
3920     end_deviation = total_deviation;
3921
3922     // build underfull, sorted from least-full to most-average
3923     vector<int> underfull;
3924     for (auto i = deviation_osd.begin();
3925          i != deviation_osd.end();
3926          ++i) {
3927       if (i->first >= -.999)
3928         break;
3929       underfull.push_back(i->second);
3930     }
3931     ldout(cct, 10) << " total_deviation " << total_deviation
3932                    << " overfull " << overfull
3933                    << " underfull " << underfull << dendl;
3934     if (overfull.empty() || underfull.empty())
3935       break;
3936
3937     // pick fullest
3938     bool restart = false;
3939     for (auto p = deviation_osd.rbegin(); p != deviation_osd.rend(); ++p) {
3940       int osd = p->second;
3941       float deviation = p->first;
3942       float target = osd_weight[osd] * pgs_per_weight;
3943       assert(target > 0);
3944       if (deviation/target < max_deviation_ratio) {
3945         ldout(cct, 10) << " osd." << osd
3946                        << " target " << target
3947                        << " deviation " << deviation
3948                        << " -> ratio " << deviation/target
3949                        << " < max ratio " << max_deviation_ratio << dendl;
3950         break;
3951       }
3952       int num_to_move = deviation;
3953       ldout(cct, 10) << " osd." << osd << " move " << num_to_move << dendl;
3954       if (num_to_move < 1)
3955         break;
3956
3957       set<pg_t>& pgs = pgs_by_osd[osd];
3958
3959       // look for remaps we can un-remap
3960       for (auto pg : pgs) {
3961         auto p = tmp.pg_upmap_items.find(pg);
3962         if (p != tmp.pg_upmap_items.end()) {
3963           for (auto q : p->second) {
3964             if (q.second == osd) {
3965               ldout(cct, 10) << "  dropping pg_upmap_items " << pg
3966                              << " " << p->second << dendl;
3967               tmp.pg_upmap_items.erase(p);
3968               pending_inc->old_pg_upmap_items.insert(pg);
3969               ++num_changed;
3970               restart = true;
3971             }
3972           }
3973         }
3974         if (restart)
3975           break;
3976       } // pg loop
3977       if (restart)
3978         break;
3979
3980       for (auto pg : pgs) {
3981         if (tmp.pg_upmap.count(pg) ||
3982             tmp.pg_upmap_items.count(pg)) {
3983           ldout(cct, 20) << "  already remapped " << pg << dendl;
3984           continue;
3985         }
3986         ldout(cct, 10) << "  trying " << pg << dendl;
3987         vector<int> orig, out;
3988         if (!try_pg_upmap(cct, pg, overfull, underfull, &orig, &out)) {
3989           continue;
3990         }
3991         ldout(cct, 10) << "  " << pg << " " << orig << " -> " << out << dendl;
3992         if (orig.size() != out.size()) {
3993           continue;
3994         }
3995         assert(orig != out);
3996         auto& rmi = tmp.pg_upmap_items[pg];
3997         for (unsigned i = 0; i < out.size(); ++i) {
3998           if (orig[i] != out[i]) {
3999             rmi.push_back(make_pair(orig[i], out[i]));
4000           }
4001         }
4002         pending_inc->new_pg_upmap_items[pg] = rmi;
4003         ldout(cct, 10) << "  " << pg << " pg_upmap_items " << rmi << dendl;
4004         restart = true;
4005         ++num_changed;
4006         break;
4007       } // pg loop
4008       if (restart)
4009         break;
4010     } // osd loop
4011
4012     if (!restart) {
4013       ldout(cct, 10) << " failed to find any changes to make" << dendl;
4014       break;
4015     }
4016     if (--max == 0) {
4017       ldout(cct, 10) << " hit max iterations, stopping" << dendl;
4018       break;
4019     }
4020   }
4021   ldout(cct, 10) << " start deviation " << start_deviation << dendl;
4022   ldout(cct, 10) << " end deviation " << end_deviation << dendl;
4023   return num_changed;
4024 }
4025
4026 int OSDMap::get_osds_by_bucket_name(const string &name, set<int> *osds) const
4027 {
4028   return crush->get_leaves(name, osds);
4029 }
4030
4031 // get pools whose crush rules might reference the given osd
4032 void OSDMap::get_pool_ids_by_osd(CephContext *cct,
4033                                 int osd,
4034                                 set<int64_t> *pool_ids) const
4035 {
4036   assert(pool_ids);
4037   set<int> raw_rules;
4038   int r = crush->get_rules_by_osd(osd, &raw_rules);
4039   if (r < 0) {
4040     lderr(cct) << __func__ << " get_rules_by_osd failed: " << cpp_strerror(r)
4041                << dendl;
4042     assert(r >= 0);
4043   }
4044   set<int> rules;
4045   for (auto &i: raw_rules) {
4046     // exclude any dead rule
4047     if (crush_rule_in_use(i)) {
4048       rules.insert(i);
4049     }
4050   }
4051   for (auto &r: rules) {
4052     get_pool_ids_by_rule(r, pool_ids);
4053   }
4054 }
4055
4056 template <typename F>
4057 class OSDUtilizationDumper : public CrushTreeDumper::Dumper<F> {
4058 public:
4059   typedef CrushTreeDumper::Dumper<F> Parent;
4060
4061   OSDUtilizationDumper(const CrushWrapper *crush, const OSDMap *osdmap_,
4062                        const PGStatService *pgs_, bool tree_) :
4063     Parent(crush, osdmap_->get_pool_names()),
4064     osdmap(osdmap_),
4065     pgs(pgs_),
4066     tree(tree_),
4067     average_util(average_utilization()),
4068     min_var(-1),
4069     max_var(-1),
4070     stddev(0),
4071     sum(0) {
4072   }
4073
4074 protected:
4075   void dump_stray(F *f) {
4076     for (int i = 0; i < osdmap->get_max_osd(); i++) {
4077       if (osdmap->exists(i) && !this->is_touched(i))
4078         dump_item(CrushTreeDumper::Item(i, 0, 0, 0), f);
4079     }
4080   }
4081
4082   void dump_item(const CrushTreeDumper::Item &qi, F *f) override {
4083     if (!tree && qi.is_bucket())
4084       return;
4085
4086     float reweight = qi.is_bucket() ? -1 : osdmap->get_weightf(qi.id);
4087     int64_t kb = 0, kb_used = 0, kb_avail = 0;
4088     double util = 0;
4089     if (get_bucket_utilization(qi.id, &kb, &kb_used, &kb_avail))
4090       if (kb_used && kb)
4091         util = 100.0 * (double)kb_used / (double)kb;
4092
4093     double var = 1.0;
4094     if (average_util)
4095       var = util / average_util;
4096
4097     size_t num_pgs = qi.is_bucket() ? 0 : pgs->get_num_pg_by_osd(qi.id);
4098
4099     dump_item(qi, reweight, kb, kb_used, kb_avail, util, var, num_pgs, f);
4100
4101     if (!qi.is_bucket() && reweight > 0) {
4102       if (min_var < 0 || var < min_var)
4103         min_var = var;
4104       if (max_var < 0 || var > max_var)
4105         max_var = var;
4106
4107       double dev = util - average_util;
4108       dev *= dev;
4109       stddev += reweight * dev;
4110       sum += reweight;
4111     }
4112   }
4113
4114   virtual void dump_item(const CrushTreeDumper::Item &qi,
4115                          float &reweight,
4116                          int64_t kb,
4117                          int64_t kb_used,
4118                          int64_t kb_avail,
4119                          double& util,
4120                          double& var,
4121                          const size_t num_pgs,
4122                          F *f) = 0;
4123
4124   double dev() {
4125     return sum > 0 ? sqrt(stddev / sum) : 0;
4126   }
4127
4128   double average_utilization() {
4129     int64_t kb = 0, kb_used = 0;
4130     for (int i = 0; i < osdmap->get_max_osd(); i++) {
4131       if (!osdmap->exists(i) || osdmap->get_weight(i) == 0)
4132         continue;
4133       int64_t kb_i, kb_used_i, kb_avail_i;
4134       if (get_osd_utilization(i, &kb_i, &kb_used_i, &kb_avail_i)) {
4135         kb += kb_i;
4136         kb_used += kb_used_i;
4137       }
4138     }
4139     return kb > 0 ? 100.0 * (double)kb_used / (double)kb : 0;
4140   }
4141
4142   bool get_osd_utilization(int id, int64_t* kb, int64_t* kb_used,
4143                            int64_t* kb_avail) const {
4144     const osd_stat_t *p = pgs->get_osd_stat(id);
4145     if (!p) return false;
4146     *kb = p->kb;
4147     *kb_used = p->kb_used;
4148     *kb_avail = p->kb_avail;
4149     return *kb > 0;
4150   }
4151
4152   bool get_bucket_utilization(int id, int64_t* kb, int64_t* kb_used,
4153                               int64_t* kb_avail) const {
4154     if (id >= 0) {
4155       if (osdmap->is_out(id)) {
4156         *kb = 0;
4157         *kb_used = 0;
4158         *kb_avail = 0;
4159         return true;
4160       }
4161       return get_osd_utilization(id, kb, kb_used, kb_avail);
4162     }
4163
4164     *kb = 0;
4165     *kb_used = 0;
4166     *kb_avail = 0;
4167
4168     for (int k = osdmap->crush->get_bucket_size(id) - 1; k >= 0; k--) {
4169       int item = osdmap->crush->get_bucket_item(id, k);
4170       int64_t kb_i = 0, kb_used_i = 0, kb_avail_i = 0;
4171       if (!get_bucket_utilization(item, &kb_i, &kb_used_i, &kb_avail_i))
4172         return false;
4173       *kb += kb_i;
4174       *kb_used += kb_used_i;
4175       *kb_avail += kb_avail_i;
4176     }
4177     return *kb > 0;
4178   }
4179
4180 protected:
4181   const OSDMap *osdmap;
4182   const PGStatService *pgs;
4183   bool tree;
4184   double average_util;
4185   double min_var;
4186   double max_var;
4187   double stddev;
4188   double sum;
4189 };
4190
4191
4192 class OSDUtilizationPlainDumper : public OSDUtilizationDumper<TextTable> {
4193 public:
4194   typedef OSDUtilizationDumper<TextTable> Parent;
4195
4196   OSDUtilizationPlainDumper(const CrushWrapper *crush, const OSDMap *osdmap,
4197                      const PGStatService *pgs, bool tree) :
4198     Parent(crush, osdmap, pgs, tree) {}
4199
4200   void dump(TextTable *tbl) {
4201     tbl->define_column("ID", TextTable::LEFT, TextTable::RIGHT);
4202     tbl->define_column("CLASS", TextTable::LEFT, TextTable::RIGHT);
4203     tbl->define_column("WEIGHT", TextTable::LEFT, TextTable::RIGHT);
4204     tbl->define_column("REWEIGHT", TextTable::LEFT, TextTable::RIGHT);
4205     tbl->define_column("SIZE", TextTable::LEFT, TextTable::RIGHT);
4206     tbl->define_column("USE", TextTable::LEFT, TextTable::RIGHT);
4207     tbl->define_column("AVAIL", TextTable::LEFT, TextTable::RIGHT);
4208     tbl->define_column("%USE", TextTable::LEFT, TextTable::RIGHT);
4209     tbl->define_column("VAR", TextTable::LEFT, TextTable::RIGHT);
4210     tbl->define_column("PGS", TextTable::LEFT, TextTable::RIGHT);
4211     if (tree)
4212       tbl->define_column("TYPE NAME", TextTable::LEFT, TextTable::LEFT);
4213
4214     Parent::dump(tbl);
4215
4216     dump_stray(tbl);
4217
4218     *tbl << ""
4219          << ""
4220          << "" << "TOTAL"
4221          << si_t(pgs->get_osd_sum().kb << 10)
4222          << si_t(pgs->get_osd_sum().kb_used << 10)
4223          << si_t(pgs->get_osd_sum().kb_avail << 10)
4224          << lowprecision_t(average_util)
4225          << ""
4226          << TextTable::endrow;
4227   }
4228
4229 protected:
4230   struct lowprecision_t {
4231     float v;
4232     explicit lowprecision_t(float _v) : v(_v) {}
4233   };
4234   friend std::ostream &operator<<(ostream& out, const lowprecision_t& v);
4235
4236   using OSDUtilizationDumper<TextTable>::dump_item;
4237   void dump_item(const CrushTreeDumper::Item &qi,
4238                          float &reweight,
4239                          int64_t kb,
4240                          int64_t kb_used,
4241                          int64_t kb_avail,
4242                          double& util,
4243                          double& var,
4244                          const size_t num_pgs,
4245                          TextTable *tbl) override {
4246     const char *c = crush->get_item_class(qi.id);
4247     if (!c)
4248       c = "";
4249     *tbl << qi.id
4250          << c
4251          << weightf_t(qi.weight)
4252          << weightf_t(reweight)
4253          << si_t(kb << 10)
4254          << si_t(kb_used << 10)
4255          << si_t(kb_avail << 10)
4256          << lowprecision_t(util)
4257          << lowprecision_t(var);
4258
4259     if (qi.is_bucket()) {
4260       *tbl << "-";
4261     } else {
4262       *tbl << num_pgs;
4263     }
4264
4265     if (tree) {
4266       ostringstream name;
4267       for (int k = 0; k < qi.depth; k++)
4268         name << "    ";
4269       if (qi.is_bucket()) {
4270         int type = crush->get_bucket_type(qi.id);
4271         name << crush->get_type_name(type) << " "
4272              << crush->get_item_name(qi.id);
4273       } else {
4274         name << "osd." << qi.id;
4275       }
4276       *tbl << name.str();
4277     }
4278
4279     *tbl << TextTable::endrow;
4280   }
4281
4282 public:
4283   string summary() {
4284     ostringstream out;
4285     out << "MIN/MAX VAR: " << lowprecision_t(min_var)
4286         << "/" << lowprecision_t(max_var) << "  "
4287         << "STDDEV: " << lowprecision_t(dev());
4288     return out.str();
4289   }
4290 };
4291
4292 ostream& operator<<(ostream& out,
4293                     const OSDUtilizationPlainDumper::lowprecision_t& v)
4294 {
4295   if (v.v < -0.01) {
4296     return out << "-";
4297   } else if (v.v < 0.001) {
4298     return out << "0";
4299   } else {
4300     std::streamsize p = out.precision();
4301     return out << std::fixed << std::setprecision(2) << v.v << std::setprecision(p);
4302   }
4303 }
4304
4305 class OSDUtilizationFormatDumper : public OSDUtilizationDumper<Formatter> {
4306 public:
4307   typedef OSDUtilizationDumper<Formatter> Parent;
4308
4309   OSDUtilizationFormatDumper(const CrushWrapper *crush, const OSDMap *osdmap,
4310                              const PGStatService *pgs, bool tree) :
4311     Parent(crush, osdmap, pgs, tree) {}
4312
4313   void dump(Formatter *f) {
4314     f->open_array_section("nodes");
4315     Parent::dump(f);
4316     f->close_section();
4317
4318     f->open_array_section("stray");
4319     dump_stray(f);
4320     f->close_section();
4321   }
4322
4323 protected:
4324   using OSDUtilizationDumper<Formatter>::dump_item;
4325   void dump_item(const CrushTreeDumper::Item &qi,
4326                          float &reweight,
4327                          int64_t kb,
4328                          int64_t kb_used,
4329                          int64_t kb_avail,
4330                          double& util,
4331                          double& var,
4332                          const size_t num_pgs,
4333                          Formatter *f) override {
4334     f->open_object_section("item");
4335     CrushTreeDumper::dump_item_fields(crush, weight_set_names, qi, f);
4336     f->dump_float("reweight", reweight);
4337     f->dump_int("kb", kb);
4338     f->dump_int("kb_used", kb_used);
4339     f->dump_int("kb_avail", kb_avail);
4340     f->dump_float("utilization", util);
4341     f->dump_float("var", var);
4342     f->dump_unsigned("pgs", num_pgs);
4343     CrushTreeDumper::dump_bucket_children(crush, qi, f);
4344     f->close_section();
4345   }
4346
4347 public:
4348   void summary(Formatter *f) {
4349     f->open_object_section("summary");
4350     f->dump_int("total_kb", pgs->get_osd_sum().kb);
4351     f->dump_int("total_kb_used", pgs->get_osd_sum().kb_used);
4352     f->dump_int("total_kb_avail", pgs->get_osd_sum().kb_avail);
4353     f->dump_float("average_utilization", average_util);
4354     f->dump_float("min_var", min_var);
4355     f->dump_float("max_var", max_var);
4356     f->dump_float("dev", dev());
4357     f->close_section();
4358   }
4359 };
4360
4361 void print_osd_utilization(const OSDMap& osdmap,
4362                            const PGStatService *pgstat,
4363                            ostream& out,
4364                            Formatter *f,
4365                            bool tree)
4366 {
4367   const CrushWrapper *crush = osdmap.crush.get();
4368   if (f) {
4369     f->open_object_section("df");
4370     OSDUtilizationFormatDumper d(crush, &osdmap, pgstat, tree);
4371     d.dump(f);
4372     d.summary(f);
4373     f->close_section();
4374     f->flush(out);
4375   } else {
4376     OSDUtilizationPlainDumper d(crush, &osdmap, pgstat, tree);
4377     TextTable tbl;
4378     d.dump(&tbl);
4379     out << tbl << d.summary() << "\n";
4380   }
4381 }
4382
4383 void OSDMap::check_health(health_check_map_t *checks) const
4384 {
4385   int num_osds = get_num_osds();
4386
4387   // OSD_DOWN
4388   // OSD_$subtree_DOWN
4389   // OSD_ORPHAN
4390   if (num_osds >= 0) {
4391     int num_in_osds = 0;
4392     int num_down_in_osds = 0;
4393     set<int> osds;
4394     set<int> down_in_osds;
4395     set<int> up_in_osds;
4396     set<int> subtree_up;
4397     unordered_map<int, set<int> > subtree_type_down;
4398     unordered_map<int, int> num_osds_subtree;
4399     int max_type = crush->get_max_type_id();
4400
4401     for (int i = 0; i < get_max_osd(); i++) {
4402       if (!exists(i)) {
4403         if (crush->item_exists(i)) {
4404           osds.insert(i);
4405         }
4406         continue;
4407       }
4408       if (is_out(i))
4409         continue;
4410       ++num_in_osds;
4411       if (down_in_osds.count(i) || up_in_osds.count(i))
4412         continue;
4413       if (!is_up(i)) {
4414         down_in_osds.insert(i);
4415         int parent_id = 0;
4416         int current = i;
4417         for (int type = 0; type <= max_type; type++) {
4418           if (!crush->get_type_name(type))
4419             continue;
4420           int r = crush->get_immediate_parent_id(current, &parent_id);
4421           if (r == -ENOENT)
4422             break;
4423           // break early if this parent is already marked as up
4424           if (subtree_up.count(parent_id))
4425             break;
4426           type = crush->get_bucket_type(parent_id);
4427           if (!subtree_type_is_down(
4428                 g_ceph_context, parent_id, type,
4429                 &down_in_osds, &up_in_osds, &subtree_up, &subtree_type_down))
4430             break;
4431           current = parent_id;
4432         }
4433       }
4434     }
4435
4436     // calculate the number of down osds in each down subtree and
4437     // store it in num_osds_subtree
4438     for (int type = 1; type <= max_type; type++) {
4439       if (!crush->get_type_name(type))
4440         continue;
4441       for (auto j = subtree_type_down[type].begin();
4442            j != subtree_type_down[type].end();
4443            ++j) {
4444         list<int> children;
4445         int num = 0;
4446         int num_children = crush->get_children(*j, &children);
4447         if (num_children == 0)
4448           continue;
4449         for (auto l = children.begin(); l != children.end(); ++l) {
4450           if (*l >= 0) {
4451             ++num;
4452           } else if (num_osds_subtree[*l] > 0) {
4453             num = num + num_osds_subtree[*l];
4454           }
4455         }
4456         num_osds_subtree[*j] = num;
4457       }
4458     }
4459     num_down_in_osds = down_in_osds.size();
4460     assert(num_down_in_osds <= num_in_osds);
4461     if (num_down_in_osds > 0) {
4462       // summary of down subtree types and osds
4463       for (int type = max_type; type > 0; type--) {
4464         if (!crush->get_type_name(type))
4465           continue;
4466         if (subtree_type_down[type].size() > 0) {
4467           ostringstream ss;
4468           ss << subtree_type_down[type].size() << " "
4469              << crush->get_type_name(type);
4470           if (subtree_type_down[type].size() > 1) {
4471             ss << "s";
4472           }
4473           int sum_down_osds = 0;
4474           for (auto j = subtree_type_down[type].begin();
4475                j != subtree_type_down[type].end();
4476                ++j) {
4477             sum_down_osds = sum_down_osds + num_osds_subtree[*j];
4478           }
4479           ss << " (" << sum_down_osds << " osds) down";
4480           string err = string("OSD_") +
4481             string(crush->get_type_name(type)) + "_DOWN";
4482           boost::to_upper(err);
4483           auto& d = checks->add(err, HEALTH_WARN, ss.str());
4484           for (auto j = subtree_type_down[type].rbegin();
4485                j != subtree_type_down[type].rend();
4486                ++j) {
4487             ostringstream ss;
4488             ss << crush->get_type_name(type);
4489             ss << " ";
4490             ss << crush->get_item_name(*j);
4491             // at the top level, do not print location
4492             if (type != max_type) {
4493               ss << " (";
4494               ss << crush->get_full_location_ordered_string(*j);
4495               ss << ")";
4496             }
4497             int num = num_osds_subtree[*j];
4498             ss << " (" << num << " osds)";
4499             ss << " is down";
4500             d.detail.push_back(ss.str());
4501           }
4502         }
4503       }
4504       ostringstream ss;
4505       ss << down_in_osds.size() << " osds down";
4506       auto& d = checks->add("OSD_DOWN", HEALTH_WARN, ss.str());
4507       for (auto it = down_in_osds.begin(); it != down_in_osds.end(); ++it) {
4508         ostringstream ss;
4509         ss << "osd." << *it << " (";
4510         ss << crush->get_full_location_ordered_string(*it);
4511         ss << ") is down";
4512         d.detail.push_back(ss.str());
4513       }
4514     }
4515
4516     if (!osds.empty()) {
4517       ostringstream ss;
4518       ss << osds.size() << " osds exist in the crush map but not in the osdmap";
4519       auto& d = checks->add("OSD_ORPHAN", HEALTH_WARN, ss.str());
4520       for (auto osd : osds) {
4521         ostringstream ss;
4522         ss << "osd." << osd << " exists in crush map but not in osdmap";
4523         d.detail.push_back(ss.str());
4524       }
4525     }
4526   }
4527
4528   // OSD_OUT_OF_ORDER_FULL
4529   {
4530     // An osd could configure failsafe ratio, to something different
4531     // but for now assume it is the same here.
4532     float fsr = g_conf->osd_failsafe_full_ratio;
4533     if (fsr > 1.0) fsr /= 100;
4534     float fr = get_full_ratio();
4535     float br = get_backfillfull_ratio();
4536     float nr = get_nearfull_ratio();
4537
4538     list<string> detail;
4539     // These checks correspond to how OSDService::check_full_status() in an OSD
4540     // handles the improper setting of these values.
4541     if (br < nr) {
4542       ostringstream ss;
4543       ss << "backfillfull_ratio (" << br
4544          << ") < nearfull_ratio (" << nr << "), increased";
4545       detail.push_back(ss.str());
4546       br = nr;
4547     }
4548     if (fr < br) {
4549       ostringstream ss;
4550       ss << "full_ratio (" << fr << ") < backfillfull_ratio (" << br
4551          << "), increased";
4552       detail.push_back(ss.str());
4553       fr = br;
4554     }
4555     if (fsr < fr) {
4556       ostringstream ss;
4557       ss << "osd_failsafe_full_ratio (" << fsr << ") < full_ratio (" << fr
4558          << "), increased";
4559       detail.push_back(ss.str());
4560     }
4561     if (!detail.empty()) {
4562       auto& d = checks->add("OSD_OUT_OF_ORDER_FULL", HEALTH_ERR,
4563                          "full ratio(s) out of order");
4564       d.detail.swap(detail);
4565     }
4566   }
4567
4568   // OSD_FULL
4569   // OSD_NEARFULL
4570   // OSD_BACKFILLFULL
4571   // OSD_FAILSAFE_FULL
4572   {
4573     set<int> full, backfillfull, nearfull;
4574     get_full_osd_counts(&full, &backfillfull, &nearfull);
4575     if (full.size()) {
4576       ostringstream ss;
4577       ss << full.size() << " full osd(s)";
4578       auto& d = checks->add("OSD_FULL", HEALTH_ERR, ss.str());
4579       for (auto& i: full) {
4580         ostringstream ss;
4581         ss << "osd." << i << " is full";
4582         d.detail.push_back(ss.str());
4583       }
4584     }
4585     if (backfillfull.size()) {
4586       ostringstream ss;
4587       ss << backfillfull.size() << " backfillfull osd(s)";
4588       auto& d = checks->add("OSD_BACKFILLFULL", HEALTH_WARN, ss.str());
4589       for (auto& i: backfillfull) {
4590         ostringstream ss;
4591         ss << "osd." << i << " is backfill full";
4592         d.detail.push_back(ss.str());
4593       }
4594     }
4595     if (nearfull.size()) {
4596       ostringstream ss;
4597       ss << nearfull.size() << " nearfull osd(s)";
4598       auto& d = checks->add("OSD_NEARFULL", HEALTH_WARN, ss.str());
4599       for (auto& i: nearfull) {
4600         ostringstream ss;
4601         ss << "osd." << i << " is near full";
4602         d.detail.push_back(ss.str());
4603       }
4604     }
4605   }
4606
4607   // OSDMAP_FLAGS
4608   {
4609     // warn about flags
4610     uint64_t warn_flags =
4611       CEPH_OSDMAP_NEARFULL |
4612       CEPH_OSDMAP_FULL |
4613       CEPH_OSDMAP_PAUSERD |
4614       CEPH_OSDMAP_PAUSEWR |
4615       CEPH_OSDMAP_PAUSEREC |
4616       CEPH_OSDMAP_NOUP |
4617       CEPH_OSDMAP_NODOWN |
4618       CEPH_OSDMAP_NOIN |
4619       CEPH_OSDMAP_NOOUT |
4620       CEPH_OSDMAP_NOBACKFILL |
4621       CEPH_OSDMAP_NORECOVER |
4622       CEPH_OSDMAP_NOSCRUB |
4623       CEPH_OSDMAP_NODEEP_SCRUB |
4624       CEPH_OSDMAP_NOTIERAGENT |
4625       CEPH_OSDMAP_NOREBALANCE;
4626     if (test_flag(warn_flags)) {
4627       ostringstream ss;
4628       ss << get_flag_string(get_flags() & warn_flags)
4629          << " flag(s) set";
4630       checks->add("OSDMAP_FLAGS", HEALTH_WARN, ss.str());
4631     }
4632   }
4633
4634   // OSD_FLAGS
4635   {
4636     list<string> detail;
4637     const unsigned flags =
4638       CEPH_OSD_NOUP |
4639       CEPH_OSD_NOIN |
4640       CEPH_OSD_NODOWN |
4641       CEPH_OSD_NOOUT;
4642     for (int i = 0; i < max_osd; ++i) {
4643       if (osd_state[i] & flags) {
4644         ostringstream ss;
4645         set<string> states;
4646         OSDMap::calc_state_set(osd_state[i] & flags, states);
4647         ss << "osd." << i << " has flags " << states;
4648         detail.push_back(ss.str());
4649       }
4650     }
4651     if (!detail.empty()) {
4652       ostringstream ss;
4653       ss << detail.size() << " osd(s) have {NOUP,NODOWN,NOIN,NOOUT} flags set";
4654       auto& d = checks->add("OSD_FLAGS", HEALTH_WARN, ss.str());
4655       d.detail.swap(detail);
4656     }
4657   }
4658
4659   // OLD_CRUSH_TUNABLES
4660   if (g_conf->mon_warn_on_legacy_crush_tunables) {
4661     string min = crush->get_min_required_version();
4662     if (min < g_conf->mon_crush_min_required_version) {
4663       ostringstream ss;
4664       ss << "crush map has legacy tunables (require " << min
4665          << ", min is " << g_conf->mon_crush_min_required_version << ")";
4666       auto& d = checks->add("OLD_CRUSH_TUNABLES", HEALTH_WARN, ss.str());
4667       d.detail.push_back("see http://docs.ceph.com/docs/master/rados/operations/crush-map/#tunables");
4668     }
4669   }
4670
4671   // OLD_CRUSH_STRAW_CALC_VERSION
4672   if (g_conf->mon_warn_on_crush_straw_calc_version_zero) {
4673     if (crush->get_straw_calc_version() == 0) {
4674       ostringstream ss;
4675       ss << "crush map has straw_calc_version=0";
4676       auto& d = checks->add("OLD_CRUSH_STRAW_CALC_VERSION", HEALTH_WARN, ss.str());
4677       d.detail.push_back(
4678         "see http://docs.ceph.com/docs/master/rados/operations/crush-map/#tunables");
4679     }
4680   }
4681
4682   // CACHE_POOL_NO_HIT_SET
4683   if (g_conf->mon_warn_on_cache_pools_without_hit_sets) {
4684     list<string> detail;
4685     for (map<int64_t, pg_pool_t>::const_iterator p = pools.begin();
4686          p != pools.end();
4687          ++p) {
4688       const pg_pool_t& info = p->second;
4689       if (info.cache_mode_requires_hit_set() &&
4690           info.hit_set_params.get_type() == HitSet::TYPE_NONE) {
4691         ostringstream ss;
4692         ss << "pool '" << get_pool_name(p->first)
4693            << "' with cache_mode " << info.get_cache_mode_name()
4694            << " needs hit_set_type to be set but it is not";
4695         detail.push_back(ss.str());
4696       }
4697     }
4698     if (!detail.empty()) {
4699       ostringstream ss;
4700       ss << detail.size() << " cache pools are missing hit_sets";
4701       auto& d = checks->add("CACHE_POOL_NO_HIT_SET", HEALTH_WARN, ss.str());
4702       d.detail.swap(detail);
4703     }
4704   }
4705
4706   // OSD_NO_SORTBITWISE
4707   if (!test_flag(CEPH_OSDMAP_SORTBITWISE) &&
4708       (get_up_osd_features() &
4709        CEPH_FEATURE_OSD_BITWISE_HOBJ_SORT)) {
4710     ostringstream ss;
4711     ss << "no legacy OSD present but 'sortbitwise' flag is not set";
4712     checks->add("OSD_NO_SORTBITWISE", HEALTH_WARN, ss.str());
4713   }
4714
4715   // OSD_UPGRADE_FINISHED
4716   // none of these (yet) since we don't run until luminous upgrade is done.
4717
4718   // POOL_NEARFULL/BACKFILLFULL/FULL
4719   {
4720     list<string> full_detail, backfillfull_detail, nearfull_detail;
4721     for (auto it : get_pools()) {
4722       const pg_pool_t &pool = it.second;
4723       const string& pool_name = get_pool_name(it.first);
4724       if (pool.has_flag(pg_pool_t::FLAG_FULL)) {
4725         stringstream ss;
4726         if (pool.has_flag(pg_pool_t::FLAG_FULL_NO_QUOTA)) {
4727           // may run out of space too,
4728           // but we want EQUOTA taking precedence
4729           ss << "pool '" << pool_name << "' is full (no quota)";
4730         } else {
4731           ss << "pool '" << pool_name << "' is full (no space)";
4732         }
4733         full_detail.push_back(ss.str());
4734       } else if (pool.has_flag(pg_pool_t::FLAG_BACKFILLFULL)) {
4735         stringstream ss;
4736         ss << "pool '" << pool_name << "' is backfillfull";
4737         backfillfull_detail.push_back(ss.str());
4738       } else if (pool.has_flag(pg_pool_t::FLAG_NEARFULL)) {
4739         stringstream ss;
4740         ss << "pool '" << pool_name << "' is nearfull";
4741         nearfull_detail.push_back(ss.str());
4742       }
4743     }
4744     if (!full_detail.empty()) {
4745       ostringstream ss;
4746       ss << full_detail.size() << " pool(s) full";
4747       auto& d = checks->add("POOL_FULL", HEALTH_WARN, ss.str());
4748       d.detail.swap(full_detail);
4749     }
4750     if (!backfillfull_detail.empty()) {
4751       ostringstream ss;
4752       ss << backfillfull_detail.size() << " pool(s) backfillfull";
4753       auto& d = checks->add("POOL_BACKFILLFULL", HEALTH_WARN, ss.str());
4754       d.detail.swap(backfillfull_detail);
4755     }
4756     if (!nearfull_detail.empty()) {
4757       ostringstream ss;
4758       ss << nearfull_detail.size() << " pool(s) nearfull";
4759       auto& d = checks->add("POOL_NEARFULL", HEALTH_WARN, ss.str());
4760       d.detail.swap(nearfull_detail);
4761     }
4762   }
4763 }
4764
4765 int OSDMap::parse_osd_id_list(const vector<string>& ls, set<int> *out,
4766                               ostream *ss) const
4767 {
4768   out->clear();
4769   for (auto i = ls.begin(); i != ls.end(); ++i) {
4770     if (i == ls.begin() &&
4771         (*i == "any" || *i == "all" || *i == "*")) {
4772       get_all_osds(*out);
4773       break;
4774     }
4775     long osd = parse_osd_id(i->c_str(), ss);
4776     if (osd < 0) {
4777       *ss << "invalid osd id '" << *i << "'";
4778       return -EINVAL;
4779     }
4780     out->insert(osd);
4781   }
4782   return 0;
4783 }