Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / osd_types.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2011 New Dream Network
7  * Copyright (C) 2013,2014 Cloudwatt <libre.licensing@cloudwatt.com>
8  *
9  * Author: Loic Dachary <loic@dachary.org>
10  *
11  * This is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Lesser General Public
13  * License version 2.1, as published by the Free Software
14  * Foundation.  See file COPYING.
15  *
16  */
17
18 #include <boost/assign/list_of.hpp>
19
20 #include "osd_types.h"
21 #include "include/ceph_features.h"
22 extern "C" {
23 #include "crush/hash.h"
24 }
25 #include "PG.h"
26 #include "OSDMap.h"
27 #include "PGBackend.h"
28
29 const char *ceph_osd_flag_name(unsigned flag)
30 {
31   switch (flag) {
32   case CEPH_OSD_FLAG_ACK: return "ack";
33   case CEPH_OSD_FLAG_ONNVRAM: return "onnvram";
34   case CEPH_OSD_FLAG_ONDISK: return "ondisk";
35   case CEPH_OSD_FLAG_RETRY: return "retry";
36   case CEPH_OSD_FLAG_READ: return "read";
37   case CEPH_OSD_FLAG_WRITE: return "write";
38   case CEPH_OSD_FLAG_ORDERSNAP: return "ordersnap";
39   case CEPH_OSD_FLAG_PEERSTAT_OLD: return "peerstat_old";
40   case CEPH_OSD_FLAG_BALANCE_READS: return "balance_reads";
41   case CEPH_OSD_FLAG_PARALLELEXEC: return "parallelexec";
42   case CEPH_OSD_FLAG_PGOP: return "pgop";
43   case CEPH_OSD_FLAG_EXEC: return "exec";
44   case CEPH_OSD_FLAG_EXEC_PUBLIC: return "exec_public";
45   case CEPH_OSD_FLAG_LOCALIZE_READS: return "localize_reads";
46   case CEPH_OSD_FLAG_RWORDERED: return "rwordered";
47   case CEPH_OSD_FLAG_IGNORE_CACHE: return "ignore_cache";
48   case CEPH_OSD_FLAG_SKIPRWLOCKS: return "skiprwlocks";
49   case CEPH_OSD_FLAG_IGNORE_OVERLAY: return "ignore_overlay";
50   case CEPH_OSD_FLAG_FLUSH: return "flush";
51   case CEPH_OSD_FLAG_MAP_SNAP_CLONE: return "map_snap_clone";
52   case CEPH_OSD_FLAG_ENFORCE_SNAPC: return "enforce_snapc";
53   case CEPH_OSD_FLAG_REDIRECTED: return "redirected";
54   case CEPH_OSD_FLAG_KNOWN_REDIR: return "known_if_redirected";
55   case CEPH_OSD_FLAG_FULL_TRY: return "full_try";
56   case CEPH_OSD_FLAG_FULL_FORCE: return "full_force";
57   case CEPH_OSD_FLAG_IGNORE_REDIRECT: return "ignore_redirect";
58   default: return "???";
59   }
60 }
61
62 string ceph_osd_flag_string(unsigned flags)
63 {
64   string s;
65   for (unsigned i=0; i<32; ++i) {
66     if (flags & (1u<<i)) {
67       if (s.length())
68         s += "+";
69       s += ceph_osd_flag_name(1u << i);
70     }
71   }
72   if (s.length())
73     return s;
74   return string("-");
75 }
76
77 const char * ceph_osd_op_flag_name(unsigned flag)
78 {
79   const char *name;
80
81   switch(flag) {
82     case CEPH_OSD_OP_FLAG_EXCL:
83       name = "excl";
84       break;
85     case CEPH_OSD_OP_FLAG_FAILOK:
86       name = "failok";
87       break;
88     case CEPH_OSD_OP_FLAG_FADVISE_RANDOM:
89       name = "fadvise_random";
90       break;
91     case CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL:
92       name = "fadvise_sequential";
93       break;
94     case CEPH_OSD_OP_FLAG_FADVISE_WILLNEED:
95       name = "favise_willneed";
96       break;
97     case CEPH_OSD_OP_FLAG_FADVISE_DONTNEED:
98       name = "fadvise_dontneed";
99       break;
100     case CEPH_OSD_OP_FLAG_FADVISE_NOCACHE:
101       name = "fadvise_nocache";
102       break;
103     default:
104       name = "???";
105   };
106
107   return name;
108 }
109
110 string ceph_osd_op_flag_string(unsigned flags)
111 {
112   string s;
113   for (unsigned i=0; i<32; ++i) {
114     if (flags & (1u<<i)) {
115       if (s.length())
116         s += "+";
117       s += ceph_osd_op_flag_name(1u << i);
118     }
119   }
120   if (s.length())
121     return s;
122   return string("-");
123 }
124
125 string ceph_osd_alloc_hint_flag_string(unsigned flags)
126 {
127   string s;
128   for (unsigned i=0; i<32; ++i) {
129     if (flags & (1u<<i)) {
130       if (s.length())
131         s += "+";
132       s += ceph_osd_alloc_hint_flag_name(1u << i);
133     }
134   }
135   if (s.length())
136     return s;
137   return string("-");
138 }
139
140 void pg_shard_t::encode(bufferlist &bl) const
141 {
142   ENCODE_START(1, 1, bl);
143   ::encode(osd, bl);
144   ::encode(shard, bl);
145   ENCODE_FINISH(bl);
146 }
147 void pg_shard_t::decode(bufferlist::iterator &bl)
148 {
149   DECODE_START(1, bl);
150   ::decode(osd, bl);
151   ::decode(shard, bl);
152   DECODE_FINISH(bl);
153 }
154
155 ostream &operator<<(ostream &lhs, const pg_shard_t &rhs)
156 {
157   if (rhs.is_undefined())
158     return lhs << "?";
159   if (rhs.shard == shard_id_t::NO_SHARD)
160     return lhs << rhs.osd;
161   return lhs << rhs.osd << '(' << (unsigned)(rhs.shard) << ')';
162 }
163
164 // -- osd_reqid_t --
165 void osd_reqid_t::dump(Formatter *f) const
166 {
167   f->dump_stream("name") << name;
168   f->dump_int("inc", inc);
169   f->dump_unsigned("tid", tid);
170 }
171
172 void osd_reqid_t::generate_test_instances(list<osd_reqid_t*>& o)
173 {
174   o.push_back(new osd_reqid_t);
175   o.push_back(new osd_reqid_t(entity_name_t::CLIENT(123), 1, 45678));
176 }
177
178 // -- object_locator_t --
179
180 void object_locator_t::encode(bufferlist& bl) const
181 {
182   // verify that nobody's corrupted the locator
183   assert(hash == -1 || key.empty());
184   __u8 encode_compat = 3;
185   ENCODE_START(6, encode_compat, bl);
186   ::encode(pool, bl);
187   int32_t preferred = -1;  // tell old code there is no preferred osd (-1).
188   ::encode(preferred, bl);
189   ::encode(key, bl);
190   ::encode(nspace, bl);
191   ::encode(hash, bl);
192   if (hash != -1)
193     encode_compat = MAX(encode_compat, 6); // need to interpret the hash
194   ENCODE_FINISH_NEW_COMPAT(bl, encode_compat);
195 }
196
197 void object_locator_t::decode(bufferlist::iterator& p)
198 {
199   DECODE_START_LEGACY_COMPAT_LEN(6, 3, 3, p);
200   if (struct_v < 2) {
201     int32_t op;
202     ::decode(op, p);
203     pool = op;
204     int16_t pref;
205     ::decode(pref, p);
206   } else {
207     ::decode(pool, p);
208     int32_t preferred;
209     ::decode(preferred, p);
210   }
211   ::decode(key, p);
212   if (struct_v >= 5)
213     ::decode(nspace, p);
214   if (struct_v >= 6)
215     ::decode(hash, p);
216   else
217     hash = -1;
218   DECODE_FINISH(p);
219   // verify that nobody's corrupted the locator
220   assert(hash == -1 || key.empty());
221 }
222
223 void object_locator_t::dump(Formatter *f) const
224 {
225   f->dump_int("pool", pool);
226   f->dump_string("key", key);
227   f->dump_string("namespace", nspace);
228   f->dump_int("hash", hash);
229 }
230
231 void object_locator_t::generate_test_instances(list<object_locator_t*>& o)
232 {
233   o.push_back(new object_locator_t);
234   o.push_back(new object_locator_t(123));
235   o.push_back(new object_locator_t(123, 876));
236   o.push_back(new object_locator_t(1, "n2"));
237   o.push_back(new object_locator_t(1234, "", "key"));
238   o.push_back(new object_locator_t(12, "n1", "key2"));
239 }
240
241 // -- request_redirect_t --
242 void request_redirect_t::encode(bufferlist& bl) const
243 {
244   ENCODE_START(1, 1, bl);
245   ::encode(redirect_locator, bl);
246   ::encode(redirect_object, bl);
247   ::encode(osd_instructions, bl);
248   ENCODE_FINISH(bl);
249 }
250
251 void request_redirect_t::decode(bufferlist::iterator& bl)
252 {
253   DECODE_START(1, bl);
254   ::decode(redirect_locator, bl);
255   ::decode(redirect_object, bl);
256   ::decode(osd_instructions, bl);
257   DECODE_FINISH(bl);
258 }
259
260 void request_redirect_t::dump(Formatter *f) const
261 {
262   f->dump_string("object", redirect_object);
263   f->open_object_section("locator");
264   redirect_locator.dump(f);
265   f->close_section(); // locator
266 }
267
268 void request_redirect_t::generate_test_instances(list<request_redirect_t*>& o)
269 {
270   object_locator_t loc(1, "redir_obj");
271   o.push_back(new request_redirect_t());
272   o.push_back(new request_redirect_t(loc, 0));
273   o.push_back(new request_redirect_t(loc, "redir_obj"));
274   o.push_back(new request_redirect_t(loc));
275 }
276
277 void objectstore_perf_stat_t::dump(Formatter *f) const
278 {
279   f->dump_unsigned("commit_latency_ms", os_commit_latency);
280   f->dump_unsigned("apply_latency_ms", os_apply_latency);
281 }
282
283 void objectstore_perf_stat_t::encode(bufferlist &bl) const
284 {
285   ENCODE_START(1, 1, bl);
286   ::encode(os_commit_latency, bl);
287   ::encode(os_apply_latency, bl);
288   ENCODE_FINISH(bl);
289 }
290
291 void objectstore_perf_stat_t::decode(bufferlist::iterator &bl)
292 {
293   DECODE_START(1, bl);
294   ::decode(os_commit_latency, bl);
295   ::decode(os_apply_latency, bl);
296   DECODE_FINISH(bl);
297 }
298
299 void objectstore_perf_stat_t::generate_test_instances(std::list<objectstore_perf_stat_t*>& o)
300 {
301   o.push_back(new objectstore_perf_stat_t());
302   o.push_back(new objectstore_perf_stat_t());
303   o.back()->os_commit_latency = 20;
304   o.back()->os_apply_latency = 30;
305 }
306
307 // -- osd_stat_t --
308 void osd_stat_t::dump(Formatter *f) const
309 {
310   f->dump_unsigned("up_from", up_from);
311   f->dump_unsigned("seq", seq);
312   f->dump_unsigned("num_pgs", num_pgs);
313   f->dump_unsigned("kb", kb);
314   f->dump_unsigned("kb_used", kb_used);
315   f->dump_unsigned("kb_avail", kb_avail);
316   f->open_array_section("hb_peers");
317   for (auto p : hb_peers)
318     f->dump_int("osd", p);
319   f->close_section();
320   f->dump_int("snap_trim_queue_len", snap_trim_queue_len);
321   f->dump_int("num_snap_trimming", num_snap_trimming);
322   f->open_object_section("op_queue_age_hist");
323   op_queue_age_hist.dump(f);
324   f->close_section();
325   f->open_object_section("perf_stat");
326   os_perf_stat.dump(f);
327   f->close_section();
328 }
329
330 void osd_stat_t::encode(bufferlist &bl) const
331 {
332   ENCODE_START(7, 2, bl);
333   ::encode(kb, bl);
334   ::encode(kb_used, bl);
335   ::encode(kb_avail, bl);
336   ::encode(snap_trim_queue_len, bl);
337   ::encode(num_snap_trimming, bl);
338   ::encode(hb_peers, bl);
339   ::encode((uint32_t)0, bl);
340   ::encode(op_queue_age_hist, bl);
341   ::encode(os_perf_stat, bl);
342   ::encode(up_from, bl);
343   ::encode(seq, bl);
344   ::encode(num_pgs, bl);
345   ENCODE_FINISH(bl);
346 }
347
348 void osd_stat_t::decode(bufferlist::iterator &bl)
349 {
350   DECODE_START_LEGACY_COMPAT_LEN(6, 2, 2, bl);
351   ::decode(kb, bl);
352   ::decode(kb_used, bl);
353   ::decode(kb_avail, bl);
354   ::decode(snap_trim_queue_len, bl);
355   ::decode(num_snap_trimming, bl);
356   ::decode(hb_peers, bl);
357   vector<int> num_hb_out;
358   ::decode(num_hb_out, bl);
359   if (struct_v >= 3)
360     ::decode(op_queue_age_hist, bl);
361   if (struct_v >= 4)
362     ::decode(os_perf_stat, bl);
363   if (struct_v >= 6) {
364     ::decode(up_from, bl);
365     ::decode(seq, bl);
366   }
367   if (struct_v >= 7) {
368     ::decode(num_pgs, bl);
369   }
370   DECODE_FINISH(bl);
371 }
372
373 void osd_stat_t::generate_test_instances(std::list<osd_stat_t*>& o)
374 {
375   o.push_back(new osd_stat_t);
376
377   o.push_back(new osd_stat_t);
378   o.back()->kb = 1;
379   o.back()->kb_used = 2;
380   o.back()->kb_avail = 3;
381   o.back()->hb_peers.push_back(7);
382   o.back()->snap_trim_queue_len = 8;
383   o.back()->num_snap_trimming = 99;
384 }
385
386 // -- pg_t --
387
388 int pg_t::print(char *o, int maxlen) const
389 {
390   if (preferred() >= 0)
391     return snprintf(o, maxlen, "%llu.%xp%d", (unsigned long long)pool(), ps(), preferred());
392   else
393     return snprintf(o, maxlen, "%llu.%x", (unsigned long long)pool(), ps());
394 }
395
396 bool pg_t::parse(const char *s)
397 {
398   uint64_t ppool;
399   uint32_t pseed;
400   int32_t pref;
401   int r = sscanf(s, "%llu.%xp%d", (long long unsigned *)&ppool, &pseed, &pref);
402   if (r < 2)
403     return false;
404   m_pool = ppool;
405   m_seed = pseed;
406   if (r == 3)
407     m_preferred = pref;
408   else
409     m_preferred = -1;
410   return true;
411 }
412
413 bool spg_t::parse(const char *s)
414 {
415   pgid.set_preferred(-1);
416   shard = shard_id_t::NO_SHARD;
417   uint64_t ppool;
418   uint32_t pseed;
419   int32_t pref;
420   uint32_t pshard;
421   int r = sscanf(s, "%llu.%x", (long long unsigned *)&ppool, &pseed);
422   if (r < 2)
423     return false;
424   pgid.set_pool(ppool);
425   pgid.set_ps(pseed);
426
427   const char *p = strchr(s, 'p');
428   if (p) {
429     r = sscanf(p, "p%d", &pref);
430     if (r == 1) {
431       pgid.set_preferred(pref);
432     } else {
433       return false;
434     }
435   }
436
437   p = strchr(s, 's');
438   if (p) {
439     r = sscanf(p, "s%d", &pshard);
440     if (r == 1) {
441       shard = shard_id_t(pshard);
442     } else {
443       return false;
444     }
445   }
446   return true;
447 }
448
449 char *spg_t::calc_name(char *buf, const char *suffix_backwords) const
450 {
451   while (*suffix_backwords)
452     *--buf = *suffix_backwords++;
453
454   if (!is_no_shard()) {
455     buf = ritoa<uint8_t, 10>((uint8_t)shard.id, buf);
456     *--buf = 's';
457   }
458
459   return pgid.calc_name(buf, "");
460 }
461
462 ostream& operator<<(ostream& out, const spg_t &pg)
463 {
464   char buf[spg_t::calc_name_buf_size];
465   buf[spg_t::calc_name_buf_size - 1] = '\0';
466   out << pg.calc_name(buf + spg_t::calc_name_buf_size - 1, "");
467   return out;
468 }
469
470 pg_t pg_t::get_ancestor(unsigned old_pg_num) const
471 {
472   int old_bits = cbits(old_pg_num);
473   int old_mask = (1 << old_bits) - 1;
474   pg_t ret = *this;
475   ret.m_seed = ceph_stable_mod(m_seed, old_pg_num, old_mask);
476   return ret;
477 }
478
479 bool pg_t::is_split(unsigned old_pg_num, unsigned new_pg_num, set<pg_t> *children) const
480 {
481   assert(m_seed < old_pg_num);
482   if (new_pg_num <= old_pg_num)
483     return false;
484
485   bool split = false;
486   if (true) {
487     unsigned old_bits = cbits(old_pg_num);
488     unsigned old_mask = (1 << old_bits) - 1;
489     for (unsigned n = 1; ; n++) {
490       unsigned next_bit = (n << (old_bits-1));
491       unsigned s = next_bit | m_seed;
492
493       if (s < old_pg_num || s == m_seed)
494         continue;
495       if (s >= new_pg_num)
496         break;
497       if ((unsigned)ceph_stable_mod(s, old_pg_num, old_mask) == m_seed) {
498         split = true;
499         if (children)
500           children->insert(pg_t(s, m_pool, m_preferred));
501       }
502     }
503   }
504   if (false) {
505     // brute force
506     int old_bits = cbits(old_pg_num);
507     int old_mask = (1 << old_bits) - 1;
508     for (unsigned x = old_pg_num; x < new_pg_num; ++x) {
509       unsigned o = ceph_stable_mod(x, old_pg_num, old_mask);
510       if (o == m_seed) {
511         split = true;
512         children->insert(pg_t(x, m_pool, m_preferred));
513       }
514     }
515   }
516   return split;
517 }
518
519 unsigned pg_t::get_split_bits(unsigned pg_num) const {
520   if (pg_num == 1)
521     return 0;
522   assert(pg_num > 1);
523
524   // Find unique p such that pg_num \in [2^(p-1), 2^p)
525   unsigned p = cbits(pg_num);
526   assert(p); // silence coverity #751330 
527
528   if ((m_seed % (1<<(p-1))) < (pg_num % (1<<(p-1))))
529     return p;
530   else
531     return p - 1;
532 }
533
534 pg_t pg_t::get_parent() const
535 {
536   unsigned bits = cbits(m_seed);
537   assert(bits);
538   pg_t retval = *this;
539   retval.m_seed &= ~((~0)<<(bits - 1));
540   return retval;
541 }
542
543 hobject_t pg_t::get_hobj_start() const
544 {
545   return hobject_t(object_t(), string(), CEPH_NOSNAP, m_seed, m_pool,
546                    string());
547 }
548
549 hobject_t pg_t::get_hobj_end(unsigned pg_num) const
550 {
551   // note: this assumes a bitwise sort; with the legacy nibblewise
552   // sort a PG did not always cover a single contiguous range of the
553   // (bit-reversed) hash range.
554   unsigned bits = get_split_bits(pg_num);
555   uint64_t rev_start = hobject_t::_reverse_bits(m_seed);
556   uint64_t rev_end = (rev_start | (0xffffffff >> bits)) + 1;
557   if (rev_end >= 0x100000000) {
558     assert(rev_end == 0x100000000);
559     return hobject_t::get_max();
560   } else {
561     return hobject_t(object_t(), string(), CEPH_NOSNAP,
562                    hobject_t::_reverse_bits(rev_end), m_pool,
563                    string());
564   }
565 }
566
567 void pg_t::dump(Formatter *f) const
568 {
569   f->dump_unsigned("pool", m_pool);
570   f->dump_unsigned("seed", m_seed);
571   f->dump_int("preferred_osd", m_preferred);
572 }
573
574 void pg_t::generate_test_instances(list<pg_t*>& o)
575 {
576   o.push_back(new pg_t);
577   o.push_back(new pg_t(1, 2, -1));
578   o.push_back(new pg_t(13123, 3, -1));
579   o.push_back(new pg_t(131223, 4, 23));
580 }
581
582 char *pg_t::calc_name(char *buf, const char *suffix_backwords) const
583 {
584   while (*suffix_backwords)
585     *--buf = *suffix_backwords++;
586
587   if (m_preferred >= 0)
588     *--buf ='p';
589
590   buf = ritoa<uint32_t, 16>(m_seed, buf);
591
592   *--buf = '.';
593
594   return  ritoa<uint64_t, 10>(m_pool, buf);
595 }
596
597 ostream& operator<<(ostream& out, const pg_t &pg)
598 {
599   char buf[pg_t::calc_name_buf_size];
600   buf[pg_t::calc_name_buf_size - 1] = '\0';
601   out << pg.calc_name(buf + pg_t::calc_name_buf_size - 1, "");
602   return out;
603 }
604
605
606 // -- coll_t --
607
608 void coll_t::calc_str()
609 {
610   switch (type) {
611   case TYPE_META:
612     strcpy(_str_buff, "meta");
613     _str = _str_buff;
614     break;
615   case TYPE_PG:
616     _str_buff[spg_t::calc_name_buf_size - 1] = '\0';
617     _str = pgid.calc_name(_str_buff + spg_t::calc_name_buf_size - 1, "daeh_");
618     break;
619   case TYPE_PG_TEMP:
620     _str_buff[spg_t::calc_name_buf_size - 1] = '\0';
621     _str = pgid.calc_name(_str_buff + spg_t::calc_name_buf_size - 1, "PMET_");
622     break;
623   default:
624     assert(0 == "unknown collection type");
625   }
626 }
627
628 bool coll_t::parse(const std::string& s)
629 {
630   if (s == "meta") {
631     type = TYPE_META;
632     pgid = spg_t();
633     removal_seq = 0;
634     calc_str();
635     assert(s == _str);
636     return true;
637   }
638   if (s.find("_head") == s.length() - 5 &&
639       pgid.parse(s.substr(0, s.length() - 5))) {
640     type = TYPE_PG;
641     removal_seq = 0;
642     calc_str();
643     assert(s == _str);
644     return true;
645   }
646   if (s.find("_TEMP") == s.length() - 5 &&
647       pgid.parse(s.substr(0, s.length() - 5))) {
648     type = TYPE_PG_TEMP;
649     removal_seq = 0;
650     calc_str();
651     assert(s == _str);
652     return true;
653   }
654   return false;
655 }
656
657 void coll_t::encode(bufferlist& bl) const
658 {
659   // when changing this, remember to update encoded_size() too.
660   if (is_temp()) {
661     // can't express this as v2...
662     __u8 struct_v = 3;
663     ::encode(struct_v, bl);
664     ::encode(to_str(), bl);
665   } else {
666     __u8 struct_v = 2;
667     ::encode(struct_v, bl);
668     ::encode((__u8)type, bl);
669     ::encode(pgid, bl);
670     snapid_t snap = CEPH_NOSNAP;
671     ::encode(snap, bl);
672   }
673 }
674
675 size_t coll_t::encoded_size() const
676 {
677   size_t r = sizeof(__u8);
678   if (is_temp()) {
679     // v3
680     r += sizeof(__u32);
681     if (_str) {
682       r += strlen(_str);
683     }
684   } else {
685       // v2
686       // 1. type
687       r += sizeof(__u8);
688       // 2. pgid
689       //  - encoding header
690       r += sizeof(ceph_le32) + 2 * sizeof(__u8);
691       // - pg_t
692       r += sizeof(__u8) + sizeof(uint64_t) + 2 * sizeof(uint32_t);
693       // - shard_id_t
694       r += sizeof(int8_t);
695       // 3. snapid_t
696       r += sizeof(uint64_t);
697   }
698
699   return r;
700 }
701
702 void coll_t::decode(bufferlist::iterator& bl)
703 {
704   __u8 struct_v;
705   ::decode(struct_v, bl);
706   switch (struct_v) {
707   case 1:
708     {
709       snapid_t snap;
710       ::decode(pgid, bl);
711       ::decode(snap, bl);
712
713       // infer the type
714       if (pgid == spg_t() && snap == 0) {
715         type = TYPE_META;
716       } else {
717         type = TYPE_PG;
718       }
719       removal_seq = 0;
720     }
721     break;
722
723   case 2:
724     {
725       __u8 _type;
726       snapid_t snap;
727       ::decode(_type, bl);
728       ::decode(pgid, bl);
729       ::decode(snap, bl);
730       type = (type_t)_type;
731       removal_seq = 0;
732     }
733     break;
734
735   case 3:
736     {
737       string str;
738       ::decode(str, bl);
739       bool ok = parse(str);
740       if (!ok)
741         throw std::domain_error(std::string("unable to parse pg ") + str);
742     }
743     break;
744
745   default:
746     {
747       ostringstream oss;
748       oss << "coll_t::decode(): don't know how to decode version "
749           << struct_v;
750       throw std::domain_error(oss.str());
751     }
752   }
753 }
754
755 void coll_t::dump(Formatter *f) const
756 {
757   f->dump_unsigned("type_id", (unsigned)type);
758   if (type != TYPE_META)
759     f->dump_stream("pgid") << pgid;
760   f->dump_string("name", to_str());
761 }
762
763 void coll_t::generate_test_instances(list<coll_t*>& o)
764 {
765   o.push_back(new coll_t());
766   o.push_back(new coll_t(spg_t(pg_t(1, 0), shard_id_t::NO_SHARD)));
767   o.push_back(new coll_t(o.back()->get_temp()));
768   o.push_back(new coll_t(spg_t(pg_t(3, 2), shard_id_t(12))));
769   o.push_back(new coll_t(o.back()->get_temp()));
770   o.push_back(new coll_t());
771 }
772
773 // ---
774
775 std::string pg_vector_string(const vector<int32_t> &a)
776 {
777   ostringstream oss;
778   oss << "[";
779   for (vector<int32_t>::const_iterator i = a.begin(); i != a.end(); ++i) {
780     if (i != a.begin()) 
781       oss << ",";
782     if (*i != CRUSH_ITEM_NONE) 
783       oss << *i;
784     else 
785       oss << "NONE";
786   }
787   oss << "]";
788   return oss.str();
789 }
790
791 std::string pg_state_string(int state)
792 {
793   ostringstream oss;
794   if (state & PG_STATE_STALE)
795     oss << "stale+";
796   if (state & PG_STATE_CREATING)
797     oss << "creating+";
798   if (state & PG_STATE_ACTIVE)
799     oss << "active+";
800   if (state & PG_STATE_ACTIVATING)
801     oss << "activating+";
802   if (state & PG_STATE_CLEAN)
803     oss << "clean+";
804   if (state & PG_STATE_RECOVERY_WAIT)
805     oss << "recovery_wait+";
806   if (state & PG_STATE_RECOVERY_TOOFULL)
807     oss << "recovery_toofull+";
808   if (state & PG_STATE_RECOVERING)
809     oss << "recovering+";
810   if (state & PG_STATE_FORCED_RECOVERY)
811     oss << "forced_recovery+";
812   if (state & PG_STATE_DOWN)
813     oss << "down+";
814   if (state & PG_STATE_UNDERSIZED)
815     oss << "undersized+";
816   if (state & PG_STATE_DEGRADED)
817     oss << "degraded+";
818   if (state & PG_STATE_REMAPPED)
819     oss << "remapped+";
820   if (state & PG_STATE_SCRUBBING)
821     oss << "scrubbing+";
822   if (state & PG_STATE_DEEP_SCRUB)
823     oss << "deep+";
824   if (state & PG_STATE_INCONSISTENT)
825     oss << "inconsistent+";
826   if (state & PG_STATE_PEERING)
827     oss << "peering+";
828   if (state & PG_STATE_REPAIR)
829     oss << "repair+";
830   if (state & PG_STATE_BACKFILL_WAIT)
831     oss << "backfill_wait+";
832   if (state & PG_STATE_BACKFILLING)
833     oss << "backfilling+";
834   if (state & PG_STATE_FORCED_BACKFILL)
835     oss << "forced_backfill+";
836   if (state & PG_STATE_BACKFILL_TOOFULL)
837     oss << "backfill_toofull+";
838   if (state & PG_STATE_INCOMPLETE)
839     oss << "incomplete+";
840   if (state & PG_STATE_PEERED)
841     oss << "peered+";
842   if (state & PG_STATE_SNAPTRIM)
843     oss << "snaptrim+";
844   if (state & PG_STATE_SNAPTRIM_WAIT)
845     oss << "snaptrim_wait+";
846   if (state & PG_STATE_SNAPTRIM_ERROR)
847     oss << "snaptrim_error+";
848   string ret(oss.str());
849   if (ret.length() > 0)
850     ret.resize(ret.length() - 1);
851   else
852     ret = "unknown";
853   return ret;
854 }
855
856 boost::optional<uint64_t> pg_string_state(const std::string& state)
857 {
858   boost::optional<uint64_t> type;
859   if (state == "active")
860     type = PG_STATE_ACTIVE;
861   else if (state == "clean")
862     type = PG_STATE_CLEAN;
863   else if (state == "down")
864     type = PG_STATE_DOWN;
865   else if (state == "scrubbing")
866     type = PG_STATE_SCRUBBING;
867   else if (state == "degraded")
868     type = PG_STATE_DEGRADED;
869   else if (state == "inconsistent")
870     type = PG_STATE_INCONSISTENT;
871   else if (state == "peering")
872     type = PG_STATE_PEERING;
873   else if (state == "repair")
874     type = PG_STATE_REPAIR;
875   else if (state == "recovering")
876     type = PG_STATE_RECOVERING;
877   else if (state == "forced_recovery")
878     type = PG_STATE_FORCED_RECOVERY;
879   else if (state == "backfill_wait")
880     type = PG_STATE_BACKFILL_WAIT;
881   else if (state == "incomplete")
882     type = PG_STATE_INCOMPLETE;
883   else if (state == "stale")
884     type = PG_STATE_STALE;
885   else if (state == "remapped")
886     type = PG_STATE_REMAPPED;
887   else if (state == "deep_scrub")
888     type = PG_STATE_DEEP_SCRUB;
889   else if (state == "backfilling")
890     type = PG_STATE_BACKFILLING;
891   else if (state == "forced_backfill")
892     type = PG_STATE_FORCED_BACKFILL;
893   else if (state == "backfill_toofull")
894     type = PG_STATE_BACKFILL_TOOFULL;
895   else if (state == "recovery_wait")
896     type = PG_STATE_RECOVERY_WAIT;
897   else if (state == "recovery_toofull")
898     type = PG_STATE_RECOVERY_TOOFULL;
899   else if (state == "undersized")
900     type = PG_STATE_UNDERSIZED;
901   else if (state == "activating")
902     type = PG_STATE_ACTIVATING;
903   else if (state == "peered")
904     type = PG_STATE_PEERED;
905   else if (state == "snaptrim")
906     type = PG_STATE_SNAPTRIM;
907   else if (state == "snaptrim_wait")
908     type = PG_STATE_SNAPTRIM_WAIT;
909   else if (state == "snaptrim_error")
910     type = PG_STATE_SNAPTRIM_ERROR;
911   else
912     type = boost::none;
913   return type;
914 }
915
916 // -- eversion_t --
917 string eversion_t::get_key_name() const
918 {
919   char key[32];
920   // Below is equivalent of sprintf("%010u.%020llu");
921   key[31] = 0;
922   ritoa<uint64_t, 10, 20>(version, key + 31);
923   key[10] = '.';
924   ritoa<uint32_t, 10, 10>(epoch, key + 10);
925   return string(key);
926 }
927
928
929 // -- pool_snap_info_t --
930 void pool_snap_info_t::dump(Formatter *f) const
931 {
932   f->dump_unsigned("snapid", snapid);
933   f->dump_stream("stamp") << stamp;
934   f->dump_string("name", name);
935 }
936
937 void pool_snap_info_t::encode(bufferlist& bl, uint64_t features) const
938 {
939   if ((features & CEPH_FEATURE_PGPOOL3) == 0) {
940     __u8 struct_v = 1;
941     ::encode(struct_v, bl);
942     ::encode(snapid, bl);
943     ::encode(stamp, bl);
944     ::encode(name, bl);
945     return;
946   }
947   ENCODE_START(2, 2, bl);
948   ::encode(snapid, bl);
949   ::encode(stamp, bl);
950   ::encode(name, bl);
951   ENCODE_FINISH(bl);
952 }
953
954 void pool_snap_info_t::decode(bufferlist::iterator& bl)
955 {
956   DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
957   ::decode(snapid, bl);
958   ::decode(stamp, bl);
959   ::decode(name, bl);
960   DECODE_FINISH(bl);
961 }
962
963 void pool_snap_info_t::generate_test_instances(list<pool_snap_info_t*>& o)
964 {
965   o.push_back(new pool_snap_info_t);
966   o.push_back(new pool_snap_info_t);
967   o.back()->snapid = 1;
968   o.back()->stamp = utime_t(1, 2);
969   o.back()->name = "foo";
970 }
971
972 // -- pool_opts_t --
973
974 typedef std::map<std::string, pool_opts_t::opt_desc_t> opt_mapping_t;
975 static opt_mapping_t opt_mapping = boost::assign::map_list_of
976            ("scrub_min_interval", pool_opts_t::opt_desc_t(
977              pool_opts_t::SCRUB_MIN_INTERVAL, pool_opts_t::DOUBLE))
978            ("scrub_max_interval", pool_opts_t::opt_desc_t(
979              pool_opts_t::SCRUB_MAX_INTERVAL, pool_opts_t::DOUBLE))
980            ("deep_scrub_interval", pool_opts_t::opt_desc_t(
981              pool_opts_t::DEEP_SCRUB_INTERVAL, pool_opts_t::DOUBLE))
982            ("recovery_priority", pool_opts_t::opt_desc_t(
983              pool_opts_t::RECOVERY_PRIORITY, pool_opts_t::INT))
984            ("recovery_op_priority", pool_opts_t::opt_desc_t(
985              pool_opts_t::RECOVERY_OP_PRIORITY, pool_opts_t::INT))
986            ("scrub_priority", pool_opts_t::opt_desc_t(
987              pool_opts_t::SCRUB_PRIORITY, pool_opts_t::INT))
988            ("compression_mode", pool_opts_t::opt_desc_t(
989              pool_opts_t::COMPRESSION_MODE, pool_opts_t::STR))
990            ("compression_algorithm", pool_opts_t::opt_desc_t(
991              pool_opts_t::COMPRESSION_ALGORITHM, pool_opts_t::STR))
992            ("compression_required_ratio", pool_opts_t::opt_desc_t(
993              pool_opts_t::COMPRESSION_REQUIRED_RATIO, pool_opts_t::DOUBLE))
994            ("compression_max_blob_size", pool_opts_t::opt_desc_t(
995              pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, pool_opts_t::INT))
996            ("compression_min_blob_size", pool_opts_t::opt_desc_t(
997              pool_opts_t::COMPRESSION_MIN_BLOB_SIZE, pool_opts_t::INT))
998            ("csum_type", pool_opts_t::opt_desc_t(
999              pool_opts_t::CSUM_TYPE, pool_opts_t::INT))
1000            ("csum_max_block", pool_opts_t::opt_desc_t(
1001              pool_opts_t::CSUM_MAX_BLOCK, pool_opts_t::INT))
1002            ("csum_min_block", pool_opts_t::opt_desc_t(
1003              pool_opts_t::CSUM_MIN_BLOCK, pool_opts_t::INT));
1004
1005 bool pool_opts_t::is_opt_name(const std::string& name) {
1006     return opt_mapping.count(name);
1007 }
1008
1009 pool_opts_t::opt_desc_t pool_opts_t::get_opt_desc(const std::string& name) {
1010     opt_mapping_t::iterator i = opt_mapping.find(name);
1011     assert(i != opt_mapping.end());
1012     return i->second;
1013 }
1014
1015 bool pool_opts_t::is_set(pool_opts_t::key_t key) const {
1016     return opts.count(key);
1017 }
1018
1019 const pool_opts_t::value_t& pool_opts_t::get(pool_opts_t::key_t key) const {
1020   opts_t::const_iterator i = opts.find(key);
1021   assert(i != opts.end());
1022   return i->second;
1023 }
1024
1025 bool pool_opts_t::unset(pool_opts_t::key_t key) {
1026   return opts.erase(key) > 0;
1027 }
1028
1029 class pool_opts_dumper_t : public boost::static_visitor<>
1030 {
1031 public:
1032   pool_opts_dumper_t(const std::string& name_, Formatter* f_) :
1033     name(name_.c_str()), f(f_) {}
1034
1035   void operator()(std::string s) const {
1036     f->dump_string(name, s);
1037   }
1038   void operator()(int i) const {
1039     f->dump_int(name, i);
1040   }
1041   void operator()(double d) const {
1042     f->dump_float(name, d);
1043   }
1044
1045 private:
1046   const char* name;
1047   Formatter* f;
1048 };
1049
1050 void pool_opts_t::dump(const std::string& name, Formatter* f) const
1051 {
1052   const opt_desc_t& desc = get_opt_desc(name);
1053   opts_t::const_iterator i = opts.find(desc.key);
1054   if (i == opts.end()) {
1055       return;
1056   }
1057   boost::apply_visitor(pool_opts_dumper_t(name, f), i->second);
1058 }
1059
1060 void pool_opts_t::dump(Formatter* f) const
1061 {
1062   for (opt_mapping_t::iterator i = opt_mapping.begin(); i != opt_mapping.end();
1063        ++i) {
1064     const std::string& name = i->first;
1065     const opt_desc_t& desc = i->second;
1066     opts_t::const_iterator j = opts.find(desc.key);
1067     if (j == opts.end()) {
1068       continue;
1069     }
1070     boost::apply_visitor(pool_opts_dumper_t(name, f), j->second);
1071   }
1072 }
1073
1074 class pool_opts_encoder_t : public boost::static_visitor<>
1075 {
1076 public:
1077   explicit pool_opts_encoder_t(bufferlist& bl_) : bl(bl_) {}
1078
1079   void operator()(std::string s) const {
1080     ::encode(static_cast<int32_t>(pool_opts_t::STR), bl);
1081     ::encode(s, bl);
1082   }
1083   void operator()(int i) const {
1084     ::encode(static_cast<int32_t>(pool_opts_t::INT), bl);
1085     ::encode(i, bl);
1086   }
1087   void operator()(double d) const {
1088     ::encode(static_cast<int32_t>(pool_opts_t::DOUBLE), bl);
1089     ::encode(d, bl);
1090   }
1091
1092 private:
1093   bufferlist& bl;
1094 };
1095
1096 void pool_opts_t::encode(bufferlist& bl) const {
1097   ENCODE_START(1, 1, bl);
1098   uint32_t n = static_cast<uint32_t>(opts.size());
1099   ::encode(n, bl);
1100   for (opts_t::const_iterator i = opts.begin(); i != opts.end(); ++i) {
1101     ::encode(static_cast<int32_t>(i->first), bl);
1102     boost::apply_visitor(pool_opts_encoder_t(bl), i->second);
1103   }
1104   ENCODE_FINISH(bl);
1105 }
1106
1107 void pool_opts_t::decode(bufferlist::iterator& bl) {
1108   DECODE_START(1, bl);
1109   __u32 n;
1110   ::decode(n, bl);
1111   opts.clear();
1112   while (n--) {
1113     int32_t k, t;
1114     ::decode(k, bl);
1115     ::decode(t, bl);
1116     if (t == STR) {
1117       std::string s;
1118       ::decode(s, bl);
1119       opts[static_cast<key_t>(k)] = s;
1120     } else if (t == INT) {
1121       int i;
1122       ::decode(i, bl);
1123       opts[static_cast<key_t>(k)] = i;
1124     } else if (t == DOUBLE) {
1125       double d;
1126       ::decode(d, bl);
1127       opts[static_cast<key_t>(k)] = d;
1128     } else {
1129       assert(!"invalid type");
1130     }
1131   }
1132   DECODE_FINISH(bl);
1133 }
1134
1135 ostream& operator<<(ostream& out, const pool_opts_t& opts)
1136 {
1137   for (opt_mapping_t::iterator i = opt_mapping.begin(); i != opt_mapping.end();
1138        ++i) {
1139     const std::string& name = i->first;
1140     const pool_opts_t::opt_desc_t& desc = i->second;
1141     pool_opts_t::opts_t::const_iterator j = opts.opts.find(desc.key);
1142     if (j == opts.opts.end()) {
1143       continue;
1144     }
1145     out << " " << name << " " << j->second;
1146   }
1147   return out;
1148 }
1149
1150 // -- pg_pool_t --
1151
1152 const char *pg_pool_t::APPLICATION_NAME_CEPHFS("cephfs");
1153 const char *pg_pool_t::APPLICATION_NAME_RBD("rbd");
1154 const char *pg_pool_t::APPLICATION_NAME_RGW("rgw");
1155
1156 void pg_pool_t::dump(Formatter *f) const
1157 {
1158   f->dump_unsigned("flags", get_flags());
1159   f->dump_string("flags_names", get_flags_string());
1160   f->dump_int("type", get_type());
1161   f->dump_int("size", get_size());
1162   f->dump_int("min_size", get_min_size());
1163   f->dump_int("crush_rule", get_crush_rule());
1164   f->dump_int("object_hash", get_object_hash());
1165   f->dump_unsigned("pg_num", get_pg_num());
1166   f->dump_unsigned("pg_placement_num", get_pgp_num());
1167   f->dump_unsigned("crash_replay_interval", get_crash_replay_interval());
1168   f->dump_stream("last_change") << get_last_change();
1169   f->dump_stream("last_force_op_resend") << get_last_force_op_resend();
1170   f->dump_stream("last_force_op_resend_preluminous")
1171     << get_last_force_op_resend_preluminous();
1172   f->dump_unsigned("auid", get_auid());
1173   f->dump_string("snap_mode", is_pool_snaps_mode() ? "pool" : "selfmanaged");
1174   f->dump_unsigned("snap_seq", get_snap_seq());
1175   f->dump_unsigned("snap_epoch", get_snap_epoch());
1176   f->open_array_section("pool_snaps");
1177   for (map<snapid_t, pool_snap_info_t>::const_iterator p = snaps.begin(); p != snaps.end(); ++p) {
1178     f->open_object_section("pool_snap_info");
1179     p->second.dump(f);
1180     f->close_section();
1181   }
1182   f->close_section();
1183   f->dump_stream("removed_snaps") << removed_snaps;
1184   f->dump_unsigned("quota_max_bytes", quota_max_bytes);
1185   f->dump_unsigned("quota_max_objects", quota_max_objects);
1186   f->open_array_section("tiers");
1187   for (set<uint64_t>::const_iterator p = tiers.begin(); p != tiers.end(); ++p)
1188     f->dump_unsigned("pool_id", *p);
1189   f->close_section();
1190   f->dump_int("tier_of", tier_of);
1191   f->dump_int("read_tier", read_tier);
1192   f->dump_int("write_tier", write_tier);
1193   f->dump_string("cache_mode", get_cache_mode_name());
1194   f->dump_unsigned("target_max_bytes", target_max_bytes);
1195   f->dump_unsigned("target_max_objects", target_max_objects);
1196   f->dump_unsigned("cache_target_dirty_ratio_micro",
1197                    cache_target_dirty_ratio_micro);
1198   f->dump_unsigned("cache_target_dirty_high_ratio_micro",
1199                    cache_target_dirty_high_ratio_micro);
1200   f->dump_unsigned("cache_target_full_ratio_micro",
1201                    cache_target_full_ratio_micro);
1202   f->dump_unsigned("cache_min_flush_age", cache_min_flush_age);
1203   f->dump_unsigned("cache_min_evict_age", cache_min_evict_age);
1204   f->dump_string("erasure_code_profile", erasure_code_profile);
1205   f->open_object_section("hit_set_params");
1206   hit_set_params.dump(f);
1207   f->close_section(); // hit_set_params
1208   f->dump_unsigned("hit_set_period", hit_set_period);
1209   f->dump_unsigned("hit_set_count", hit_set_count);
1210   f->dump_bool("use_gmt_hitset", use_gmt_hitset);
1211   f->dump_unsigned("min_read_recency_for_promote", min_read_recency_for_promote);
1212   f->dump_unsigned("min_write_recency_for_promote", min_write_recency_for_promote);
1213   f->dump_unsigned("hit_set_grade_decay_rate", hit_set_grade_decay_rate);
1214   f->dump_unsigned("hit_set_search_last_n", hit_set_search_last_n);
1215   f->open_array_section("grade_table");
1216   for (unsigned i = 0; i < hit_set_count; ++i)
1217     f->dump_unsigned("value", get_grade(i));
1218   f->close_section();
1219   f->dump_unsigned("stripe_width", get_stripe_width());
1220   f->dump_unsigned("expected_num_objects", expected_num_objects);
1221   f->dump_bool("fast_read", fast_read);
1222   f->open_object_section("options");
1223   opts.dump(f);
1224   f->close_section(); // options
1225   f->open_object_section("application_metadata");
1226   for (auto &app_pair : application_metadata) {
1227     f->open_object_section(app_pair.first.c_str());
1228     for (auto &kv_pair : app_pair.second) {
1229       f->dump_string(kv_pair.first.c_str(), kv_pair.second);
1230     }
1231     f->close_section(); // application
1232   }
1233   f->close_section(); // application_metadata
1234 }
1235
1236 void pg_pool_t::convert_to_pg_shards(const vector<int> &from, set<pg_shard_t>* to) const {
1237   for (size_t i = 0; i < from.size(); ++i) {
1238     if (from[i] != CRUSH_ITEM_NONE) {
1239       to->insert(
1240         pg_shard_t(
1241           from[i],
1242           ec_pool() ? shard_id_t(i) : shard_id_t::NO_SHARD));
1243     }
1244   }
1245 }
1246
1247 void pg_pool_t::calc_pg_masks()
1248 {
1249   pg_num_mask = (1 << cbits(pg_num-1)) - 1;
1250   pgp_num_mask = (1 << cbits(pgp_num-1)) - 1;
1251 }
1252
1253 unsigned pg_pool_t::get_pg_num_divisor(pg_t pgid) const
1254 {
1255   if (pg_num == pg_num_mask + 1)
1256     return pg_num;                    // power-of-2 split
1257   unsigned mask = pg_num_mask >> 1;
1258   if ((pgid.ps() & mask) < (pg_num & mask))
1259     return pg_num_mask + 1;           // smaller bin size (already split)
1260   else
1261     return (pg_num_mask + 1) >> 1;    // bigger bin (not yet split)
1262 }
1263
1264 /*
1265  * we have two snap modes:
1266  *  - pool global snaps
1267  *    - snap existence/non-existence defined by snaps[] and snap_seq
1268  *  - user managed snaps
1269  *    - removal governed by removed_snaps
1270  *
1271  * we know which mode we're using based on whether removed_snaps is empty.
1272  */
1273 bool pg_pool_t::is_pool_snaps_mode() const
1274 {
1275   return removed_snaps.empty() && get_snap_seq() > 0;
1276 }
1277
1278 bool pg_pool_t::is_unmanaged_snaps_mode() const
1279 {
1280   return removed_snaps.size() && get_snap_seq() > 0;
1281 }
1282
1283 bool pg_pool_t::is_removed_snap(snapid_t s) const
1284 {
1285   if (is_pool_snaps_mode())
1286     return s <= get_snap_seq() && snaps.count(s) == 0;
1287   else
1288     return removed_snaps.contains(s);
1289 }
1290
1291 /*
1292  * build set of known-removed sets from either pool snaps or
1293  * explicit removed_snaps set.
1294  */
1295 void pg_pool_t::build_removed_snaps(interval_set<snapid_t>& rs) const
1296 {
1297   if (is_pool_snaps_mode()) {
1298     rs.clear();
1299     for (snapid_t s = 1; s <= get_snap_seq(); s = s + 1)
1300       if (snaps.count(s) == 0)
1301         rs.insert(s);
1302   } else {
1303     rs = removed_snaps;
1304   }
1305 }
1306
1307 snapid_t pg_pool_t::snap_exists(const char *s) const
1308 {
1309   for (map<snapid_t,pool_snap_info_t>::const_iterator p = snaps.begin();
1310        p != snaps.end();
1311        ++p)
1312     if (p->second.name == s)
1313       return p->second.snapid;
1314   return 0;
1315 }
1316
1317 void pg_pool_t::add_snap(const char *n, utime_t stamp)
1318 {
1319   assert(!is_unmanaged_snaps_mode());
1320   snapid_t s = get_snap_seq() + 1;
1321   snap_seq = s;
1322   snaps[s].snapid = s;
1323   snaps[s].name = n;
1324   snaps[s].stamp = stamp;
1325 }
1326
1327 void pg_pool_t::add_unmanaged_snap(uint64_t& snapid)
1328 {
1329   if (removed_snaps.empty()) {
1330     assert(!is_pool_snaps_mode());
1331     removed_snaps.insert(snapid_t(1));
1332     snap_seq = 1;
1333   }
1334   snapid = snap_seq = snap_seq + 1;
1335 }
1336
1337 void pg_pool_t::remove_snap(snapid_t s)
1338 {
1339   assert(snaps.count(s));
1340   snaps.erase(s);
1341   snap_seq = snap_seq + 1;
1342 }
1343
1344 void pg_pool_t::remove_unmanaged_snap(snapid_t s)
1345 {
1346   assert(is_unmanaged_snaps_mode());
1347   removed_snaps.insert(s);
1348   snap_seq = snap_seq + 1;
1349   removed_snaps.insert(get_snap_seq());
1350 }
1351
1352 SnapContext pg_pool_t::get_snap_context() const
1353 {
1354   vector<snapid_t> s(snaps.size());
1355   unsigned i = 0;
1356   for (map<snapid_t, pool_snap_info_t>::const_reverse_iterator p = snaps.rbegin();
1357        p != snaps.rend();
1358        ++p)
1359     s[i++] = p->first;
1360   return SnapContext(get_snap_seq(), s);
1361 }
1362
1363 uint32_t pg_pool_t::hash_key(const string& key, const string& ns) const
1364 {
1365  if (ns.empty()) 
1366     return ceph_str_hash(object_hash, key.data(), key.length());
1367   int nsl = ns.length();
1368   int len = key.length() + nsl + 1;
1369   char buf[len];
1370   memcpy(&buf[0], ns.data(), nsl);
1371   buf[nsl] = '\037';
1372   memcpy(&buf[nsl+1], key.data(), key.length());
1373   return ceph_str_hash(object_hash, &buf[0], len);
1374 }
1375
1376 uint32_t pg_pool_t::raw_hash_to_pg(uint32_t v) const
1377 {
1378   return ceph_stable_mod(v, pg_num, pg_num_mask);
1379 }
1380
1381 /*
1382  * map a raw pg (with full precision ps) into an actual pg, for storage
1383  */
1384 pg_t pg_pool_t::raw_pg_to_pg(pg_t pg) const
1385 {
1386   pg.set_ps(ceph_stable_mod(pg.ps(), pg_num, pg_num_mask));
1387   return pg;
1388 }
1389   
1390 /*
1391  * map raw pg (full precision ps) into a placement seed.  include
1392  * pool id in that value so that different pools don't use the same
1393  * seeds.
1394  */
1395 ps_t pg_pool_t::raw_pg_to_pps(pg_t pg) const
1396 {
1397   if (flags & FLAG_HASHPSPOOL) {
1398     // Hash the pool id so that pool PGs do not overlap.
1399     return
1400       crush_hash32_2(CRUSH_HASH_RJENKINS1,
1401                      ceph_stable_mod(pg.ps(), pgp_num, pgp_num_mask),
1402                      pg.pool());
1403   } else {
1404     // Legacy behavior; add ps and pool together.  This is not a great
1405     // idea because the PGs from each pool will essentially overlap on
1406     // top of each other: 0.5 == 1.4 == 2.3 == ...
1407     return
1408       ceph_stable_mod(pg.ps(), pgp_num, pgp_num_mask) +
1409       pg.pool();
1410   }
1411 }
1412
1413 uint32_t pg_pool_t::get_random_pg_position(pg_t pg, uint32_t seed) const
1414 {
1415   uint32_t r = crush_hash32_2(CRUSH_HASH_RJENKINS1, seed, 123);
1416   if (pg_num == pg_num_mask + 1) {
1417     r &= ~pg_num_mask;
1418   } else {
1419     unsigned smaller_mask = pg_num_mask >> 1;
1420     if ((pg.ps() & smaller_mask) < (pg_num & smaller_mask)) {
1421       r &= ~pg_num_mask;
1422     } else {
1423       r &= ~smaller_mask;
1424     }
1425   }
1426   r |= pg.ps();
1427   return r;
1428 }
1429
1430 void pg_pool_t::encode(bufferlist& bl, uint64_t features) const
1431 {
1432   if ((features & CEPH_FEATURE_PGPOOL3) == 0) {
1433     // this encoding matches the old struct ceph_pg_pool
1434     __u8 struct_v = 2;
1435     ::encode(struct_v, bl);
1436     ::encode(type, bl);
1437     ::encode(size, bl);
1438     ::encode(crush_rule, bl);
1439     ::encode(object_hash, bl);
1440     ::encode(pg_num, bl);
1441     ::encode(pgp_num, bl);
1442     __u32 lpg_num = 0, lpgp_num = 0;  // tell old code that there are no localized pgs.
1443     ::encode(lpg_num, bl);
1444     ::encode(lpgp_num, bl);
1445     ::encode(last_change, bl);
1446     ::encode(snap_seq, bl);
1447     ::encode(snap_epoch, bl);
1448
1449     __u32 n = snaps.size();
1450     ::encode(n, bl);
1451     n = removed_snaps.num_intervals();
1452     ::encode(n, bl);
1453
1454     ::encode(auid, bl);
1455
1456     ::encode_nohead(snaps, bl, features);
1457     ::encode_nohead(removed_snaps, bl);
1458     return;
1459   }
1460
1461   if ((features & CEPH_FEATURE_OSDENC) == 0) {
1462     __u8 struct_v = 4;
1463     ::encode(struct_v, bl);
1464     ::encode(type, bl);
1465     ::encode(size, bl);
1466     ::encode(crush_rule, bl);
1467     ::encode(object_hash, bl);
1468     ::encode(pg_num, bl);
1469     ::encode(pgp_num, bl);
1470     __u32 lpg_num = 0, lpgp_num = 0;  // tell old code that there are no localized pgs.
1471     ::encode(lpg_num, bl);
1472     ::encode(lpgp_num, bl);
1473     ::encode(last_change, bl);
1474     ::encode(snap_seq, bl);
1475     ::encode(snap_epoch, bl);
1476     ::encode(snaps, bl, features);
1477     ::encode(removed_snaps, bl);
1478     ::encode(auid, bl);
1479     ::encode(flags, bl);
1480     ::encode(crash_replay_interval, bl);
1481     return;
1482   }
1483
1484   if ((features & CEPH_FEATURE_OSD_POOLRESEND) == 0) {
1485     // we simply added last_force_op_resend here, which is a fully
1486     // backward compatible change.  however, encoding the same map
1487     // differently between monitors triggers scrub noise (even though
1488     // they are decodable without the feature), so let's be pendantic
1489     // about it.
1490     ENCODE_START(14, 5, bl);
1491     ::encode(type, bl);
1492     ::encode(size, bl);
1493     ::encode(crush_rule, bl);
1494     ::encode(object_hash, bl);
1495     ::encode(pg_num, bl);
1496     ::encode(pgp_num, bl);
1497     __u32 lpg_num = 0, lpgp_num = 0;  // tell old code that there are no localized pgs.
1498     ::encode(lpg_num, bl);
1499     ::encode(lpgp_num, bl);
1500     ::encode(last_change, bl);
1501     ::encode(snap_seq, bl);
1502     ::encode(snap_epoch, bl);
1503     ::encode(snaps, bl, features);
1504     ::encode(removed_snaps, bl);
1505     ::encode(auid, bl);
1506     ::encode(flags, bl);
1507     ::encode(crash_replay_interval, bl);
1508     ::encode(min_size, bl);
1509     ::encode(quota_max_bytes, bl);
1510     ::encode(quota_max_objects, bl);
1511     ::encode(tiers, bl);
1512     ::encode(tier_of, bl);
1513     __u8 c = cache_mode;
1514     ::encode(c, bl);
1515     ::encode(read_tier, bl);
1516     ::encode(write_tier, bl);
1517     ::encode(properties, bl);
1518     ::encode(hit_set_params, bl);
1519     ::encode(hit_set_period, bl);
1520     ::encode(hit_set_count, bl);
1521     ::encode(stripe_width, bl);
1522     ::encode(target_max_bytes, bl);
1523     ::encode(target_max_objects, bl);
1524     ::encode(cache_target_dirty_ratio_micro, bl);
1525     ::encode(cache_target_full_ratio_micro, bl);
1526     ::encode(cache_min_flush_age, bl);
1527     ::encode(cache_min_evict_age, bl);
1528     ::encode(erasure_code_profile, bl);
1529     ENCODE_FINISH(bl);
1530     return;
1531   }
1532
1533   uint8_t v = 26;
1534   if (!(features & CEPH_FEATURE_NEW_OSDOP_ENCODING)) {
1535     // this was the first post-hammer thing we added; if it's missing, encode
1536     // like hammer.
1537     v = 21;
1538   }
1539   if (!HAVE_FEATURE(features, SERVER_LUMINOUS)) {
1540     v = 24;
1541   }
1542
1543   ENCODE_START(v, 5, bl);
1544   ::encode(type, bl);
1545   ::encode(size, bl);
1546   ::encode(crush_rule, bl);
1547   ::encode(object_hash, bl);
1548   ::encode(pg_num, bl);
1549   ::encode(pgp_num, bl);
1550   __u32 lpg_num = 0, lpgp_num = 0;  // tell old code that there are no localized pgs.
1551   ::encode(lpg_num, bl);
1552   ::encode(lpgp_num, bl);
1553   ::encode(last_change, bl);
1554   ::encode(snap_seq, bl);
1555   ::encode(snap_epoch, bl);
1556   ::encode(snaps, bl, features);
1557   ::encode(removed_snaps, bl);
1558   ::encode(auid, bl);
1559   ::encode(flags, bl);
1560   ::encode(crash_replay_interval, bl);
1561   ::encode(min_size, bl);
1562   ::encode(quota_max_bytes, bl);
1563   ::encode(quota_max_objects, bl);
1564   ::encode(tiers, bl);
1565   ::encode(tier_of, bl);
1566   __u8 c = cache_mode;
1567   ::encode(c, bl);
1568   ::encode(read_tier, bl);
1569   ::encode(write_tier, bl);
1570   ::encode(properties, bl);
1571   ::encode(hit_set_params, bl);
1572   ::encode(hit_set_period, bl);
1573   ::encode(hit_set_count, bl);
1574   ::encode(stripe_width, bl);
1575   ::encode(target_max_bytes, bl);
1576   ::encode(target_max_objects, bl);
1577   ::encode(cache_target_dirty_ratio_micro, bl);
1578   ::encode(cache_target_full_ratio_micro, bl);
1579   ::encode(cache_min_flush_age, bl);
1580   ::encode(cache_min_evict_age, bl);
1581   ::encode(erasure_code_profile, bl);
1582   ::encode(last_force_op_resend_preluminous, bl);
1583   ::encode(min_read_recency_for_promote, bl);
1584   ::encode(expected_num_objects, bl);
1585   if (v >= 19) {
1586     ::encode(cache_target_dirty_high_ratio_micro, bl);
1587   }
1588   if (v >= 20) {
1589     ::encode(min_write_recency_for_promote, bl);
1590   }
1591   if (v >= 21) {
1592     ::encode(use_gmt_hitset, bl);
1593   }
1594   if (v >= 22) {
1595     ::encode(fast_read, bl);
1596   }
1597   if (v >= 23) {
1598     ::encode(hit_set_grade_decay_rate, bl);
1599     ::encode(hit_set_search_last_n, bl);
1600   }
1601   if (v >= 24) {
1602     ::encode(opts, bl);
1603   }
1604   if (v >= 25) {
1605     ::encode(last_force_op_resend, bl);
1606   }
1607   if (v >= 26) {
1608     ::encode(application_metadata, bl);
1609   }
1610   ENCODE_FINISH(bl);
1611 }
1612
1613 void pg_pool_t::decode(bufferlist::iterator& bl)
1614 {
1615   DECODE_START_LEGACY_COMPAT_LEN(26, 5, 5, bl);
1616   ::decode(type, bl);
1617   ::decode(size, bl);
1618   ::decode(crush_rule, bl);
1619   ::decode(object_hash, bl);
1620   ::decode(pg_num, bl);
1621   ::decode(pgp_num, bl);
1622   {
1623     __u32 lpg_num, lpgp_num;
1624     ::decode(lpg_num, bl);
1625     ::decode(lpgp_num, bl);
1626   }
1627   ::decode(last_change, bl);
1628   ::decode(snap_seq, bl);
1629   ::decode(snap_epoch, bl);
1630
1631   if (struct_v >= 3) {
1632     ::decode(snaps, bl);
1633     ::decode(removed_snaps, bl);
1634     ::decode(auid, bl);
1635   } else {
1636     __u32 n, m;
1637     ::decode(n, bl);
1638     ::decode(m, bl);
1639     ::decode(auid, bl);
1640     ::decode_nohead(n, snaps, bl);
1641     ::decode_nohead(m, removed_snaps, bl);
1642   }
1643
1644   if (struct_v >= 4) {
1645     ::decode(flags, bl);
1646     ::decode(crash_replay_interval, bl);
1647   } else {
1648     flags = 0;
1649
1650     // if this looks like the 'data' pool, set the
1651     // crash_replay_interval appropriately.  unfortunately, we can't
1652     // be precise here.  this should be good enough to preserve replay
1653     // on the data pool for the majority of cluster upgrades, though.
1654     if (crush_rule == 0 && auid == 0)
1655       crash_replay_interval = 60;
1656     else
1657       crash_replay_interval = 0;
1658   }
1659   if (struct_v >= 7) {
1660     ::decode(min_size, bl);
1661   } else {
1662     min_size = size - size/2;
1663   }
1664   if (struct_v >= 8) {
1665     ::decode(quota_max_bytes, bl);
1666     ::decode(quota_max_objects, bl);
1667   }
1668   if (struct_v >= 9) {
1669     ::decode(tiers, bl);
1670     ::decode(tier_of, bl);
1671     __u8 v;
1672     ::decode(v, bl);
1673     cache_mode = (cache_mode_t)v;
1674     ::decode(read_tier, bl);
1675     ::decode(write_tier, bl);
1676   }
1677   if (struct_v >= 10) {
1678     ::decode(properties, bl);
1679   }
1680   if (struct_v >= 11) {
1681     ::decode(hit_set_params, bl);
1682     ::decode(hit_set_period, bl);
1683     ::decode(hit_set_count, bl);
1684   } else {
1685     pg_pool_t def;
1686     hit_set_period = def.hit_set_period;
1687     hit_set_count = def.hit_set_count;
1688   }
1689   if (struct_v >= 12) {
1690     ::decode(stripe_width, bl);
1691   } else {
1692     set_stripe_width(0);
1693   }
1694   if (struct_v >= 13) {
1695     ::decode(target_max_bytes, bl);
1696     ::decode(target_max_objects, bl);
1697     ::decode(cache_target_dirty_ratio_micro, bl);
1698     ::decode(cache_target_full_ratio_micro, bl);
1699     ::decode(cache_min_flush_age, bl);
1700     ::decode(cache_min_evict_age, bl);
1701   } else {
1702     target_max_bytes = 0;
1703     target_max_objects = 0;
1704     cache_target_dirty_ratio_micro = 0;
1705     cache_target_full_ratio_micro = 0;
1706     cache_min_flush_age = 0;
1707     cache_min_evict_age = 0;
1708   }
1709   if (struct_v >= 14) {
1710     ::decode(erasure_code_profile, bl);
1711   }
1712   if (struct_v >= 15) {
1713     ::decode(last_force_op_resend_preluminous, bl);
1714   } else {
1715     last_force_op_resend_preluminous = 0;
1716   }
1717   if (struct_v >= 16) {
1718     ::decode(min_read_recency_for_promote, bl);
1719   } else {
1720     min_read_recency_for_promote = 1;
1721   }
1722   if (struct_v >= 17) {
1723     ::decode(expected_num_objects, bl);
1724   } else {
1725     expected_num_objects = 0;
1726   }
1727   if (struct_v >= 19) {
1728     ::decode(cache_target_dirty_high_ratio_micro, bl);
1729   } else {
1730     cache_target_dirty_high_ratio_micro = cache_target_dirty_ratio_micro;
1731   }
1732   if (struct_v >= 20) {
1733     ::decode(min_write_recency_for_promote, bl);
1734   } else {
1735     min_write_recency_for_promote = 1;
1736   }
1737   if (struct_v >= 21) {
1738     ::decode(use_gmt_hitset, bl);
1739   } else {
1740     use_gmt_hitset = false;
1741   }
1742   if (struct_v >= 22) {
1743     ::decode(fast_read, bl);
1744   } else {
1745     fast_read = false;
1746   }
1747   if (struct_v >= 23) {
1748     ::decode(hit_set_grade_decay_rate, bl);
1749     ::decode(hit_set_search_last_n, bl);
1750   } else {
1751     hit_set_grade_decay_rate = 0;
1752     hit_set_search_last_n = 1;
1753   }
1754   if (struct_v >= 24) {
1755     ::decode(opts, bl);
1756   }
1757   if (struct_v >= 25) {
1758     ::decode(last_force_op_resend, bl);
1759   } else {
1760     last_force_op_resend = last_force_op_resend_preluminous;
1761   }
1762   if (struct_v >= 26) {
1763     ::decode(application_metadata, bl);
1764   }
1765   DECODE_FINISH(bl);
1766   calc_pg_masks();
1767   calc_grade_table();
1768 }
1769
1770 void pg_pool_t::generate_test_instances(list<pg_pool_t*>& o)
1771 {
1772   pg_pool_t a;
1773   o.push_back(new pg_pool_t(a));
1774
1775   a.type = TYPE_REPLICATED;
1776   a.size = 2;
1777   a.crush_rule = 3;
1778   a.object_hash = 4;
1779   a.pg_num = 6;
1780   a.pgp_num = 5;
1781   a.last_change = 9;
1782   a.last_force_op_resend = 123823;
1783   a.last_force_op_resend_preluminous = 123824;
1784   a.snap_seq = 10;
1785   a.snap_epoch = 11;
1786   a.auid = 12;
1787   a.crash_replay_interval = 13;
1788   a.quota_max_bytes = 473;
1789   a.quota_max_objects = 474;
1790   o.push_back(new pg_pool_t(a));
1791
1792   a.snaps[3].name = "asdf";
1793   a.snaps[3].snapid = 3;
1794   a.snaps[3].stamp = utime_t(123, 4);
1795   a.snaps[6].name = "qwer";
1796   a.snaps[6].snapid = 6;
1797   a.snaps[6].stamp = utime_t(23423, 4);
1798   o.push_back(new pg_pool_t(a));
1799
1800   a.removed_snaps.insert(2);   // not quite valid to combine with snaps!
1801   a.quota_max_bytes = 2473;
1802   a.quota_max_objects = 4374;
1803   a.tiers.insert(0);
1804   a.tiers.insert(1);
1805   a.tier_of = 2;
1806   a.cache_mode = CACHEMODE_WRITEBACK;
1807   a.read_tier = 1;
1808   a.write_tier = 1;
1809   a.hit_set_params = HitSet::Params(new BloomHitSet::Params);
1810   a.hit_set_period = 3600;
1811   a.hit_set_count = 8;
1812   a.min_read_recency_for_promote = 1;
1813   a.min_write_recency_for_promote = 1;
1814   a.hit_set_grade_decay_rate = 50;
1815   a.hit_set_search_last_n = 1;
1816   a.calc_grade_table();
1817   a.set_stripe_width(12345);
1818   a.target_max_bytes = 1238132132;
1819   a.target_max_objects = 1232132;
1820   a.cache_target_dirty_ratio_micro = 187232;
1821   a.cache_target_dirty_high_ratio_micro = 309856;
1822   a.cache_target_full_ratio_micro = 987222;
1823   a.cache_min_flush_age = 231;
1824   a.cache_min_evict_age = 2321;
1825   a.erasure_code_profile = "profile in osdmap";
1826   a.expected_num_objects = 123456;
1827   a.fast_read = false;
1828   a.application_metadata = {{"rbd", {{"key", "value"}}}};
1829   o.push_back(new pg_pool_t(a));
1830 }
1831
1832 ostream& operator<<(ostream& out, const pg_pool_t& p)
1833 {
1834   out << p.get_type_name()
1835       << " size " << p.get_size()
1836       << " min_size " << p.get_min_size()
1837       << " crush_rule " << p.get_crush_rule()
1838       << " object_hash " << p.get_object_hash_name()
1839       << " pg_num " << p.get_pg_num()
1840       << " pgp_num " << p.get_pgp_num()
1841       << " last_change " << p.get_last_change();
1842   if (p.get_last_force_op_resend() ||
1843       p.get_last_force_op_resend_preluminous())
1844     out << " lfor " << p.get_last_force_op_resend() << "/"
1845         << p.get_last_force_op_resend_preluminous();
1846   if (p.get_auid())
1847     out << " owner " << p.get_auid();
1848   if (p.flags)
1849     out << " flags " << p.get_flags_string();
1850   if (p.crash_replay_interval)
1851     out << " crash_replay_interval " << p.crash_replay_interval;
1852   if (p.quota_max_bytes)
1853     out << " max_bytes " << p.quota_max_bytes;
1854   if (p.quota_max_objects)
1855     out << " max_objects " << p.quota_max_objects;
1856   if (!p.tiers.empty())
1857     out << " tiers " << p.tiers;
1858   if (p.is_tier())
1859     out << " tier_of " << p.tier_of;
1860   if (p.has_read_tier())
1861     out << " read_tier " << p.read_tier;
1862   if (p.has_write_tier())
1863     out << " write_tier " << p.write_tier;
1864   if (p.cache_mode)
1865     out << " cache_mode " << p.get_cache_mode_name();
1866   if (p.target_max_bytes)
1867     out << " target_bytes " << p.target_max_bytes;
1868   if (p.target_max_objects)
1869     out << " target_objects " << p.target_max_objects;
1870   if (p.hit_set_params.get_type() != HitSet::TYPE_NONE) {
1871     out << " hit_set " << p.hit_set_params
1872         << " " << p.hit_set_period << "s"
1873         << " x" << p.hit_set_count << " decay_rate "
1874         << p.hit_set_grade_decay_rate
1875         << " search_last_n " << p.hit_set_search_last_n;
1876   }
1877   if (p.min_read_recency_for_promote)
1878     out << " min_read_recency_for_promote " << p.min_read_recency_for_promote;
1879   if (p.min_write_recency_for_promote)
1880     out << " min_write_recency_for_promote " << p.min_write_recency_for_promote;
1881   out << " stripe_width " << p.get_stripe_width();
1882   if (p.expected_num_objects)
1883     out << " expected_num_objects " << p.expected_num_objects;
1884   if (p.fast_read)
1885     out << " fast_read " << p.fast_read;
1886   out << p.opts;
1887   if (!p.application_metadata.empty()) {
1888     out << " application ";
1889     for (auto it = p.application_metadata.begin();
1890          it != p.application_metadata.end(); ++it) {
1891       if (it != p.application_metadata.begin())
1892         out << ",";
1893       out << it->first;
1894     }
1895   }
1896   return out;
1897 }
1898
1899
1900 // -- object_stat_sum_t --
1901
1902 void object_stat_sum_t::dump(Formatter *f) const
1903 {
1904   f->dump_int("num_bytes", num_bytes);
1905   f->dump_int("num_objects", num_objects);
1906   f->dump_int("num_object_clones", num_object_clones);
1907   f->dump_int("num_object_copies", num_object_copies);
1908   f->dump_int("num_objects_missing_on_primary", num_objects_missing_on_primary);
1909   f->dump_int("num_objects_missing", num_objects_missing);
1910   f->dump_int("num_objects_degraded", num_objects_degraded);
1911   f->dump_int("num_objects_misplaced", num_objects_misplaced);
1912   f->dump_int("num_objects_unfound", num_objects_unfound);
1913   f->dump_int("num_objects_dirty", num_objects_dirty);
1914   f->dump_int("num_whiteouts", num_whiteouts);
1915   f->dump_int("num_read", num_rd);
1916   f->dump_int("num_read_kb", num_rd_kb);
1917   f->dump_int("num_write", num_wr);
1918   f->dump_int("num_write_kb", num_wr_kb);
1919   f->dump_int("num_scrub_errors", num_scrub_errors);
1920   f->dump_int("num_shallow_scrub_errors", num_shallow_scrub_errors);
1921   f->dump_int("num_deep_scrub_errors", num_deep_scrub_errors);
1922   f->dump_int("num_objects_recovered", num_objects_recovered);
1923   f->dump_int("num_bytes_recovered", num_bytes_recovered);
1924   f->dump_int("num_keys_recovered", num_keys_recovered);
1925   f->dump_int("num_objects_omap", num_objects_omap);
1926   f->dump_int("num_objects_hit_set_archive", num_objects_hit_set_archive);
1927   f->dump_int("num_bytes_hit_set_archive", num_bytes_hit_set_archive);
1928   f->dump_int("num_flush", num_flush);
1929   f->dump_int("num_flush_kb", num_flush_kb);
1930   f->dump_int("num_evict", num_evict);
1931   f->dump_int("num_evict_kb", num_evict_kb);
1932   f->dump_int("num_promote", num_promote);
1933   f->dump_int("num_flush_mode_high", num_flush_mode_high);
1934   f->dump_int("num_flush_mode_low", num_flush_mode_low);
1935   f->dump_int("num_evict_mode_some", num_evict_mode_some);
1936   f->dump_int("num_evict_mode_full", num_evict_mode_full);
1937   f->dump_int("num_objects_pinned", num_objects_pinned);
1938   f->dump_int("num_legacy_snapsets", num_legacy_snapsets);
1939 }
1940
1941 void object_stat_sum_t::encode(bufferlist& bl) const
1942 {
1943   ENCODE_START(16, 14, bl);
1944 #if defined(CEPH_LITTLE_ENDIAN)
1945   bl.append((char *)(&num_bytes), sizeof(object_stat_sum_t));
1946 #else
1947   ::encode(num_bytes, bl);
1948   ::encode(num_objects, bl);
1949   ::encode(num_object_clones, bl);
1950   ::encode(num_object_copies, bl);
1951   ::encode(num_objects_missing_on_primary, bl);
1952   ::encode(num_objects_degraded, bl);
1953   ::encode(num_objects_unfound, bl);
1954   ::encode(num_rd, bl);
1955   ::encode(num_rd_kb, bl);
1956   ::encode(num_wr, bl);
1957   ::encode(num_wr_kb, bl);
1958   ::encode(num_scrub_errors, bl);
1959   ::encode(num_objects_recovered, bl);
1960   ::encode(num_bytes_recovered, bl);
1961   ::encode(num_keys_recovered, bl);
1962   ::encode(num_shallow_scrub_errors, bl);
1963   ::encode(num_deep_scrub_errors, bl);
1964   ::encode(num_objects_dirty, bl);
1965   ::encode(num_whiteouts, bl);
1966   ::encode(num_objects_omap, bl);
1967   ::encode(num_objects_hit_set_archive, bl);
1968   ::encode(num_objects_misplaced, bl);
1969   ::encode(num_bytes_hit_set_archive, bl);
1970   ::encode(num_flush, bl);
1971   ::encode(num_flush_kb, bl);
1972   ::encode(num_evict, bl);
1973   ::encode(num_evict_kb, bl);
1974   ::encode(num_promote, bl);
1975   ::encode(num_flush_mode_high, bl);
1976   ::encode(num_flush_mode_low, bl);
1977   ::encode(num_evict_mode_some, bl);
1978   ::encode(num_evict_mode_full, bl);
1979   ::encode(num_objects_pinned, bl);
1980   ::encode(num_objects_missing, bl);
1981   ::encode(num_legacy_snapsets, bl);
1982 #endif
1983   ENCODE_FINISH(bl);
1984 }
1985
1986 void object_stat_sum_t::decode(bufferlist::iterator& bl)
1987 {
1988   bool decode_finish = false;
1989   DECODE_START(16, bl);
1990 #if defined(CEPH_LITTLE_ENDIAN)
1991   if (struct_v >= 16) {
1992     bl.copy(sizeof(object_stat_sum_t), (char*)(&num_bytes));
1993     decode_finish = true;
1994   }
1995 #endif
1996   if (!decode_finish) {
1997     ::decode(num_bytes, bl);
1998     ::decode(num_objects, bl);
1999     ::decode(num_object_clones, bl);
2000     ::decode(num_object_copies, bl);
2001     ::decode(num_objects_missing_on_primary, bl);
2002     ::decode(num_objects_degraded, bl);
2003     ::decode(num_objects_unfound, bl);
2004     ::decode(num_rd, bl);
2005     ::decode(num_rd_kb, bl);
2006     ::decode(num_wr, bl);
2007     ::decode(num_wr_kb, bl);
2008     ::decode(num_scrub_errors, bl);
2009     ::decode(num_objects_recovered, bl);
2010     ::decode(num_bytes_recovered, bl);
2011     ::decode(num_keys_recovered, bl);
2012     ::decode(num_shallow_scrub_errors, bl);
2013     ::decode(num_deep_scrub_errors, bl);
2014     ::decode(num_objects_dirty, bl);
2015     ::decode(num_whiteouts, bl);
2016     ::decode(num_objects_omap, bl);
2017     ::decode(num_objects_hit_set_archive, bl);
2018     ::decode(num_objects_misplaced, bl);
2019     ::decode(num_bytes_hit_set_archive, bl);
2020     ::decode(num_flush, bl);
2021     ::decode(num_flush_kb, bl);
2022     ::decode(num_evict, bl);
2023     ::decode(num_evict_kb, bl);
2024     ::decode(num_promote, bl);
2025     ::decode(num_flush_mode_high, bl);
2026     ::decode(num_flush_mode_low, bl);
2027     ::decode(num_evict_mode_some, bl);
2028     ::decode(num_evict_mode_full, bl);
2029     ::decode(num_objects_pinned, bl);
2030     ::decode(num_objects_missing, bl);
2031     if (struct_v >= 16) {
2032       ::decode(num_legacy_snapsets, bl);
2033     } else {
2034       num_legacy_snapsets = num_object_clones;  // upper bound
2035     }
2036   }
2037   DECODE_FINISH(bl);
2038 }
2039
2040 void object_stat_sum_t::generate_test_instances(list<object_stat_sum_t*>& o)
2041 {
2042   object_stat_sum_t a;
2043
2044   a.num_bytes = 1;
2045   a.num_objects = 3;
2046   a.num_object_clones = 4;
2047   a.num_object_copies = 5;
2048   a.num_objects_missing_on_primary = 6;
2049   a.num_objects_missing = 123;
2050   a.num_objects_degraded = 7;
2051   a.num_objects_unfound = 8;
2052   a.num_rd = 9; a.num_rd_kb = 10;
2053   a.num_wr = 11; a.num_wr_kb = 12;
2054   a.num_objects_recovered = 14;
2055   a.num_bytes_recovered = 15;
2056   a.num_keys_recovered = 16;
2057   a.num_deep_scrub_errors = 17;
2058   a.num_shallow_scrub_errors = 18;
2059   a.num_scrub_errors = a.num_deep_scrub_errors + a.num_shallow_scrub_errors;
2060   a.num_objects_dirty = 21;
2061   a.num_whiteouts = 22;
2062   a.num_objects_misplaced = 1232;
2063   a.num_objects_hit_set_archive = 2;
2064   a.num_bytes_hit_set_archive = 27;
2065   a.num_flush = 5;
2066   a.num_flush_kb = 6;
2067   a.num_evict = 7;
2068   a.num_evict_kb = 8;
2069   a.num_promote = 9;
2070   a.num_flush_mode_high = 0;
2071   a.num_flush_mode_low = 1;
2072   a.num_evict_mode_some = 1;
2073   a.num_evict_mode_full = 0;
2074   a.num_objects_pinned = 20;
2075   o.push_back(new object_stat_sum_t(a));
2076 }
2077
2078 void object_stat_sum_t::add(const object_stat_sum_t& o)
2079 {
2080   num_bytes += o.num_bytes;
2081   num_objects += o.num_objects;
2082   num_object_clones += o.num_object_clones;
2083   num_object_copies += o.num_object_copies;
2084   num_objects_missing_on_primary += o.num_objects_missing_on_primary;
2085   num_objects_missing += o.num_objects_missing;
2086   num_objects_degraded += o.num_objects_degraded;
2087   num_objects_misplaced += o.num_objects_misplaced;
2088   num_rd += o.num_rd;
2089   num_rd_kb += o.num_rd_kb;
2090   num_wr += o.num_wr;
2091   num_wr_kb += o.num_wr_kb;
2092   num_objects_unfound += o.num_objects_unfound;
2093   num_scrub_errors += o.num_scrub_errors;
2094   num_shallow_scrub_errors += o.num_shallow_scrub_errors;
2095   num_deep_scrub_errors += o.num_deep_scrub_errors;
2096   num_objects_recovered += o.num_objects_recovered;
2097   num_bytes_recovered += o.num_bytes_recovered;
2098   num_keys_recovered += o.num_keys_recovered;
2099   num_objects_dirty += o.num_objects_dirty;
2100   num_whiteouts += o.num_whiteouts;
2101   num_objects_omap += o.num_objects_omap;
2102   num_objects_hit_set_archive += o.num_objects_hit_set_archive;
2103   num_bytes_hit_set_archive += o.num_bytes_hit_set_archive;
2104   num_flush += o.num_flush;
2105   num_flush_kb += o.num_flush_kb;
2106   num_evict += o.num_evict;
2107   num_evict_kb += o.num_evict_kb;
2108   num_promote += o.num_promote;
2109   num_flush_mode_high += o.num_flush_mode_high;
2110   num_flush_mode_low += o.num_flush_mode_low;
2111   num_evict_mode_some += o.num_evict_mode_some;
2112   num_evict_mode_full += o.num_evict_mode_full;
2113   num_objects_pinned += o.num_objects_pinned;
2114   num_legacy_snapsets += o.num_legacy_snapsets;
2115 }
2116
2117 void object_stat_sum_t::sub(const object_stat_sum_t& o)
2118 {
2119   num_bytes -= o.num_bytes;
2120   num_objects -= o.num_objects;
2121   num_object_clones -= o.num_object_clones;
2122   num_object_copies -= o.num_object_copies;
2123   num_objects_missing_on_primary -= o.num_objects_missing_on_primary;
2124   num_objects_missing -= o.num_objects_missing;
2125   num_objects_degraded -= o.num_objects_degraded;
2126   num_objects_misplaced -= o.num_objects_misplaced;
2127   num_rd -= o.num_rd;
2128   num_rd_kb -= o.num_rd_kb;
2129   num_wr -= o.num_wr;
2130   num_wr_kb -= o.num_wr_kb;
2131   num_objects_unfound -= o.num_objects_unfound;
2132   num_scrub_errors -= o.num_scrub_errors;
2133   num_shallow_scrub_errors -= o.num_shallow_scrub_errors;
2134   num_deep_scrub_errors -= o.num_deep_scrub_errors;
2135   num_objects_recovered -= o.num_objects_recovered;
2136   num_bytes_recovered -= o.num_bytes_recovered;
2137   num_keys_recovered -= o.num_keys_recovered;
2138   num_objects_dirty -= o.num_objects_dirty;
2139   num_whiteouts -= o.num_whiteouts;
2140   num_objects_omap -= o.num_objects_omap;
2141   num_objects_hit_set_archive -= o.num_objects_hit_set_archive;
2142   num_bytes_hit_set_archive -= o.num_bytes_hit_set_archive;
2143   num_flush -= o.num_flush;
2144   num_flush_kb -= o.num_flush_kb;
2145   num_evict -= o.num_evict;
2146   num_evict_kb -= o.num_evict_kb;
2147   num_promote -= o.num_promote;
2148   num_flush_mode_high -= o.num_flush_mode_high;
2149   num_flush_mode_low -= o.num_flush_mode_low;
2150   num_evict_mode_some -= o.num_evict_mode_some;
2151   num_evict_mode_full -= o.num_evict_mode_full;
2152   num_objects_pinned -= o.num_objects_pinned;
2153   num_legacy_snapsets -= o.num_legacy_snapsets;
2154 }
2155
2156 bool operator==(const object_stat_sum_t& l, const object_stat_sum_t& r)
2157 {
2158   return
2159     l.num_bytes == r.num_bytes &&
2160     l.num_objects == r.num_objects &&
2161     l.num_object_clones == r.num_object_clones &&
2162     l.num_object_copies == r.num_object_copies &&
2163     l.num_objects_missing_on_primary == r.num_objects_missing_on_primary &&
2164     l.num_objects_missing == r.num_objects_missing &&
2165     l.num_objects_degraded == r.num_objects_degraded &&
2166     l.num_objects_misplaced == r.num_objects_misplaced &&
2167     l.num_objects_unfound == r.num_objects_unfound &&
2168     l.num_rd == r.num_rd &&
2169     l.num_rd_kb == r.num_rd_kb &&
2170     l.num_wr == r.num_wr &&
2171     l.num_wr_kb == r.num_wr_kb &&
2172     l.num_scrub_errors == r.num_scrub_errors &&
2173     l.num_shallow_scrub_errors == r.num_shallow_scrub_errors &&
2174     l.num_deep_scrub_errors == r.num_deep_scrub_errors &&
2175     l.num_objects_recovered == r.num_objects_recovered &&
2176     l.num_bytes_recovered == r.num_bytes_recovered &&
2177     l.num_keys_recovered == r.num_keys_recovered &&
2178     l.num_objects_dirty == r.num_objects_dirty &&
2179     l.num_whiteouts == r.num_whiteouts &&
2180     l.num_objects_omap == r.num_objects_omap &&
2181     l.num_objects_hit_set_archive == r.num_objects_hit_set_archive &&
2182     l.num_bytes_hit_set_archive == r.num_bytes_hit_set_archive &&
2183     l.num_flush == r.num_flush &&
2184     l.num_flush_kb == r.num_flush_kb &&
2185     l.num_evict == r.num_evict &&
2186     l.num_evict_kb == r.num_evict_kb &&
2187     l.num_promote == r.num_promote &&
2188     l.num_flush_mode_high == r.num_flush_mode_high &&
2189     l.num_flush_mode_low == r.num_flush_mode_low &&
2190     l.num_evict_mode_some == r.num_evict_mode_some &&
2191     l.num_evict_mode_full == r.num_evict_mode_full &&
2192     l.num_objects_pinned == r.num_objects_pinned &&
2193     l.num_legacy_snapsets == r.num_legacy_snapsets;
2194 }
2195
2196 // -- object_stat_collection_t --
2197
2198 void object_stat_collection_t::dump(Formatter *f) const
2199 {
2200   f->open_object_section("stat_sum");
2201   sum.dump(f);
2202   f->close_section();
2203 }
2204
2205 void object_stat_collection_t::encode(bufferlist& bl) const
2206 {
2207   ENCODE_START(2, 2, bl);
2208   ::encode(sum, bl);
2209   ::encode((__u32)0, bl);
2210   ENCODE_FINISH(bl);
2211 }
2212
2213 void object_stat_collection_t::decode(bufferlist::iterator& bl)
2214 {
2215   DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
2216   ::decode(sum, bl);
2217   {
2218     map<string,object_stat_sum_t> cat_sum;
2219     ::decode(cat_sum, bl);
2220   }
2221   DECODE_FINISH(bl);
2222 }
2223
2224 void object_stat_collection_t::generate_test_instances(list<object_stat_collection_t*>& o)
2225 {
2226   object_stat_collection_t a;
2227   o.push_back(new object_stat_collection_t(a));
2228   list<object_stat_sum_t*> l;
2229   object_stat_sum_t::generate_test_instances(l);
2230   for (list<object_stat_sum_t*>::iterator p = l.begin(); p != l.end(); ++p) {
2231     a.add(**p);
2232     o.push_back(new object_stat_collection_t(a));
2233   }
2234 }
2235
2236
2237 // -- pg_stat_t --
2238
2239 bool pg_stat_t::is_acting_osd(int32_t osd, bool primary) const
2240 {
2241   if (primary && osd == acting_primary) {
2242     return true;
2243   } else if (!primary) {
2244     for(vector<int32_t>::const_iterator it = acting.begin();
2245         it != acting.end(); ++it)
2246     {
2247       if (*it == osd)
2248         return true;
2249     }
2250   }
2251   return false;
2252 }
2253
2254 void pg_stat_t::dump(Formatter *f) const
2255 {
2256   f->dump_stream("version") << version;
2257   f->dump_stream("reported_seq") << reported_seq;
2258   f->dump_stream("reported_epoch") << reported_epoch;
2259   f->dump_string("state", pg_state_string(state));
2260   f->dump_stream("last_fresh") << last_fresh;
2261   f->dump_stream("last_change") << last_change;
2262   f->dump_stream("last_active") << last_active;
2263   f->dump_stream("last_peered") << last_peered;
2264   f->dump_stream("last_clean") << last_clean;
2265   f->dump_stream("last_became_active") << last_became_active;
2266   f->dump_stream("last_became_peered") << last_became_peered;
2267   f->dump_stream("last_unstale") << last_unstale;
2268   f->dump_stream("last_undegraded") << last_undegraded;
2269   f->dump_stream("last_fullsized") << last_fullsized;
2270   f->dump_unsigned("mapping_epoch", mapping_epoch);
2271   f->dump_stream("log_start") << log_start;
2272   f->dump_stream("ondisk_log_start") << ondisk_log_start;
2273   f->dump_unsigned("created", created);
2274   f->dump_unsigned("last_epoch_clean", last_epoch_clean);
2275   f->dump_stream("parent") << parent;
2276   f->dump_unsigned("parent_split_bits", parent_split_bits);
2277   f->dump_stream("last_scrub") << last_scrub;
2278   f->dump_stream("last_scrub_stamp") << last_scrub_stamp;
2279   f->dump_stream("last_deep_scrub") << last_deep_scrub;
2280   f->dump_stream("last_deep_scrub_stamp") << last_deep_scrub_stamp;
2281   f->dump_stream("last_clean_scrub_stamp") << last_clean_scrub_stamp;
2282   f->dump_int("log_size", log_size);
2283   f->dump_int("ondisk_log_size", ondisk_log_size);
2284   f->dump_bool("stats_invalid", stats_invalid);
2285   f->dump_bool("dirty_stats_invalid", dirty_stats_invalid);
2286   f->dump_bool("omap_stats_invalid", omap_stats_invalid);
2287   f->dump_bool("hitset_stats_invalid", hitset_stats_invalid);
2288   f->dump_bool("hitset_bytes_stats_invalid", hitset_bytes_stats_invalid);
2289   f->dump_bool("pin_stats_invalid", pin_stats_invalid);
2290   stats.dump(f);
2291   f->open_array_section("up");
2292   for (vector<int32_t>::const_iterator p = up.begin(); p != up.end(); ++p)
2293     f->dump_int("osd", *p);
2294   f->close_section();
2295   f->open_array_section("acting");
2296   for (vector<int32_t>::const_iterator p = acting.begin(); p != acting.end(); ++p)
2297     f->dump_int("osd", *p);
2298   f->close_section();
2299   f->open_array_section("blocked_by");
2300   for (vector<int32_t>::const_iterator p = blocked_by.begin();
2301        p != blocked_by.end(); ++p)
2302     f->dump_int("osd", *p);
2303   f->close_section();
2304   f->dump_int("up_primary", up_primary);
2305   f->dump_int("acting_primary", acting_primary);
2306 }
2307
2308 void pg_stat_t::dump_brief(Formatter *f) const
2309 {
2310   f->dump_string("state", pg_state_string(state));
2311   f->open_array_section("up");
2312   for (vector<int32_t>::const_iterator p = up.begin(); p != up.end(); ++p)
2313     f->dump_int("osd", *p);
2314   f->close_section();
2315   f->open_array_section("acting");
2316   for (vector<int32_t>::const_iterator p = acting.begin(); p != acting.end(); ++p)
2317     f->dump_int("osd", *p);
2318   f->close_section();
2319   f->dump_int("up_primary", up_primary);
2320   f->dump_int("acting_primary", acting_primary);
2321 }
2322
2323 void pg_stat_t::encode(bufferlist &bl) const
2324 {
2325   ENCODE_START(22, 22, bl);
2326   ::encode(version, bl);
2327   ::encode(reported_seq, bl);
2328   ::encode(reported_epoch, bl);
2329   ::encode(state, bl);
2330   ::encode(log_start, bl);
2331   ::encode(ondisk_log_start, bl);
2332   ::encode(created, bl);
2333   ::encode(last_epoch_clean, bl);
2334   ::encode(parent, bl);
2335   ::encode(parent_split_bits, bl);
2336   ::encode(last_scrub, bl);
2337   ::encode(last_scrub_stamp, bl);
2338   ::encode(stats, bl);
2339   ::encode(log_size, bl);
2340   ::encode(ondisk_log_size, bl);
2341   ::encode(up, bl);
2342   ::encode(acting, bl);
2343   ::encode(last_fresh, bl);
2344   ::encode(last_change, bl);
2345   ::encode(last_active, bl);
2346   ::encode(last_clean, bl);
2347   ::encode(last_unstale, bl);
2348   ::encode(mapping_epoch, bl);
2349   ::encode(last_deep_scrub, bl);
2350   ::encode(last_deep_scrub_stamp, bl);
2351   ::encode(stats_invalid, bl);
2352   ::encode(last_clean_scrub_stamp, bl);
2353   ::encode(last_became_active, bl);
2354   ::encode(dirty_stats_invalid, bl);
2355   ::encode(up_primary, bl);
2356   ::encode(acting_primary, bl);
2357   ::encode(omap_stats_invalid, bl);
2358   ::encode(hitset_stats_invalid, bl);
2359   ::encode(blocked_by, bl);
2360   ::encode(last_undegraded, bl);
2361   ::encode(last_fullsized, bl);
2362   ::encode(hitset_bytes_stats_invalid, bl);
2363   ::encode(last_peered, bl);
2364   ::encode(last_became_peered, bl);
2365   ::encode(pin_stats_invalid, bl);
2366   ENCODE_FINISH(bl);
2367 }
2368
2369 void pg_stat_t::decode(bufferlist::iterator &bl)
2370 {
2371   bool tmp;
2372   DECODE_START(22, bl);
2373   ::decode(version, bl);
2374   ::decode(reported_seq, bl);
2375   ::decode(reported_epoch, bl);
2376   ::decode(state, bl);
2377   ::decode(log_start, bl);
2378   ::decode(ondisk_log_start, bl);
2379   ::decode(created, bl);
2380   ::decode(last_epoch_clean, bl);
2381   ::decode(parent, bl);
2382   ::decode(parent_split_bits, bl);
2383   ::decode(last_scrub, bl);
2384   ::decode(last_scrub_stamp, bl);
2385   ::decode(stats, bl);
2386   ::decode(log_size, bl);
2387   ::decode(ondisk_log_size, bl);
2388   ::decode(up, bl);
2389   ::decode(acting, bl);
2390   ::decode(last_fresh, bl);
2391   ::decode(last_change, bl);
2392   ::decode(last_active, bl);
2393   ::decode(last_clean, bl);
2394   ::decode(last_unstale, bl);
2395   ::decode(mapping_epoch, bl);
2396   ::decode(last_deep_scrub, bl);
2397   ::decode(last_deep_scrub_stamp, bl);
2398   ::decode(tmp, bl);
2399   stats_invalid = tmp;
2400   ::decode(last_clean_scrub_stamp, bl);
2401   ::decode(last_became_active, bl);
2402   ::decode(tmp, bl);
2403   dirty_stats_invalid = tmp;
2404   ::decode(up_primary, bl);
2405   ::decode(acting_primary, bl);
2406   ::decode(tmp, bl);
2407   omap_stats_invalid = tmp;
2408   ::decode(tmp, bl);
2409   hitset_stats_invalid = tmp;
2410   ::decode(blocked_by, bl);
2411   ::decode(last_undegraded, bl);
2412   ::decode(last_fullsized, bl);
2413   ::decode(tmp, bl);
2414   hitset_bytes_stats_invalid = tmp;
2415   ::decode(last_peered, bl);
2416   ::decode(last_became_peered, bl);
2417   ::decode(tmp, bl);
2418   pin_stats_invalid = tmp;
2419   DECODE_FINISH(bl);
2420 }
2421
2422 void pg_stat_t::generate_test_instances(list<pg_stat_t*>& o)
2423 {
2424   pg_stat_t a;
2425   o.push_back(new pg_stat_t(a));
2426
2427   a.version = eversion_t(1, 3);
2428   a.reported_epoch = 1;
2429   a.reported_seq = 2;
2430   a.state = 123;
2431   a.mapping_epoch = 998;
2432   a.last_fresh = utime_t(1002, 1);
2433   a.last_change = utime_t(1002, 2);
2434   a.last_active = utime_t(1002, 3);
2435   a.last_clean = utime_t(1002, 4);
2436   a.last_unstale = utime_t(1002, 5);
2437   a.last_undegraded = utime_t(1002, 7);
2438   a.last_fullsized = utime_t(1002, 8);
2439   a.log_start = eversion_t(1, 4);
2440   a.ondisk_log_start = eversion_t(1, 5);
2441   a.created = 6;
2442   a.last_epoch_clean = 7;
2443   a.parent = pg_t(1, 2, 3);
2444   a.parent_split_bits = 12;
2445   a.last_scrub = eversion_t(9, 10);
2446   a.last_scrub_stamp = utime_t(11, 12);
2447   a.last_deep_scrub = eversion_t(13, 14);
2448   a.last_deep_scrub_stamp = utime_t(15, 16);
2449   a.last_clean_scrub_stamp = utime_t(17, 18);
2450   list<object_stat_collection_t*> l;
2451   object_stat_collection_t::generate_test_instances(l);
2452   a.stats = *l.back();
2453   a.log_size = 99;
2454   a.ondisk_log_size = 88;
2455   a.up.push_back(123);
2456   a.up_primary = 123;
2457   a.acting.push_back(456);
2458   a.acting_primary = 456;
2459   o.push_back(new pg_stat_t(a));
2460
2461   a.up.push_back(124);
2462   a.up_primary = 124;
2463   a.acting.push_back(124);
2464   a.acting_primary = 124;
2465   a.blocked_by.push_back(155);
2466   a.blocked_by.push_back(156);
2467   o.push_back(new pg_stat_t(a));
2468 }
2469
2470 bool operator==(const pg_stat_t& l, const pg_stat_t& r)
2471 {
2472   return
2473     l.version == r.version &&
2474     l.reported_seq == r.reported_seq &&
2475     l.reported_epoch == r.reported_epoch &&
2476     l.state == r.state &&
2477     l.last_fresh == r.last_fresh &&
2478     l.last_change == r.last_change &&
2479     l.last_active == r.last_active &&
2480     l.last_peered == r.last_peered &&
2481     l.last_clean == r.last_clean &&
2482     l.last_unstale == r.last_unstale &&
2483     l.last_undegraded == r.last_undegraded &&
2484     l.last_fullsized == r.last_fullsized &&
2485     l.log_start == r.log_start &&
2486     l.ondisk_log_start == r.ondisk_log_start &&
2487     l.created == r.created &&
2488     l.last_epoch_clean == r.last_epoch_clean &&
2489     l.parent == r.parent &&
2490     l.parent_split_bits == r.parent_split_bits &&
2491     l.last_scrub == r.last_scrub &&
2492     l.last_deep_scrub == r.last_deep_scrub &&
2493     l.last_scrub_stamp == r.last_scrub_stamp &&
2494     l.last_deep_scrub_stamp == r.last_deep_scrub_stamp &&
2495     l.last_clean_scrub_stamp == r.last_clean_scrub_stamp &&
2496     l.stats == r.stats &&
2497     l.stats_invalid == r.stats_invalid &&
2498     l.log_size == r.log_size &&
2499     l.ondisk_log_size == r.ondisk_log_size &&
2500     l.up == r.up &&
2501     l.acting == r.acting &&
2502     l.mapping_epoch == r.mapping_epoch &&
2503     l.blocked_by == r.blocked_by &&
2504     l.last_became_active == r.last_became_active &&
2505     l.last_became_peered == r.last_became_peered &&
2506     l.dirty_stats_invalid == r.dirty_stats_invalid &&
2507     l.omap_stats_invalid == r.omap_stats_invalid &&
2508     l.hitset_stats_invalid == r.hitset_stats_invalid &&
2509     l.hitset_bytes_stats_invalid == r.hitset_bytes_stats_invalid &&
2510     l.up_primary == r.up_primary &&
2511     l.acting_primary == r.acting_primary &&
2512     l.pin_stats_invalid == r.pin_stats_invalid;
2513 }
2514
2515 // -- pool_stat_t --
2516
2517 void pool_stat_t::dump(Formatter *f) const
2518 {
2519   stats.dump(f);
2520   f->dump_int("log_size", log_size);
2521   f->dump_int("ondisk_log_size", ondisk_log_size);
2522   f->dump_int("up", up);
2523   f->dump_int("acting", acting);
2524 }
2525
2526 void pool_stat_t::encode(bufferlist &bl, uint64_t features) const
2527 {
2528   if ((features & CEPH_FEATURE_OSDENC) == 0) {
2529     __u8 v = 4;
2530     ::encode(v, bl);
2531     ::encode(stats, bl);
2532     ::encode(log_size, bl);
2533     ::encode(ondisk_log_size, bl);
2534     return;
2535   }
2536
2537   ENCODE_START(6, 5, bl);
2538   ::encode(stats, bl);
2539   ::encode(log_size, bl);
2540   ::encode(ondisk_log_size, bl);
2541   ::encode(up, bl);
2542   ::encode(acting, bl);
2543   ENCODE_FINISH(bl);
2544 }
2545
2546 void pool_stat_t::decode(bufferlist::iterator &bl)
2547 {
2548   DECODE_START_LEGACY_COMPAT_LEN(6, 5, 5, bl);
2549   if (struct_v >= 4) {
2550     ::decode(stats, bl);
2551     ::decode(log_size, bl);
2552     ::decode(ondisk_log_size, bl);
2553     if (struct_v >= 6) {
2554       ::decode(up, bl);
2555       ::decode(acting, bl);
2556     } else {
2557       up = 0;
2558       acting = 0;
2559     }
2560   } else {
2561     ::decode(stats.sum.num_bytes, bl);
2562     uint64_t num_kb;
2563     ::decode(num_kb, bl);
2564     ::decode(stats.sum.num_objects, bl);
2565     ::decode(stats.sum.num_object_clones, bl);
2566     ::decode(stats.sum.num_object_copies, bl);
2567     ::decode(stats.sum.num_objects_missing_on_primary, bl);
2568     ::decode(stats.sum.num_objects_degraded, bl);
2569     ::decode(log_size, bl);
2570     ::decode(ondisk_log_size, bl);
2571     if (struct_v >= 2) {
2572       ::decode(stats.sum.num_rd, bl);
2573       ::decode(stats.sum.num_rd_kb, bl);
2574       ::decode(stats.sum.num_wr, bl);
2575       ::decode(stats.sum.num_wr_kb, bl);
2576     }
2577     if (struct_v >= 3) {
2578       ::decode(stats.sum.num_objects_unfound, bl);
2579     }
2580   }
2581   DECODE_FINISH(bl);
2582 }
2583
2584 void pool_stat_t::generate_test_instances(list<pool_stat_t*>& o)
2585 {
2586   pool_stat_t a;
2587   o.push_back(new pool_stat_t(a));
2588
2589   list<object_stat_collection_t*> l;
2590   object_stat_collection_t::generate_test_instances(l);
2591   a.stats = *l.back();
2592   a.log_size = 123;
2593   a.ondisk_log_size = 456;
2594   a.acting = 3;
2595   a.up = 4;
2596   o.push_back(new pool_stat_t(a));
2597 }
2598
2599
2600 // -- pg_history_t --
2601
2602 void pg_history_t::encode(bufferlist &bl) const
2603 {
2604   ENCODE_START(9, 4, bl);
2605   ::encode(epoch_created, bl);
2606   ::encode(last_epoch_started, bl);
2607   ::encode(last_epoch_clean, bl);
2608   ::encode(last_epoch_split, bl);
2609   ::encode(same_interval_since, bl);
2610   ::encode(same_up_since, bl);
2611   ::encode(same_primary_since, bl);
2612   ::encode(last_scrub, bl);
2613   ::encode(last_scrub_stamp, bl);
2614   ::encode(last_deep_scrub, bl);
2615   ::encode(last_deep_scrub_stamp, bl);
2616   ::encode(last_clean_scrub_stamp, bl);
2617   ::encode(last_epoch_marked_full, bl);
2618   ::encode(last_interval_started, bl);
2619   ::encode(last_interval_clean, bl);
2620   ::encode(epoch_pool_created, bl);
2621   ENCODE_FINISH(bl);
2622 }
2623
2624 void pg_history_t::decode(bufferlist::iterator &bl)
2625 {
2626   DECODE_START_LEGACY_COMPAT_LEN(9, 4, 4, bl);
2627   ::decode(epoch_created, bl);
2628   ::decode(last_epoch_started, bl);
2629   if (struct_v >= 3)
2630     ::decode(last_epoch_clean, bl);
2631   else
2632     last_epoch_clean = last_epoch_started;  // careful, it's a lie!
2633   ::decode(last_epoch_split, bl);
2634   ::decode(same_interval_since, bl);
2635   ::decode(same_up_since, bl);
2636   ::decode(same_primary_since, bl);
2637   if (struct_v >= 2) {
2638     ::decode(last_scrub, bl);
2639     ::decode(last_scrub_stamp, bl);
2640   }
2641   if (struct_v >= 5) {
2642     ::decode(last_deep_scrub, bl);
2643     ::decode(last_deep_scrub_stamp, bl);
2644   }
2645   if (struct_v >= 6) {
2646     ::decode(last_clean_scrub_stamp, bl);
2647   }
2648   if (struct_v >= 7) {
2649     ::decode(last_epoch_marked_full, bl);
2650   }
2651   if (struct_v >= 8) {
2652     ::decode(last_interval_started, bl);
2653     ::decode(last_interval_clean, bl);
2654   } else {
2655     if (last_epoch_started >= same_interval_since) {
2656       last_interval_started = same_interval_since;
2657     } else {
2658       last_interval_started = last_epoch_started; // best guess
2659     }
2660     if (last_epoch_clean >= same_interval_since) {
2661       last_interval_clean = same_interval_since;
2662     } else {
2663       last_interval_clean = last_epoch_clean; // best guess
2664     }
2665   }
2666   if (struct_v >= 9) {
2667     ::decode(epoch_pool_created, bl);
2668   } else {
2669     epoch_pool_created = epoch_created;
2670   }
2671   DECODE_FINISH(bl);
2672 }
2673
2674 void pg_history_t::dump(Formatter *f) const
2675 {
2676   f->dump_int("epoch_created", epoch_created);
2677   f->dump_int("epoch_pool_created", epoch_pool_created);
2678   f->dump_int("last_epoch_started", last_epoch_started);
2679   f->dump_int("last_interval_started", last_interval_started);
2680   f->dump_int("last_epoch_clean", last_epoch_clean);
2681   f->dump_int("last_interval_clean", last_interval_clean);
2682   f->dump_int("last_epoch_split", last_epoch_split);
2683   f->dump_int("last_epoch_marked_full", last_epoch_marked_full);
2684   f->dump_int("same_up_since", same_up_since);
2685   f->dump_int("same_interval_since", same_interval_since);
2686   f->dump_int("same_primary_since", same_primary_since);
2687   f->dump_stream("last_scrub") << last_scrub;
2688   f->dump_stream("last_scrub_stamp") << last_scrub_stamp;
2689   f->dump_stream("last_deep_scrub") << last_deep_scrub;
2690   f->dump_stream("last_deep_scrub_stamp") << last_deep_scrub_stamp;
2691   f->dump_stream("last_clean_scrub_stamp") << last_clean_scrub_stamp;
2692 }
2693
2694 void pg_history_t::generate_test_instances(list<pg_history_t*>& o)
2695 {
2696   o.push_back(new pg_history_t);
2697   o.push_back(new pg_history_t);
2698   o.back()->epoch_created = 1;
2699   o.back()->epoch_pool_created = 1;
2700   o.back()->last_epoch_started = 2;
2701   o.back()->last_interval_started = 2;
2702   o.back()->last_epoch_clean = 3;
2703   o.back()->last_interval_clean = 2;
2704   o.back()->last_epoch_split = 4;
2705   o.back()->same_up_since = 5;
2706   o.back()->same_interval_since = 6;
2707   o.back()->same_primary_since = 7;
2708   o.back()->last_scrub = eversion_t(8, 9);
2709   o.back()->last_scrub_stamp = utime_t(10, 11);
2710   o.back()->last_deep_scrub = eversion_t(12, 13);
2711   o.back()->last_deep_scrub_stamp = utime_t(14, 15);
2712   o.back()->last_clean_scrub_stamp = utime_t(16, 17);
2713   o.back()->last_epoch_marked_full = 18;
2714 }
2715
2716
2717 // -- pg_info_t --
2718
2719 void pg_info_t::encode(bufferlist &bl) const
2720 {
2721   ENCODE_START(32, 26, bl);
2722   ::encode(pgid.pgid, bl);
2723   ::encode(last_update, bl);
2724   ::encode(last_complete, bl);
2725   ::encode(log_tail, bl);
2726   if (last_backfill_bitwise && !last_backfill.is_max()) {
2727     ::encode(hobject_t(), bl);
2728   } else {
2729     ::encode(last_backfill, bl);
2730   }
2731   ::encode(stats, bl);
2732   history.encode(bl);
2733   ::encode(purged_snaps, bl);
2734   ::encode(last_epoch_started, bl);
2735   ::encode(last_user_version, bl);
2736   ::encode(hit_set, bl);
2737   ::encode(pgid.shard, bl);
2738   ::encode(last_backfill, bl);
2739   ::encode(last_backfill_bitwise, bl);
2740   ::encode(last_interval_started, bl);
2741   ENCODE_FINISH(bl);
2742 }
2743
2744 void pg_info_t::decode(bufferlist::iterator &bl)
2745 {
2746   DECODE_START(32, bl);
2747   ::decode(pgid.pgid, bl);
2748   ::decode(last_update, bl);
2749   ::decode(last_complete, bl);
2750   ::decode(log_tail, bl);
2751   {
2752     hobject_t old_last_backfill;
2753     ::decode(old_last_backfill, bl);
2754   }
2755   ::decode(stats, bl);
2756   history.decode(bl);
2757   ::decode(purged_snaps, bl);
2758   ::decode(last_epoch_started, bl);
2759   ::decode(last_user_version, bl);
2760   ::decode(hit_set, bl);
2761   ::decode(pgid.shard, bl);
2762   ::decode(last_backfill, bl);
2763   ::decode(last_backfill_bitwise, bl);
2764   if (struct_v >= 32) {
2765     ::decode(last_interval_started, bl);
2766   } else {
2767     last_interval_started = last_epoch_started;
2768   }
2769   DECODE_FINISH(bl);
2770 }
2771
2772 // -- pg_info_t --
2773
2774 void pg_info_t::dump(Formatter *f) const
2775 {
2776   f->dump_stream("pgid") << pgid;
2777   f->dump_stream("last_update") << last_update;
2778   f->dump_stream("last_complete") << last_complete;
2779   f->dump_stream("log_tail") << log_tail;
2780   f->dump_int("last_user_version", last_user_version);
2781   f->dump_stream("last_backfill") << last_backfill;
2782   f->dump_int("last_backfill_bitwise", (int)last_backfill_bitwise);
2783   f->open_array_section("purged_snaps");
2784   for (interval_set<snapid_t>::const_iterator i=purged_snaps.begin();
2785        i != purged_snaps.end();
2786        ++i) {
2787     f->open_object_section("purged_snap_interval");
2788     f->dump_stream("start") << i.get_start();
2789     f->dump_stream("length") << i.get_len();
2790     f->close_section();
2791   }
2792   f->close_section();
2793   f->open_object_section("history");
2794   history.dump(f);
2795   f->close_section();
2796   f->open_object_section("stats");
2797   stats.dump(f);
2798   f->close_section();
2799
2800   f->dump_int("empty", is_empty());
2801   f->dump_int("dne", dne());
2802   f->dump_int("incomplete", is_incomplete());
2803   f->dump_int("last_epoch_started", last_epoch_started);
2804
2805   f->open_object_section("hit_set_history");
2806   hit_set.dump(f);
2807   f->close_section();
2808 }
2809
2810 void pg_info_t::generate_test_instances(list<pg_info_t*>& o)
2811 {
2812   o.push_back(new pg_info_t);
2813   o.push_back(new pg_info_t);
2814   list<pg_history_t*> h;
2815   pg_history_t::generate_test_instances(h);
2816   o.back()->history = *h.back();
2817   o.back()->pgid = spg_t(pg_t(1, 2, -1), shard_id_t::NO_SHARD);
2818   o.back()->last_update = eversion_t(3, 4);
2819   o.back()->last_complete = eversion_t(5, 6);
2820   o.back()->last_user_version = 2;
2821   o.back()->log_tail = eversion_t(7, 8);
2822   o.back()->last_backfill = hobject_t(object_t("objname"), "key", 123, 456, -1, "");
2823   o.back()->last_backfill_bitwise = true;
2824   {
2825     list<pg_stat_t*> s;
2826     pg_stat_t::generate_test_instances(s);
2827     o.back()->stats = *s.back();
2828   }
2829   {
2830     list<pg_hit_set_history_t*> s;
2831     pg_hit_set_history_t::generate_test_instances(s);
2832     o.back()->hit_set = *s.back();
2833   }
2834 }
2835
2836 // -- pg_notify_t --
2837 void pg_notify_t::encode(bufferlist &bl) const
2838 {
2839   ENCODE_START(2, 2, bl);
2840   ::encode(query_epoch, bl);
2841   ::encode(epoch_sent, bl);
2842   ::encode(info, bl);
2843   ::encode(to, bl);
2844   ::encode(from, bl);
2845   ENCODE_FINISH(bl);
2846 }
2847
2848 void pg_notify_t::decode(bufferlist::iterator &bl)
2849 {
2850   DECODE_START(2, bl);
2851   ::decode(query_epoch, bl);
2852   ::decode(epoch_sent, bl);
2853   ::decode(info, bl);
2854   ::decode(to, bl);
2855   ::decode(from, bl);
2856   DECODE_FINISH(bl);
2857 }
2858
2859 void pg_notify_t::dump(Formatter *f) const
2860 {
2861   f->dump_int("from", from);
2862   f->dump_int("to", to);
2863   f->dump_unsigned("query_epoch", query_epoch);
2864   f->dump_unsigned("epoch_sent", epoch_sent);
2865   {
2866     f->open_object_section("info");
2867     info.dump(f);
2868     f->close_section();
2869   }
2870 }
2871
2872 void pg_notify_t::generate_test_instances(list<pg_notify_t*>& o)
2873 {
2874   o.push_back(new pg_notify_t(shard_id_t(3), shard_id_t::NO_SHARD, 1, 1, pg_info_t()));
2875   o.push_back(new pg_notify_t(shard_id_t(0), shard_id_t(0), 3, 10, pg_info_t()));
2876 }
2877
2878 ostream &operator<<(ostream &lhs, const pg_notify_t &notify)
2879 {
2880   lhs << "(query:" << notify.query_epoch
2881       << " sent:" << notify.epoch_sent
2882       << " " << notify.info;
2883   if (notify.from != shard_id_t::NO_SHARD ||
2884       notify.to != shard_id_t::NO_SHARD)
2885     lhs << " " << (unsigned)notify.from
2886         << "->" << (unsigned)notify.to;
2887   return lhs << ")";
2888 }
2889
2890 // -- pg_interval_t --
2891
2892 void PastIntervals::pg_interval_t::encode(bufferlist& bl) const
2893 {
2894   ENCODE_START(4, 2, bl);
2895   ::encode(first, bl);
2896   ::encode(last, bl);
2897   ::encode(up, bl);
2898   ::encode(acting, bl);
2899   ::encode(maybe_went_rw, bl);
2900   ::encode(primary, bl);
2901   ::encode(up_primary, bl);
2902   ENCODE_FINISH(bl);
2903 }
2904
2905 void PastIntervals::pg_interval_t::decode(bufferlist::iterator& bl)
2906 {
2907   DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl);
2908   ::decode(first, bl);
2909   ::decode(last, bl);
2910   ::decode(up, bl);
2911   ::decode(acting, bl);
2912   ::decode(maybe_went_rw, bl);
2913   if (struct_v >= 3) {
2914     ::decode(primary, bl);
2915   } else {
2916     if (acting.size())
2917       primary = acting[0];
2918   }
2919   if (struct_v >= 4) {
2920     ::decode(up_primary, bl);
2921   } else {
2922     if (up.size())
2923       up_primary = up[0];
2924   }
2925   DECODE_FINISH(bl);
2926 }
2927
2928 void PastIntervals::pg_interval_t::dump(Formatter *f) const
2929 {
2930   f->dump_unsigned("first", first);
2931   f->dump_unsigned("last", last);
2932   f->dump_int("maybe_went_rw", maybe_went_rw ? 1 : 0);
2933   f->open_array_section("up");
2934   for (vector<int>::const_iterator p = up.begin(); p != up.end(); ++p)
2935     f->dump_int("osd", *p);
2936   f->close_section();
2937   f->open_array_section("acting");
2938   for (vector<int>::const_iterator p = acting.begin(); p != acting.end(); ++p)
2939     f->dump_int("osd", *p);
2940   f->close_section();
2941   f->dump_int("primary", primary);
2942   f->dump_int("up_primary", up_primary);
2943 }
2944
2945 void PastIntervals::pg_interval_t::generate_test_instances(list<pg_interval_t*>& o)
2946 {
2947   o.push_back(new pg_interval_t);
2948   o.push_back(new pg_interval_t);
2949   o.back()->up.push_back(1);
2950   o.back()->acting.push_back(2);
2951   o.back()->acting.push_back(3);
2952   o.back()->first = 4;
2953   o.back()->last = 5;
2954   o.back()->maybe_went_rw = true;
2955 }
2956
2957 WRITE_CLASS_ENCODER(PastIntervals::pg_interval_t)
2958
2959 class pi_simple_rep : public PastIntervals::interval_rep {
2960   map<epoch_t, PastIntervals::pg_interval_t> interval_map;
2961
2962   pi_simple_rep(
2963     bool ec_pool,
2964     std::list<PastIntervals::pg_interval_t> &&intervals) {
2965     for (auto &&i: intervals)
2966       add_interval(ec_pool, i);
2967   }
2968
2969 public:
2970   pi_simple_rep() = default;
2971   pi_simple_rep(const pi_simple_rep &) = default;
2972   pi_simple_rep(pi_simple_rep &&) = default;
2973   pi_simple_rep &operator=(pi_simple_rep &&) = default;
2974   pi_simple_rep &operator=(const pi_simple_rep &) = default;
2975
2976   size_t size() const override { return interval_map.size(); }
2977   bool empty() const override { return interval_map.empty(); }
2978   void clear() override { interval_map.clear(); }
2979   pair<epoch_t, epoch_t> get_bounds() const override {
2980     auto iter = interval_map.begin();
2981     if (iter != interval_map.end()) {
2982       auto riter = interval_map.rbegin();
2983       return make_pair(
2984         iter->second.first,
2985         riter->second.last + 1);
2986     } else {
2987       return make_pair(0, 0);
2988     }
2989   }
2990   set<pg_shard_t> get_all_participants(
2991     bool ec_pool) const override {
2992     set<pg_shard_t> all_participants;
2993
2994     // We need to decide who might have unfound objects that we need
2995     auto p = interval_map.rbegin();
2996     auto end = interval_map.rend();
2997     for (; p != end; ++p) {
2998       const PastIntervals::pg_interval_t &interval(p->second);
2999       // If nothing changed, we don't care about this interval.
3000       if (!interval.maybe_went_rw)
3001         continue;
3002
3003       int i = 0;
3004       std::vector<int>::const_iterator a = interval.acting.begin();
3005       std::vector<int>::const_iterator a_end = interval.acting.end();
3006       for (; a != a_end; ++a, ++i) {
3007         pg_shard_t shard(*a, ec_pool ? shard_id_t(i) : shard_id_t::NO_SHARD);
3008         if (*a != CRUSH_ITEM_NONE)
3009           all_participants.insert(shard);
3010       }
3011     }
3012     return all_participants;
3013   }
3014   void add_interval(
3015     bool ec_pool,
3016     const PastIntervals::pg_interval_t &interval) override {
3017     interval_map[interval.first] = interval;
3018   }
3019   unique_ptr<PastIntervals::interval_rep> clone() const override {
3020     return unique_ptr<PastIntervals::interval_rep>(new pi_simple_rep(*this));
3021   }
3022   ostream &print(ostream &out) const override {
3023     return out << interval_map;
3024   }
3025   void encode(bufferlist &bl) const override {
3026     ::encode(interval_map, bl);
3027   }
3028   void decode(bufferlist::iterator &bl) override {
3029     ::decode(interval_map, bl);
3030   }
3031   void dump(Formatter *f) const override {
3032     f->open_array_section("PastIntervals::compat_rep");
3033     for (auto &&i: interval_map) {
3034       f->open_object_section("pg_interval_t");
3035       f->dump_int("epoch", i.first);
3036       f->open_object_section("interval");
3037       i.second.dump(f);
3038       f->close_section();
3039       f->close_section();
3040     }
3041     f->close_section();
3042   }
3043   bool is_classic() const override {
3044     return true;
3045   }
3046   static void generate_test_instances(list<pi_simple_rep*> &o) {
3047     using ival = PastIntervals::pg_interval_t;
3048     using ivallst = std::list<ival>;
3049     o.push_back(
3050       new pi_simple_rep(
3051         true, ivallst
3052         { ival{{0, 1, 2}, {0, 1, 2}, 10, 20,  true, 0, 0}
3053         , ival{{   1, 2}, {   1, 2}, 21, 30,  true, 1, 1}
3054         , ival{{      2}, {      2}, 31, 35, false, 2, 2}
3055         , ival{{0,    2}, {0,    2}, 36, 50,  true, 0, 0}
3056         }));
3057     o.push_back(
3058       new pi_simple_rep(
3059         false, ivallst
3060         { ival{{0, 1, 2}, {0, 1, 2}, 10, 20,  true, 0, 0}
3061         , ival{{   1, 2}, {   1, 2}, 20, 30,  true, 1, 1}
3062         , ival{{      2}, {      2}, 31, 35, false, 2, 2}
3063         , ival{{0,    2}, {0,    2}, 36, 50,  true, 0, 0}
3064         }));
3065     o.push_back(
3066       new pi_simple_rep(
3067         true, ivallst
3068         { ival{{2, 1, 0}, {2, 1, 0}, 10, 20,  true, 1, 1}
3069         , ival{{   0, 2}, {   0, 2}, 21, 30,  true, 0, 0}
3070         , ival{{   0, 2}, {2,    0}, 31, 35,  true, 2, 2}
3071         , ival{{   0, 2}, {   0, 2}, 36, 50,  true, 0, 0}
3072         }));
3073     return;
3074   }
3075   void iterate_mayberw_back_to(
3076     bool ec_pool,
3077     epoch_t les,
3078     std::function<void(epoch_t, const set<pg_shard_t> &)> &&f) const override {
3079     for (auto i = interval_map.rbegin(); i != interval_map.rend(); ++i) {
3080       if (!i->second.maybe_went_rw)
3081         continue;
3082       if (i->second.last < les)
3083         break;
3084       set<pg_shard_t> actingset;
3085       for (unsigned j = 0; j < i->second.acting.size(); ++j) {
3086         if (i->second.acting[j] == CRUSH_ITEM_NONE)
3087           continue;
3088         actingset.insert(
3089           pg_shard_t(
3090             i->second.acting[j],
3091             ec_pool ? shard_id_t(j) : shard_id_t::NO_SHARD));
3092       }
3093       f(i->second.first, actingset);
3094     }
3095   }
3096
3097   bool has_full_intervals() const override { return true; }
3098   void iterate_all_intervals(
3099     std::function<void(const PastIntervals::pg_interval_t &)> &&f
3100     ) const override {
3101     for (auto &&i: interval_map) {
3102       f(i.second);
3103     }
3104   }
3105   virtual ~pi_simple_rep() override {}
3106 };
3107
3108 /**
3109  * pi_compact_rep
3110  *
3111  * PastIntervals only needs to be able to answer two questions:
3112  * 1) Where should the primary look for unfound objects?
3113  * 2) List a set of subsets of the OSDs such that contacting at least
3114  *    one from each subset guarrantees we speak to at least one witness
3115  *    of any completed write.
3116  *
3117  * Crucially, 2) does not require keeping *all* past intervals.  Certainly,
3118  * we don't need to keep any where maybe_went_rw would be false.  We also
3119  * needn't keep two intervals where the actingset in one is a subset
3120  * of the other (only need to keep the smaller of the two sets).  In order
3121  * to accurately trim the set of intervals as last_epoch_started changes
3122  * without rebuilding the set from scratch, we'll retain the larger set
3123  * if it in an older interval.
3124  */
3125 struct compact_interval_t {
3126   epoch_t first;
3127   epoch_t last;
3128   set<pg_shard_t> acting;
3129   bool supersedes(const compact_interval_t &other) {
3130     for (auto &&i: acting) {
3131       if (!other.acting.count(i))
3132         return false;
3133     }
3134     return true;
3135   }
3136   void dump(Formatter *f) const {
3137     f->open_object_section("compact_interval_t");
3138     f->dump_stream("first") << first;
3139     f->dump_stream("last") << last;
3140     f->dump_stream("acting") << acting;
3141     f->close_section();
3142   }
3143   void encode(bufferlist &bl) const {
3144     ENCODE_START(1, 1, bl);
3145     ::encode(first, bl);
3146     ::encode(last, bl);
3147     ::encode(acting, bl);
3148     ENCODE_FINISH(bl);
3149   }
3150   void decode(bufferlist::iterator &bl) {
3151     DECODE_START(1, bl);
3152     ::decode(first, bl);
3153     ::decode(last, bl);
3154     ::decode(acting, bl);
3155     DECODE_FINISH(bl);
3156   }
3157   static void generate_test_instances(list<compact_interval_t*> & o) {
3158     /* Not going to be used, we'll generate pi_compact_rep directly */
3159   }
3160 };
3161 ostream &operator<<(ostream &o, const compact_interval_t &rhs)
3162 {
3163   return o << "([" << rhs.first << "," << rhs.last
3164            << "] acting " << rhs.acting << ")";
3165 }
3166 WRITE_CLASS_ENCODER(compact_interval_t)
3167
3168 class pi_compact_rep : public PastIntervals::interval_rep {
3169   epoch_t first = 0;
3170   epoch_t last = 0; // inclusive
3171   set<pg_shard_t> all_participants;
3172   list<compact_interval_t> intervals;
3173   pi_compact_rep(
3174     bool ec_pool,
3175     std::list<PastIntervals::pg_interval_t> &&intervals) {
3176     for (auto &&i: intervals)
3177       add_interval(ec_pool, i);
3178   }
3179 public:
3180   pi_compact_rep() = default;
3181   pi_compact_rep(const pi_compact_rep &) = default;
3182   pi_compact_rep(pi_compact_rep &&) = default;
3183   pi_compact_rep &operator=(const pi_compact_rep &) = default;
3184   pi_compact_rep &operator=(pi_compact_rep &&) = default;
3185
3186   size_t size() const override { return intervals.size(); }
3187   bool empty() const override {
3188     return first > last || (first == 0 && last == 0);
3189   }
3190   void clear() override {
3191     *this = pi_compact_rep();
3192   }
3193   pair<epoch_t, epoch_t> get_bounds() const override {
3194     return make_pair(first, last + 1);
3195   }
3196   set<pg_shard_t> get_all_participants(
3197     bool ec_pool) const override {
3198     return all_participants;
3199   }
3200   void add_interval(
3201     bool ec_pool, const PastIntervals::pg_interval_t &interval) override {
3202     if (first == 0)
3203       first = interval.first;
3204     assert(interval.last > last);
3205     last = interval.last;
3206     set<pg_shard_t> acting;
3207     for (unsigned i = 0; i < interval.acting.size(); ++i) {
3208       if (interval.acting[i] == CRUSH_ITEM_NONE)
3209         continue;
3210       acting.insert(
3211         pg_shard_t(
3212           interval.acting[i],
3213           ec_pool ? shard_id_t(i) : shard_id_t::NO_SHARD));
3214     }
3215     all_participants.insert(acting.begin(), acting.end());
3216     if (!interval.maybe_went_rw)
3217       return;
3218     intervals.push_back(
3219       compact_interval_t{interval.first, interval.last, acting});
3220     auto plast = intervals.end();
3221     --plast;
3222     for (auto cur = intervals.begin(); cur != plast; ) {
3223       if (plast->supersedes(*cur)) {
3224         intervals.erase(cur++);
3225       } else {
3226         ++cur;
3227       }
3228     }
3229   }
3230   unique_ptr<PastIntervals::interval_rep> clone() const override {
3231     return unique_ptr<PastIntervals::interval_rep>(new pi_compact_rep(*this));
3232   }
3233   ostream &print(ostream &out) const override {
3234     return out << "([" << first << "," << last
3235                << "] intervals=" << intervals << ")";
3236   }
3237   void encode(bufferlist &bl) const override {
3238     ENCODE_START(1, 1, bl);
3239     ::encode(first, bl);
3240     ::encode(last, bl);
3241     ::encode(all_participants, bl);
3242     ::encode(intervals, bl);
3243     ENCODE_FINISH(bl);
3244   }
3245   void decode(bufferlist::iterator &bl) override {
3246     DECODE_START(1, bl);
3247     ::decode(first, bl);
3248     ::decode(last, bl);
3249     ::decode(all_participants, bl);
3250     ::decode(intervals, bl);
3251     DECODE_FINISH(bl);
3252   }
3253   void dump(Formatter *f) const override {
3254     f->open_object_section("PastIntervals::compact_rep");
3255     f->dump_stream("first") << first;
3256     f->dump_stream("last") << last;
3257     f->open_array_section("all_participants");
3258     for (auto& i : all_participants) {
3259       f->dump_object("pg_shard", i);
3260     }
3261     f->close_section();
3262     f->open_array_section("intervals");
3263     for (auto &&i: intervals) {
3264       i.dump(f);
3265     }
3266     f->close_section();
3267     f->close_section();
3268   }
3269   bool is_classic() const override {
3270     return false;
3271   }
3272   static void generate_test_instances(list<pi_compact_rep*> &o) {
3273     using ival = PastIntervals::pg_interval_t;
3274     using ivallst = std::list<ival>;
3275     o.push_back(
3276       new pi_compact_rep(
3277         true, ivallst
3278         { ival{{0, 1, 2}, {0, 1, 2}, 10, 20,  true, 0, 0}
3279         , ival{{   1, 2}, {   1, 2}, 21, 30,  true, 1, 1}
3280         , ival{{      2}, {      2}, 31, 35, false, 2, 2}
3281         , ival{{0,    2}, {0,    2}, 36, 50,  true, 0, 0}
3282         }));
3283     o.push_back(
3284       new pi_compact_rep(
3285         false, ivallst
3286         { ival{{0, 1, 2}, {0, 1, 2}, 10, 20,  true, 0, 0}
3287         , ival{{   1, 2}, {   1, 2}, 21, 30,  true, 1, 1}
3288         , ival{{      2}, {      2}, 31, 35, false, 2, 2}
3289         , ival{{0,    2}, {0,    2}, 36, 50,  true, 0, 0}
3290         }));
3291     o.push_back(
3292       new pi_compact_rep(
3293         true, ivallst
3294         { ival{{2, 1, 0}, {2, 1, 0}, 10, 20,  true, 1, 1}
3295         , ival{{   0, 2}, {   0, 2}, 21, 30,  true, 0, 0}
3296         , ival{{   0, 2}, {2,    0}, 31, 35,  true, 2, 2}
3297         , ival{{   0, 2}, {   0, 2}, 36, 50,  true, 0, 0}
3298         }));
3299   }
3300   void iterate_mayberw_back_to(
3301     bool ec_pool,
3302     epoch_t les,
3303     std::function<void(epoch_t, const set<pg_shard_t> &)> &&f) const override {
3304     for (auto i = intervals.rbegin(); i != intervals.rend(); ++i) {
3305       if (i->last < les)
3306         break;
3307       f(i->first, i->acting);
3308     }
3309   }
3310   virtual ~pi_compact_rep() override {}
3311 };
3312 WRITE_CLASS_ENCODER(pi_compact_rep)
3313
3314 PastIntervals::PastIntervals(const PastIntervals &rhs)
3315   : past_intervals(rhs.past_intervals ?
3316                    rhs.past_intervals->clone() :
3317                    nullptr) {}
3318
3319 PastIntervals &PastIntervals::operator=(const PastIntervals &rhs)
3320 {
3321   PastIntervals other(rhs);
3322   swap(other);
3323   return *this;
3324 }
3325
3326 ostream& operator<<(ostream& out, const PastIntervals &i)
3327 {
3328   if (i.past_intervals) {
3329     return i.past_intervals->print(out);
3330   } else {
3331     return out << "(empty)";
3332   }
3333 }
3334
3335 ostream& operator<<(ostream& out, const PastIntervals::PriorSet &i)
3336 {
3337   return out << "PriorSet("
3338              << "ec_pool: " << i.ec_pool
3339              << ", probe: " << i.probe
3340              << ", down: " << i.down
3341              << ", blocked_by: " << i.blocked_by
3342              << ", pg_down: " << i.pg_down
3343              << ")";
3344 }
3345
3346 void PastIntervals::decode(bufferlist::iterator &bl)
3347 {
3348   DECODE_START(1, bl);
3349   __u8 type = 0;
3350   ::decode(type, bl);
3351   switch (type) {
3352   case 0:
3353     break;
3354   case 1:
3355     past_intervals.reset(new pi_simple_rep);
3356     past_intervals->decode(bl);
3357     break;
3358   case 2:
3359     past_intervals.reset(new pi_compact_rep);
3360     past_intervals->decode(bl);
3361     break;
3362   }
3363   DECODE_FINISH(bl);
3364 }
3365
3366 void PastIntervals::decode_classic(bufferlist::iterator &bl)
3367 {
3368   past_intervals.reset(new pi_simple_rep);
3369   past_intervals->decode(bl);
3370 }
3371
3372 void PastIntervals::generate_test_instances(list<PastIntervals*> &o)
3373 {
3374   {
3375     list<pi_simple_rep *> simple;
3376     pi_simple_rep::generate_test_instances(simple);
3377     for (auto &&i: simple) {
3378       // takes ownership of contents
3379       o.push_back(new PastIntervals(i));
3380     }
3381   }
3382   {
3383     list<pi_compact_rep *> compact;
3384     pi_compact_rep::generate_test_instances(compact);
3385     for (auto &&i: compact) {
3386       // takes ownership of contents
3387       o.push_back(new PastIntervals(i));
3388     }
3389   }
3390   return;
3391 }
3392
3393 void PastIntervals::update_type(bool ec_pool, bool compact)
3394 {
3395   if (!compact) {
3396     if (!past_intervals) {
3397       past_intervals.reset(new pi_simple_rep);
3398     } else {
3399       // we never convert from compact back to classic
3400       assert(is_classic());
3401     }
3402   } else {
3403     if (!past_intervals) {
3404       past_intervals.reset(new pi_compact_rep);
3405     } else if (is_classic()) {
3406       auto old = std::move(past_intervals);
3407       past_intervals.reset(new pi_compact_rep);
3408       assert(old->has_full_intervals());
3409       old->iterate_all_intervals([&](const pg_interval_t &i) {
3410           past_intervals->add_interval(ec_pool, i);
3411         });
3412     }
3413   }
3414 }
3415
3416 void PastIntervals::update_type_from_map(bool ec_pool, const OSDMap &osdmap)
3417 {
3418   update_type(ec_pool, osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS);
3419 }
3420
3421 bool PastIntervals::is_new_interval(
3422   int old_acting_primary,
3423   int new_acting_primary,
3424   const vector<int> &old_acting,
3425   const vector<int> &new_acting,
3426   int old_up_primary,
3427   int new_up_primary,
3428   const vector<int> &old_up,
3429   const vector<int> &new_up,
3430   int old_size,
3431   int new_size,
3432   int old_min_size,
3433   int new_min_size,
3434   unsigned old_pg_num,
3435   unsigned new_pg_num,
3436   bool old_sort_bitwise,
3437   bool new_sort_bitwise,
3438   bool old_recovery_deletes,
3439   bool new_recovery_deletes,
3440   pg_t pgid) {
3441   return old_acting_primary != new_acting_primary ||
3442     new_acting != old_acting ||
3443     old_up_primary != new_up_primary ||
3444     new_up != old_up ||
3445     old_min_size != new_min_size ||
3446     old_size != new_size ||
3447     pgid.is_split(old_pg_num, new_pg_num, 0) ||
3448     old_sort_bitwise != new_sort_bitwise ||
3449     old_recovery_deletes != new_recovery_deletes;
3450 }
3451
3452 bool PastIntervals::is_new_interval(
3453   int old_acting_primary,
3454   int new_acting_primary,
3455   const vector<int> &old_acting,
3456   const vector<int> &new_acting,
3457   int old_up_primary,
3458   int new_up_primary,
3459   const vector<int> &old_up,
3460   const vector<int> &new_up,
3461   OSDMapRef osdmap,
3462   OSDMapRef lastmap,
3463   pg_t pgid) {
3464   return !(lastmap->get_pools().count(pgid.pool())) ||
3465     is_new_interval(old_acting_primary,
3466                     new_acting_primary,
3467                     old_acting,
3468                     new_acting,
3469                     old_up_primary,
3470                     new_up_primary,
3471                     old_up,
3472                     new_up,
3473                     lastmap->get_pools().find(pgid.pool())->second.size,
3474                     osdmap->get_pools().find(pgid.pool())->second.size,
3475                     lastmap->get_pools().find(pgid.pool())->second.min_size,
3476                     osdmap->get_pools().find(pgid.pool())->second.min_size,
3477                     lastmap->get_pg_num(pgid.pool()),
3478                     osdmap->get_pg_num(pgid.pool()),
3479                     lastmap->test_flag(CEPH_OSDMAP_SORTBITWISE),
3480                     osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE),
3481                     lastmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
3482                     osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
3483                     pgid);
3484 }
3485
3486 bool PastIntervals::check_new_interval(
3487   int old_acting_primary,
3488   int new_acting_primary,
3489   const vector<int> &old_acting,
3490   const vector<int> &new_acting,
3491   int old_up_primary,
3492   int new_up_primary,
3493   const vector<int> &old_up,
3494   const vector<int> &new_up,
3495   epoch_t same_interval_since,
3496   epoch_t last_epoch_clean,
3497   OSDMapRef osdmap,
3498   OSDMapRef lastmap,
3499   pg_t pgid,
3500   IsPGRecoverablePredicate *could_have_gone_active,
3501   PastIntervals *past_intervals,
3502   std::ostream *out)
3503 {
3504   /*
3505    * We have to be careful to gracefully deal with situations like
3506    * so. Say we have a power outage or something that takes out both
3507    * OSDs, but the monitor doesn't mark them down in the same epoch.
3508    * The history may look like
3509    *
3510    *  1: A B
3511    *  2:   B
3512    *  3:       let's say B dies for good, too (say, from the power spike) 
3513    *  4: A
3514    *
3515    * which makes it look like B may have applied updates to the PG
3516    * that we need in order to proceed.  This sucks...
3517    *
3518    * To minimize the risk of this happening, we CANNOT go active if
3519    * _any_ OSDs in the prior set are down until we send an MOSDAlive
3520    * to the monitor such that the OSDMap sets osd_up_thru to an epoch.
3521    * Then, we have something like
3522    *
3523    *  1: A B
3524    *  2:   B   up_thru[B]=0
3525    *  3:
3526    *  4: A
3527    *
3528    * -> we can ignore B, bc it couldn't have gone active (up_thru still 0).
3529    *
3530    * or,
3531    *
3532    *  1: A B
3533    *  2:   B   up_thru[B]=0
3534    *  3:   B   up_thru[B]=2
3535    *  4:
3536    *  5: A    
3537    *
3538    * -> we must wait for B, bc it was alive through 2, and could have
3539    *    written to the pg.
3540    *
3541    * If B is really dead, then an administrator will need to manually
3542    * intervene by marking the OSD as "lost."
3543    */
3544
3545   // remember past interval
3546   //  NOTE: a change in the up set primary triggers an interval
3547   //  change, even though the interval members in the pg_interval_t
3548   //  do not change.
3549   assert(past_intervals);
3550   assert(past_intervals->past_intervals);
3551   if (is_new_interval(
3552         old_acting_primary,
3553         new_acting_primary,
3554         old_acting,
3555         new_acting,
3556         old_up_primary,
3557         new_up_primary,
3558         old_up,
3559         new_up,
3560         osdmap,
3561         lastmap,
3562         pgid)) {
3563     pg_interval_t i;
3564     i.first = same_interval_since;
3565     i.last = osdmap->get_epoch() - 1;
3566     assert(i.first <= i.last);
3567     i.acting = old_acting;
3568     i.up = old_up;
3569     i.primary = old_acting_primary;
3570     i.up_primary = old_up_primary;
3571
3572     unsigned num_acting = 0;
3573     for (vector<int>::const_iterator p = i.acting.begin(); p != i.acting.end();
3574          ++p)
3575       if (*p != CRUSH_ITEM_NONE)
3576         ++num_acting;
3577
3578     assert(lastmap->get_pools().count(pgid.pool()));
3579     const pg_pool_t& old_pg_pool = lastmap->get_pools().find(pgid.pool())->second;
3580     set<pg_shard_t> old_acting_shards;
3581     old_pg_pool.convert_to_pg_shards(old_acting, &old_acting_shards);
3582
3583     if (num_acting &&
3584         i.primary != -1 &&
3585         num_acting >= old_pg_pool.min_size &&
3586         (*could_have_gone_active)(old_acting_shards)) {
3587       if (out)
3588         *out << __func__ << " " << i
3589              << ": not rw,"
3590              << " up_thru " << lastmap->get_up_thru(i.primary)
3591              << " up_from " << lastmap->get_up_from(i.primary)
3592              << " last_epoch_clean " << last_epoch_clean
3593              << std::endl;
3594       if (lastmap->get_up_thru(i.primary) >= i.first &&
3595           lastmap->get_up_from(i.primary) <= i.first) {
3596         i.maybe_went_rw = true;
3597         if (out)
3598           *out << __func__ << " " << i
3599                << " : primary up " << lastmap->get_up_from(i.primary)
3600                << "-" << lastmap->get_up_thru(i.primary)
3601                << " includes interval"
3602                << std::endl;
3603       } else if (last_epoch_clean >= i.first &&
3604                  last_epoch_clean <= i.last) {
3605         // If the last_epoch_clean is included in this interval, then
3606         // the pg must have been rw (for recovery to have completed).
3607         // This is important because we won't know the _real_
3608         // first_epoch because we stop at last_epoch_clean, and we
3609         // don't want the oldest interval to randomly have
3610         // maybe_went_rw false depending on the relative up_thru vs
3611         // last_epoch_clean timing.
3612         i.maybe_went_rw = true;
3613         if (out)
3614           *out << __func__ << " " << i
3615                << " : includes last_epoch_clean " << last_epoch_clean
3616                << " and presumed to have been rw"
3617                << std::endl;
3618       } else {
3619         i.maybe_went_rw = false;
3620         if (out)
3621           *out << __func__ << " " << i
3622                << " : primary up " << lastmap->get_up_from(i.primary)
3623                << "-" << lastmap->get_up_thru(i.primary)
3624                << " does not include interval"
3625                << std::endl;
3626       }
3627     } else {
3628       i.maybe_went_rw = false;
3629       if (out)
3630         *out << __func__ << " " << i << " : acting set is too small" << std::endl;
3631     }
3632     past_intervals->past_intervals->add_interval(old_pg_pool.ec_pool(), i);
3633     return true;
3634   } else {
3635     return false;
3636   }
3637 }
3638
3639
3640 // true if the given map affects the prior set
3641 bool PastIntervals::PriorSet::affected_by_map(
3642   const OSDMap &osdmap,
3643   const DoutPrefixProvider *dpp) const
3644 {
3645   for (set<pg_shard_t>::iterator p = probe.begin();
3646        p != probe.end();
3647        ++p) {
3648     int o = p->osd;
3649
3650     // did someone in the prior set go down?
3651     if (osdmap.is_down(o) && down.count(o) == 0) {
3652       ldpp_dout(dpp, 10) << "affected_by_map osd." << o << " now down" << dendl;
3653       return true;
3654     }
3655
3656     // did a down osd in cur get (re)marked as lost?
3657     map<int, epoch_t>::const_iterator r = blocked_by.find(o);
3658     if (r != blocked_by.end()) {
3659       if (!osdmap.exists(o)) {
3660         ldpp_dout(dpp, 10) << "affected_by_map osd." << o << " no longer exists" << dendl;
3661         return true;
3662       }
3663       if (osdmap.get_info(o).lost_at != r->second) {
3664         ldpp_dout(dpp, 10) << "affected_by_map osd." << o << " (re)marked as lost" << dendl;
3665         return true;
3666       }
3667     }
3668   }
3669
3670   // did someone in the prior down set go up?
3671   for (set<int>::const_iterator p = down.begin();
3672        p != down.end();
3673        ++p) {
3674     int o = *p;
3675
3676     if (osdmap.is_up(o)) {
3677       ldpp_dout(dpp, 10) << "affected_by_map osd." << o << " now up" << dendl;
3678       return true;
3679     }
3680
3681     // did someone in the prior set get lost or destroyed?
3682     if (!osdmap.exists(o)) {
3683       ldpp_dout(dpp, 10) << "affected_by_map osd." << o << " no longer exists" << dendl;
3684       return true;
3685     }
3686     // did a down osd in down get (re)marked as lost?
3687     map<int, epoch_t>::const_iterator r = blocked_by.find(o);
3688     if (r != blocked_by.end()) {
3689       if (osdmap.get_info(o).lost_at != r->second) {
3690         ldpp_dout(dpp, 10) << "affected_by_map osd." << o << " (re)marked as lost" << dendl;
3691         return true;
3692       }
3693     }
3694   }
3695
3696   return false;
3697 }
3698
3699 ostream& operator<<(ostream& out, const PastIntervals::pg_interval_t& i)
3700 {
3701   out << "interval(" << i.first << "-" << i.last
3702       << " up " << i.up << "(" << i.up_primary << ")"
3703       << " acting " << i.acting << "(" << i.primary << ")";
3704   if (i.maybe_went_rw)
3705     out << " maybe_went_rw";
3706   out << ")";
3707   return out;
3708 }
3709
3710
3711
3712 // -- pg_query_t --
3713
3714 void pg_query_t::encode(bufferlist &bl, uint64_t features) const {
3715   ENCODE_START(3, 3, bl);
3716   ::encode(type, bl);
3717   ::encode(since, bl);
3718   history.encode(bl);
3719   ::encode(epoch_sent, bl);
3720   ::encode(to, bl);
3721   ::encode(from, bl);
3722   ENCODE_FINISH(bl);
3723 }
3724
3725 void pg_query_t::decode(bufferlist::iterator &bl) {
3726   DECODE_START(3, bl);
3727   ::decode(type, bl);
3728   ::decode(since, bl);
3729   history.decode(bl);
3730   ::decode(epoch_sent, bl);
3731   ::decode(to, bl);
3732   ::decode(from, bl);
3733   DECODE_FINISH(bl);
3734 }
3735
3736 void pg_query_t::dump(Formatter *f) const
3737 {
3738   f->dump_int("from", from);
3739   f->dump_int("to", to);
3740   f->dump_string("type", get_type_name());
3741   f->dump_stream("since") << since;
3742   f->dump_stream("epoch_sent") << epoch_sent;
3743   f->open_object_section("history");
3744   history.dump(f);
3745   f->close_section();
3746 }
3747 void pg_query_t::generate_test_instances(list<pg_query_t*>& o)
3748 {
3749   o.push_back(new pg_query_t());
3750   list<pg_history_t*> h;
3751   pg_history_t::generate_test_instances(h);
3752   o.push_back(new pg_query_t(pg_query_t::INFO, shard_id_t(1), shard_id_t(2), *h.back(), 4));
3753   o.push_back(new pg_query_t(pg_query_t::MISSING, shard_id_t(2), shard_id_t(3), *h.back(), 4));
3754   o.push_back(new pg_query_t(pg_query_t::LOG, shard_id_t(0), shard_id_t(0),
3755                              eversion_t(4, 5), *h.back(), 4));
3756   o.push_back(new pg_query_t(pg_query_t::FULLLOG,
3757                              shard_id_t::NO_SHARD, shard_id_t::NO_SHARD,
3758                              *h.back(), 5));
3759 }
3760
3761 // -- ObjectModDesc --
3762 void ObjectModDesc::visit(Visitor *visitor) const
3763 {
3764   bufferlist::iterator bp = bl.begin();
3765   try {
3766     while (!bp.end()) {
3767       DECODE_START(max_required_version, bp);
3768       uint8_t code;
3769       ::decode(code, bp);
3770       switch (code) {
3771       case APPEND: {
3772         uint64_t size;
3773         ::decode(size, bp);
3774         visitor->append(size);
3775         break;
3776       }
3777       case SETATTRS: {
3778         map<string, boost::optional<bufferlist> > attrs;
3779         ::decode(attrs, bp);
3780         visitor->setattrs(attrs);
3781         break;
3782       }
3783       case DELETE: {
3784         version_t old_version;
3785         ::decode(old_version, bp);
3786         visitor->rmobject(old_version);
3787         break;
3788       }
3789       case CREATE: {
3790         visitor->create();
3791         break;
3792       }
3793       case UPDATE_SNAPS: {
3794         set<snapid_t> snaps;
3795         ::decode(snaps, bp);
3796         visitor->update_snaps(snaps);
3797         break;
3798       }
3799       case TRY_DELETE: {
3800         version_t old_version;
3801         ::decode(old_version, bp);
3802         visitor->try_rmobject(old_version);
3803         break;
3804       }
3805       case ROLLBACK_EXTENTS: {
3806         vector<pair<uint64_t, uint64_t> > extents;
3807         version_t gen;
3808         ::decode(gen, bp);
3809         ::decode(extents, bp);
3810         visitor->rollback_extents(gen,extents);
3811         break;
3812       }
3813       default:
3814         assert(0 == "Invalid rollback code");
3815       }
3816       DECODE_FINISH(bp);
3817     }
3818   } catch (...) {
3819     assert(0 == "Invalid encoding");
3820   }
3821 }
3822
3823 struct DumpVisitor : public ObjectModDesc::Visitor {
3824   Formatter *f;
3825   explicit DumpVisitor(Formatter *f) : f(f) {}
3826   void append(uint64_t old_size) override {
3827     f->open_object_section("op");
3828     f->dump_string("code", "APPEND");
3829     f->dump_unsigned("old_size", old_size);
3830     f->close_section();
3831   }
3832   void setattrs(map<string, boost::optional<bufferlist> > &attrs) override {
3833     f->open_object_section("op");
3834     f->dump_string("code", "SETATTRS");
3835     f->open_array_section("attrs");
3836     for (map<string, boost::optional<bufferlist> >::iterator i = attrs.begin();
3837          i != attrs.end();
3838          ++i) {
3839       f->dump_string("attr_name", i->first);
3840     }
3841     f->close_section();
3842     f->close_section();
3843   }
3844   void rmobject(version_t old_version) override {
3845     f->open_object_section("op");
3846     f->dump_string("code", "RMOBJECT");
3847     f->dump_unsigned("old_version", old_version);
3848     f->close_section();
3849   }
3850   void try_rmobject(version_t old_version) override {
3851     f->open_object_section("op");
3852     f->dump_string("code", "TRY_RMOBJECT");
3853     f->dump_unsigned("old_version", old_version);
3854     f->close_section();
3855   }
3856   void create() override {
3857     f->open_object_section("op");
3858     f->dump_string("code", "CREATE");
3859     f->close_section();
3860   }
3861   void update_snaps(const set<snapid_t> &snaps) override {
3862     f->open_object_section("op");
3863     f->dump_string("code", "UPDATE_SNAPS");
3864     f->dump_stream("snaps") << snaps;
3865     f->close_section();
3866   }
3867   void rollback_extents(
3868     version_t gen,
3869     const vector<pair<uint64_t, uint64_t> > &extents) override {
3870     f->open_object_section("op");
3871     f->dump_string("code", "ROLLBACK_EXTENTS");
3872     f->dump_unsigned("gen", gen);
3873     f->dump_stream("snaps") << extents;
3874     f->close_section();
3875   }
3876 };
3877
3878 void ObjectModDesc::dump(Formatter *f) const
3879 {
3880   f->open_object_section("object_mod_desc");
3881   f->dump_bool("can_local_rollback", can_local_rollback);
3882   f->dump_bool("rollback_info_completed", rollback_info_completed);
3883   {
3884     f->open_array_section("ops");
3885     DumpVisitor vis(f);
3886     visit(&vis);
3887     f->close_section();
3888   }
3889   f->close_section();
3890 }
3891
3892 void ObjectModDesc::generate_test_instances(list<ObjectModDesc*>& o)
3893 {
3894   map<string, boost::optional<bufferlist> > attrs;
3895   attrs[OI_ATTR];
3896   attrs[SS_ATTR];
3897   attrs["asdf"];
3898   o.push_back(new ObjectModDesc());
3899   o.back()->append(100);
3900   o.back()->setattrs(attrs);
3901   o.push_back(new ObjectModDesc());
3902   o.back()->rmobject(1001);
3903   o.push_back(new ObjectModDesc());
3904   o.back()->create();
3905   o.back()->setattrs(attrs);
3906   o.push_back(new ObjectModDesc());
3907   o.back()->create();
3908   o.back()->setattrs(attrs);
3909   o.back()->mark_unrollbackable();
3910   o.back()->append(1000);
3911 }
3912
3913 void ObjectModDesc::encode(bufferlist &_bl) const
3914 {
3915   ENCODE_START(max_required_version, max_required_version, _bl);
3916   ::encode(can_local_rollback, _bl);
3917   ::encode(rollback_info_completed, _bl);
3918   ::encode(bl, _bl);
3919   ENCODE_FINISH(_bl);
3920 }
3921 void ObjectModDesc::decode(bufferlist::iterator &_bl)
3922 {
3923   DECODE_START(2, _bl);
3924   max_required_version = struct_v;
3925   ::decode(can_local_rollback, _bl);
3926   ::decode(rollback_info_completed, _bl);
3927   ::decode(bl, _bl);
3928   // ensure bl does not pin a larger buffer in memory
3929   bl.rebuild();
3930   bl.reassign_to_mempool(mempool::mempool_osd_pglog);
3931   DECODE_FINISH(_bl);
3932 }
3933
3934 // -- pg_log_entry_t --
3935
3936 string pg_log_entry_t::get_key_name() const
3937 {
3938   return version.get_key_name();
3939 }
3940
3941 void pg_log_entry_t::encode_with_checksum(bufferlist& bl) const
3942 {
3943   bufferlist ebl(sizeof(*this)*2);
3944   encode(ebl);
3945   __u32 crc = ebl.crc32c(0);
3946   ::encode(ebl, bl);
3947   ::encode(crc, bl);
3948 }
3949
3950 void pg_log_entry_t::decode_with_checksum(bufferlist::iterator& p)
3951 {
3952   bufferlist bl;
3953   ::decode(bl, p);
3954   __u32 crc;
3955   ::decode(crc, p);
3956   if (crc != bl.crc32c(0))
3957     throw buffer::malformed_input("bad checksum on pg_log_entry_t");
3958   bufferlist::iterator q = bl.begin();
3959   decode(q);
3960 }
3961
3962 void pg_log_entry_t::encode(bufferlist &bl) const
3963 {
3964   ENCODE_START(11, 4, bl);
3965   ::encode(op, bl);
3966   ::encode(soid, bl);
3967   ::encode(version, bl);
3968
3969   /**
3970    * Added with reverting_to:
3971    * Previous code used prior_version to encode
3972    * what we now call reverting_to.  This will
3973    * allow older code to decode reverting_to
3974    * into prior_version as expected.
3975    */
3976   if (op == LOST_REVERT)
3977     ::encode(reverting_to, bl);
3978   else
3979     ::encode(prior_version, bl);
3980
3981   ::encode(reqid, bl);
3982   ::encode(mtime, bl);
3983   if (op == LOST_REVERT)
3984     ::encode(prior_version, bl);
3985   ::encode(snaps, bl);
3986   ::encode(user_version, bl);
3987   ::encode(mod_desc, bl);
3988   ::encode(extra_reqids, bl);
3989   if (op == ERROR)
3990     ::encode(return_code, bl);
3991   ENCODE_FINISH(bl);
3992 }
3993
3994 void pg_log_entry_t::decode(bufferlist::iterator &bl)
3995 {
3996   DECODE_START_LEGACY_COMPAT_LEN(11, 4, 4, bl);
3997   ::decode(op, bl);
3998   if (struct_v < 2) {
3999     sobject_t old_soid;
4000     ::decode(old_soid, bl);
4001     soid.oid = old_soid.oid;
4002     soid.snap = old_soid.snap;
4003     invalid_hash = true;
4004   } else {
4005     ::decode(soid, bl);
4006   }
4007   if (struct_v < 3)
4008     invalid_hash = true;
4009   ::decode(version, bl);
4010
4011   if (struct_v >= 6 && op == LOST_REVERT)
4012     ::decode(reverting_to, bl);
4013   else
4014     ::decode(prior_version, bl);
4015
4016   ::decode(reqid, bl);
4017
4018   ::decode(mtime, bl);
4019   if (struct_v < 5)
4020     invalid_pool = true;
4021
4022   if (op == LOST_REVERT) {
4023     if (struct_v >= 6) {
4024       ::decode(prior_version, bl);
4025     } else {
4026       reverting_to = prior_version;
4027     }
4028   }
4029   if (struct_v >= 7 ||  // for v >= 7, this is for all ops.
4030       op == CLONE) {    // for v < 7, it's only present for CLONE.
4031     ::decode(snaps, bl);
4032     // ensure snaps does not pin a larger buffer in memory
4033     snaps.rebuild();
4034     snaps.reassign_to_mempool(mempool::mempool_osd_pglog);
4035   }
4036
4037   if (struct_v >= 8)
4038     ::decode(user_version, bl);
4039   else
4040     user_version = version.version;
4041
4042   if (struct_v >= 9)
4043     ::decode(mod_desc, bl);
4044   else
4045     mod_desc.mark_unrollbackable();
4046   if (struct_v >= 10)
4047     ::decode(extra_reqids, bl);
4048   if (struct_v >= 11 && op == ERROR)
4049     ::decode(return_code, bl);
4050   DECODE_FINISH(bl);
4051 }
4052
4053 void pg_log_entry_t::dump(Formatter *f) const
4054 {
4055   f->dump_string("op", get_op_name());
4056   f->dump_stream("object") << soid;
4057   f->dump_stream("version") << version;
4058   f->dump_stream("prior_version") << prior_version;
4059   f->dump_stream("reqid") << reqid;
4060   f->open_array_section("extra_reqids");
4061   for (auto p = extra_reqids.begin();
4062        p != extra_reqids.end();
4063        ++p) {
4064     f->open_object_section("extra_reqid");
4065     f->dump_stream("reqid") << p->first;
4066     f->dump_stream("user_version") << p->second;
4067     f->close_section();
4068   }
4069   f->close_section();
4070   f->dump_stream("mtime") << mtime;
4071   f->dump_int("return_code", return_code);
4072   if (snaps.length() > 0) {
4073     vector<snapid_t> v;
4074     bufferlist c = snaps;
4075     bufferlist::iterator p = c.begin();
4076     try {
4077       ::decode(v, p);
4078     } catch (...) {
4079       v.clear();
4080     }
4081     f->open_object_section("snaps");
4082     for (vector<snapid_t>::iterator p = v.begin(); p != v.end(); ++p)
4083       f->dump_unsigned("snap", *p);
4084     f->close_section();
4085   }
4086   {
4087     f->open_object_section("mod_desc");
4088     mod_desc.dump(f);
4089     f->close_section();
4090   }
4091 }
4092
4093 void pg_log_entry_t::generate_test_instances(list<pg_log_entry_t*>& o)
4094 {
4095   o.push_back(new pg_log_entry_t());
4096   hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
4097   o.push_back(new pg_log_entry_t(MODIFY, oid, eversion_t(1,2), eversion_t(3,4),
4098                                  1, osd_reqid_t(entity_name_t::CLIENT(777), 8, 999),
4099                                  utime_t(8,9), 0));
4100   o.push_back(new pg_log_entry_t(ERROR, oid, eversion_t(1,2), eversion_t(3,4),
4101                                  1, osd_reqid_t(entity_name_t::CLIENT(777), 8, 999),
4102                                  utime_t(8,9), -ENOENT));
4103 }
4104
4105 ostream& operator<<(ostream& out, const pg_log_entry_t& e)
4106 {
4107   out << e.version << " (" << e.prior_version << ") "
4108       << std::left << std::setw(8) << e.get_op_name() << ' '
4109       << e.soid << " by " << e.reqid << " " << e.mtime
4110       << " " << e.return_code;
4111   if (e.snaps.length()) {
4112     vector<snapid_t> snaps;
4113     bufferlist c = e.snaps;
4114     bufferlist::iterator p = c.begin();
4115     try {
4116       ::decode(snaps, p);
4117     } catch (...) {
4118       snaps.clear();
4119     }
4120     out << " snaps " << snaps;
4121   }
4122   return out;
4123 }
4124
4125 // -- pg_log_dup_t --
4126
4127 string pg_log_dup_t::get_key_name() const
4128 {
4129   return "dup_" + version.get_key_name();
4130 }
4131
4132 void pg_log_dup_t::encode(bufferlist &bl) const
4133 {
4134   ENCODE_START(1, 1, bl);
4135   ::encode(reqid, bl);
4136   ::encode(version, bl);
4137   ::encode(user_version, bl);
4138   ::encode(return_code, bl);
4139   ENCODE_FINISH(bl);
4140 }
4141
4142 void pg_log_dup_t::decode(bufferlist::iterator &bl)
4143 {
4144   DECODE_START(1, bl);
4145   ::decode(reqid, bl);
4146   ::decode(version, bl);
4147   ::decode(user_version, bl);
4148   ::decode(return_code, bl);
4149   DECODE_FINISH(bl);
4150 }
4151
4152 void pg_log_dup_t::dump(Formatter *f) const
4153 {
4154   f->dump_stream("reqid") << reqid;
4155   f->dump_stream("version") << version;
4156   f->dump_stream("user_version") << user_version;
4157   f->dump_stream("return_code") << return_code;
4158 }
4159
4160 void pg_log_dup_t::generate_test_instances(list<pg_log_dup_t*>& o)
4161 {
4162   o.push_back(new pg_log_dup_t());
4163   o.push_back(new pg_log_dup_t(eversion_t(1,2),
4164                                1,
4165                                osd_reqid_t(entity_name_t::CLIENT(777), 8, 999),
4166                                0));
4167   o.push_back(new pg_log_dup_t(eversion_t(1,2),
4168                                2,
4169                                osd_reqid_t(entity_name_t::CLIENT(777), 8, 999),
4170                                -ENOENT));
4171 }
4172
4173
4174 std::ostream& operator<<(std::ostream& out, const pg_log_dup_t& e) {
4175   return out << "log_dup(reqid=" << e.reqid <<
4176     " v=" << e.version << " uv=" << e.user_version <<
4177     " rc=" << e.return_code << ")";
4178 }
4179
4180
4181 // -- pg_log_t --
4182
4183 // out: pg_log_t that only has entries that apply to import_pgid using curmap
4184 // reject: Entries rejected from "in" are in the reject.log.  Other fields not set.
4185 void pg_log_t::filter_log(spg_t import_pgid, const OSDMap &curmap,
4186   const string &hit_set_namespace, const pg_log_t &in,
4187   pg_log_t &out, pg_log_t &reject)
4188 {
4189   out = in;
4190   out.log.clear();
4191   reject.log.clear();
4192
4193   for (list<pg_log_entry_t>::const_iterator i = in.log.begin();
4194        i != in.log.end(); ++i) {
4195
4196     // Reject pg log entries for temporary objects
4197     if (i->soid.is_temp()) {
4198       reject.log.push_back(*i);
4199       continue;
4200     }
4201
4202     if (i->soid.nspace != hit_set_namespace) {
4203       object_t oid = i->soid.oid;
4204       object_locator_t loc(i->soid);
4205       pg_t raw_pgid = curmap.object_locator_to_pg(oid, loc);
4206       pg_t pgid = curmap.raw_pg_to_pg(raw_pgid);
4207
4208       if (import_pgid.pgid == pgid) {
4209         out.log.push_back(*i);
4210       } else {
4211         reject.log.push_back(*i);
4212       }
4213     } else {
4214       out.log.push_back(*i);
4215     }
4216   }
4217 }
4218
4219 void pg_log_t::encode(bufferlist& bl) const
4220 {
4221   ENCODE_START(7, 3, bl);
4222   ::encode(head, bl);
4223   ::encode(tail, bl);
4224   ::encode(log, bl);
4225   ::encode(can_rollback_to, bl);
4226   ::encode(rollback_info_trimmed_to, bl);
4227   ::encode(dups, bl);
4228   ENCODE_FINISH(bl);
4229 }
4230  
4231 void pg_log_t::decode(bufferlist::iterator &bl, int64_t pool)
4232 {
4233   DECODE_START_LEGACY_COMPAT_LEN(7, 3, 3, bl);
4234   ::decode(head, bl);
4235   ::decode(tail, bl);
4236   if (struct_v < 2) {
4237     bool backlog;
4238     ::decode(backlog, bl);
4239   }
4240   ::decode(log, bl);
4241   if (struct_v >= 5)
4242     ::decode(can_rollback_to, bl);
4243
4244   if (struct_v >= 6)
4245     ::decode(rollback_info_trimmed_to, bl);
4246   else
4247     rollback_info_trimmed_to = tail;
4248
4249   if (struct_v >= 7)
4250     ::decode(dups, bl);
4251
4252   DECODE_FINISH(bl);
4253
4254   // handle hobject_t format change
4255   if (struct_v < 4) {
4256     for (list<pg_log_entry_t>::iterator i = log.begin();
4257          i != log.end();
4258          ++i) {
4259       if (!i->soid.is_max() && i->soid.pool == -1)
4260         i->soid.pool = pool;
4261     }
4262   }
4263 }
4264
4265 void pg_log_t::dump(Formatter *f) const
4266 {
4267   f->dump_stream("head") << head;
4268   f->dump_stream("tail") << tail;
4269   f->open_array_section("log");
4270   for (list<pg_log_entry_t>::const_iterator p = log.begin(); p != log.end(); ++p) {
4271     f->open_object_section("entry");
4272     p->dump(f);
4273     f->close_section();
4274   }
4275   f->close_section();
4276   f->open_array_section("dups");
4277   for (const auto& entry : dups) {
4278     f->open_object_section("entry");
4279     entry.dump(f);
4280     f->close_section();
4281   }
4282   f->close_section();
4283 }
4284
4285 void pg_log_t::generate_test_instances(list<pg_log_t*>& o)
4286 {
4287   o.push_back(new pg_log_t);
4288
4289   // this is nonsensical:
4290   o.push_back(new pg_log_t);
4291   o.back()->head = eversion_t(1,2);
4292   o.back()->tail = eversion_t(3,4);
4293   list<pg_log_entry_t*> e;
4294   pg_log_entry_t::generate_test_instances(e);
4295   for (list<pg_log_entry_t*>::iterator p = e.begin(); p != e.end(); ++p)
4296     o.back()->log.push_back(**p);
4297 }
4298
4299 void pg_log_t::copy_after(const pg_log_t &other, eversion_t v) 
4300 {
4301   can_rollback_to = other.can_rollback_to;
4302   head = other.head;
4303   tail = other.tail;
4304   for (list<pg_log_entry_t>::const_reverse_iterator i = other.log.rbegin();
4305        i != other.log.rend();
4306        ++i) {
4307     assert(i->version > other.tail);
4308     if (i->version <= v) {
4309       // make tail accurate.
4310       tail = i->version;
4311       break;
4312     }
4313     log.push_front(*i);
4314   }
4315 }
4316
4317 void pg_log_t::copy_range(const pg_log_t &other, eversion_t from, eversion_t to)
4318 {
4319   can_rollback_to = other.can_rollback_to;
4320   list<pg_log_entry_t>::const_reverse_iterator i = other.log.rbegin();
4321   assert(i != other.log.rend());
4322   while (i->version > to) {
4323     ++i;
4324     assert(i != other.log.rend());
4325   }
4326   assert(i->version == to);
4327   head = to;
4328   for ( ; i != other.log.rend(); ++i) {
4329     if (i->version <= from) {
4330       tail = i->version;
4331       break;
4332     }
4333     log.push_front(*i);
4334   }
4335 }
4336
4337 void pg_log_t::copy_up_to(const pg_log_t &other, int max)
4338 {
4339   can_rollback_to = other.can_rollback_to;
4340   int n = 0;
4341   head = other.head;
4342   tail = other.tail;
4343   for (list<pg_log_entry_t>::const_reverse_iterator i = other.log.rbegin();
4344        i != other.log.rend();
4345        ++i) {
4346     if (n++ >= max) {
4347       tail = i->version;
4348       break;
4349     }
4350     log.push_front(*i);
4351   }
4352 }
4353
4354 ostream& pg_log_t::print(ostream& out) const
4355 {
4356   out << *this << std::endl;
4357   for (list<pg_log_entry_t>::const_iterator p = log.begin();
4358        p != log.end();
4359        ++p)
4360     out << *p << std::endl;
4361   for (const auto& entry : dups) {
4362     out << " dup entry: " << entry << std::endl;
4363   }
4364   return out;
4365 }
4366
4367 // -- pg_missing_t --
4368
4369 ostream& operator<<(ostream& out, const pg_missing_item& i)
4370 {
4371   out << i.need;
4372   if (i.have != eversion_t())
4373     out << "(" << i.have << ")";
4374   out << " flags = " << i.flag_str();
4375   return out;
4376 }
4377
4378 // -- object_copy_cursor_t --
4379
4380 void object_copy_cursor_t::encode(bufferlist& bl) const
4381 {
4382   ENCODE_START(1, 1, bl);
4383   ::encode(attr_complete, bl);
4384   ::encode(data_offset, bl);
4385   ::encode(data_complete, bl);
4386   ::encode(omap_offset, bl);
4387   ::encode(omap_complete, bl);
4388   ENCODE_FINISH(bl);
4389 }
4390
4391 void object_copy_cursor_t::decode(bufferlist::iterator &bl)
4392 {
4393   DECODE_START(1, bl);
4394   ::decode(attr_complete, bl);
4395   ::decode(data_offset, bl);
4396   ::decode(data_complete, bl);
4397   ::decode(omap_offset, bl);
4398   ::decode(omap_complete, bl);
4399   DECODE_FINISH(bl);
4400 }
4401
4402 void object_copy_cursor_t::dump(Formatter *f) const
4403 {
4404   f->dump_unsigned("attr_complete", (int)attr_complete);
4405   f->dump_unsigned("data_offset", data_offset);
4406   f->dump_unsigned("data_complete", (int)data_complete);
4407   f->dump_string("omap_offset", omap_offset);
4408   f->dump_unsigned("omap_complete", (int)omap_complete);
4409 }
4410
4411 void object_copy_cursor_t::generate_test_instances(list<object_copy_cursor_t*>& o)
4412 {
4413   o.push_back(new object_copy_cursor_t);
4414   o.push_back(new object_copy_cursor_t);
4415   o.back()->attr_complete = true;
4416   o.back()->data_offset = 123;
4417   o.push_back(new object_copy_cursor_t);
4418   o.back()->attr_complete = true;
4419   o.back()->data_complete = true;
4420   o.back()->omap_offset = "foo";
4421   o.push_back(new object_copy_cursor_t);
4422   o.back()->attr_complete = true;
4423   o.back()->data_complete = true;
4424   o.back()->omap_complete = true;
4425 }
4426
4427 // -- object_copy_data_t --
4428
4429 void object_copy_data_t::encode(bufferlist& bl, uint64_t features) const
4430 {
4431   ENCODE_START(7, 5, bl);
4432   ::encode(size, bl);
4433   ::encode(mtime, bl);
4434   ::encode(attrs, bl);
4435   ::encode(data, bl);
4436   ::encode(omap_data, bl);
4437   ::encode(cursor, bl);
4438   ::encode(omap_header, bl);
4439   ::encode(snaps, bl);
4440   ::encode(snap_seq, bl);
4441   ::encode(flags, bl);
4442   ::encode(data_digest, bl);
4443   ::encode(omap_digest, bl);
4444   ::encode(reqids, bl);
4445   ::encode(truncate_seq, bl);
4446   ::encode(truncate_size, bl);
4447   ENCODE_FINISH(bl);
4448 }
4449
4450 void object_copy_data_t::decode(bufferlist::iterator& bl)
4451 {
4452   DECODE_START(7, bl);
4453   if (struct_v < 5) {
4454     // old
4455     ::decode(size, bl);
4456     ::decode(mtime, bl);
4457     {
4458       string category;
4459       ::decode(category, bl);  // no longer used
4460     }
4461     ::decode(attrs, bl);
4462     ::decode(data, bl);
4463     {
4464       map<string,bufferlist> omap;
4465       ::decode(omap, bl);
4466       omap_data.clear();
4467       if (!omap.empty())
4468         ::encode(omap, omap_data);
4469     }
4470     ::decode(cursor, bl);
4471     if (struct_v >= 2)
4472       ::decode(omap_header, bl);
4473     if (struct_v >= 3) {
4474       ::decode(snaps, bl);
4475       ::decode(snap_seq, bl);
4476     } else {
4477       snaps.clear();
4478       snap_seq = 0;
4479     }
4480     if (struct_v >= 4) {
4481       ::decode(flags, bl);
4482       ::decode(data_digest, bl);
4483       ::decode(omap_digest, bl);
4484     }
4485   } else {
4486     // current
4487     ::decode(size, bl);
4488     ::decode(mtime, bl);
4489     ::decode(attrs, bl);
4490     ::decode(data, bl);
4491     ::decode(omap_data, bl);
4492     ::decode(cursor, bl);
4493     ::decode(omap_header, bl);
4494     ::decode(snaps, bl);
4495     ::decode(snap_seq, bl);
4496     if (struct_v >= 4) {
4497       ::decode(flags, bl);
4498       ::decode(data_digest, bl);
4499       ::decode(omap_digest, bl);
4500     }
4501     if (struct_v >= 6) {
4502       ::decode(reqids, bl);
4503     }
4504     if (struct_v >= 7) {
4505       ::decode(truncate_seq, bl);
4506       ::decode(truncate_size, bl);
4507     }
4508   }
4509   DECODE_FINISH(bl);
4510 }
4511
4512 void object_copy_data_t::generate_test_instances(list<object_copy_data_t*>& o)
4513 {
4514   o.push_back(new object_copy_data_t());
4515
4516   list<object_copy_cursor_t*> cursors;
4517   object_copy_cursor_t::generate_test_instances(cursors);
4518   list<object_copy_cursor_t*>::iterator ci = cursors.begin();
4519   o.back()->cursor = **(ci++);
4520
4521   o.push_back(new object_copy_data_t());
4522   o.back()->cursor = **(ci++);
4523
4524   o.push_back(new object_copy_data_t());
4525   o.back()->size = 1234;
4526   o.back()->mtime.set_from_double(1234);
4527   bufferptr bp("there", 5);
4528   bufferlist bl;
4529   bl.push_back(bp);
4530   o.back()->attrs["hello"] = bl;
4531   bufferptr bp2("not", 3);
4532   bufferlist bl2;
4533   bl2.push_back(bp2);
4534   map<string,bufferlist> omap;
4535   omap["why"] = bl2;
4536   ::encode(omap, o.back()->omap_data);
4537   bufferptr databp("iamsomedatatocontain", 20);
4538   o.back()->data.push_back(databp);
4539   o.back()->omap_header.append("this is an omap header");
4540   o.back()->snaps.push_back(123);
4541   o.back()->reqids.push_back(make_pair(osd_reqid_t(), version_t()));
4542 }
4543
4544 void object_copy_data_t::dump(Formatter *f) const
4545 {
4546   f->open_object_section("cursor");
4547   cursor.dump(f);
4548   f->close_section(); // cursor
4549   f->dump_int("size", size);
4550   f->dump_stream("mtime") << mtime;
4551   /* we should really print out the attrs here, but bufferlist
4552      const-correctness prevents that */
4553   f->dump_int("attrs_size", attrs.size());
4554   f->dump_int("flags", flags);
4555   f->dump_unsigned("data_digest", data_digest);
4556   f->dump_unsigned("omap_digest", omap_digest);
4557   f->dump_int("omap_data_length", omap_data.length());
4558   f->dump_int("omap_header_length", omap_header.length());
4559   f->dump_int("data_length", data.length());
4560   f->open_array_section("snaps");
4561   for (vector<snapid_t>::const_iterator p = snaps.begin();
4562        p != snaps.end(); ++p)
4563     f->dump_unsigned("snap", *p);
4564   f->close_section();
4565   f->open_array_section("reqids");
4566   for (auto p = reqids.begin();
4567        p != reqids.end();
4568        ++p) {
4569     f->open_object_section("extra_reqid");
4570     f->dump_stream("reqid") << p->first;
4571     f->dump_stream("user_version") << p->second;
4572     f->close_section();
4573   }
4574   f->close_section();
4575 }
4576
4577 // -- pg_create_t --
4578
4579 void pg_create_t::encode(bufferlist &bl) const
4580 {
4581   ENCODE_START(1, 1, bl);
4582   ::encode(created, bl);
4583   ::encode(parent, bl);
4584   ::encode(split_bits, bl);
4585   ENCODE_FINISH(bl);
4586 }
4587
4588 void pg_create_t::decode(bufferlist::iterator &bl)
4589 {
4590   DECODE_START(1, bl);
4591   ::decode(created, bl);
4592   ::decode(parent, bl);
4593   ::decode(split_bits, bl);
4594   DECODE_FINISH(bl);
4595 }
4596
4597 void pg_create_t::dump(Formatter *f) const
4598 {
4599   f->dump_unsigned("created", created);
4600   f->dump_stream("parent") << parent;
4601   f->dump_int("split_bits", split_bits);
4602 }
4603
4604 void pg_create_t::generate_test_instances(list<pg_create_t*>& o)
4605 {
4606   o.push_back(new pg_create_t);
4607   o.push_back(new pg_create_t(1, pg_t(3, 4, -1), 2));
4608 }
4609
4610
4611 // -- pg_hit_set_info_t --
4612
4613 void pg_hit_set_info_t::encode(bufferlist& bl) const
4614 {
4615   ENCODE_START(2, 1, bl);
4616   ::encode(begin, bl);
4617   ::encode(end, bl);
4618   ::encode(version, bl);
4619   ::encode(using_gmt, bl);
4620   ENCODE_FINISH(bl);
4621 }
4622
4623 void pg_hit_set_info_t::decode(bufferlist::iterator& p)
4624 {
4625   DECODE_START(2, p);
4626   ::decode(begin, p);
4627   ::decode(end, p);
4628   ::decode(version, p);
4629   if (struct_v >= 2) {
4630     ::decode(using_gmt, p);
4631   } else {
4632     using_gmt = false;
4633   }
4634   DECODE_FINISH(p);
4635 }
4636
4637 void pg_hit_set_info_t::dump(Formatter *f) const
4638 {
4639   f->dump_stream("begin") << begin;
4640   f->dump_stream("end") << end;
4641   f->dump_stream("version") << version;
4642   f->dump_stream("using_gmt") << using_gmt;
4643 }
4644
4645 void pg_hit_set_info_t::generate_test_instances(list<pg_hit_set_info_t*>& ls)
4646 {
4647   ls.push_back(new pg_hit_set_info_t);
4648   ls.push_back(new pg_hit_set_info_t);
4649   ls.back()->begin = utime_t(1, 2);
4650   ls.back()->end = utime_t(3, 4);
4651 }
4652
4653
4654 // -- pg_hit_set_history_t --
4655
4656 void pg_hit_set_history_t::encode(bufferlist& bl) const
4657 {
4658   ENCODE_START(1, 1, bl);
4659   ::encode(current_last_update, bl);
4660   {
4661     utime_t dummy_stamp;
4662     ::encode(dummy_stamp, bl);
4663   }
4664   {
4665     pg_hit_set_info_t dummy_info;
4666     ::encode(dummy_info, bl);
4667   }
4668   ::encode(history, bl);
4669   ENCODE_FINISH(bl);
4670 }
4671
4672 void pg_hit_set_history_t::decode(bufferlist::iterator& p)
4673 {
4674   DECODE_START(1, p);
4675   ::decode(current_last_update, p);
4676   {
4677     utime_t dummy_stamp;
4678     ::decode(dummy_stamp, p);
4679   }
4680   {
4681     pg_hit_set_info_t dummy_info;
4682     ::decode(dummy_info, p);
4683   }
4684   ::decode(history, p);
4685   DECODE_FINISH(p);
4686 }
4687
4688 void pg_hit_set_history_t::dump(Formatter *f) const
4689 {
4690   f->dump_stream("current_last_update") << current_last_update;
4691   f->open_array_section("history");
4692   for (list<pg_hit_set_info_t>::const_iterator p = history.begin();
4693        p != history.end(); ++p) {
4694     f->open_object_section("info");
4695     p->dump(f);
4696     f->close_section();
4697   }
4698   f->close_section();
4699 }
4700
4701 void pg_hit_set_history_t::generate_test_instances(list<pg_hit_set_history_t*>& ls)
4702 {
4703   ls.push_back(new pg_hit_set_history_t);
4704   ls.push_back(new pg_hit_set_history_t);
4705   ls.back()->current_last_update = eversion_t(1, 2);
4706   ls.back()->history.push_back(pg_hit_set_info_t());
4707 }
4708
4709 // -- osd_peer_stat_t --
4710
4711 void osd_peer_stat_t::encode(bufferlist& bl) const
4712 {
4713   ENCODE_START(1, 1, bl);
4714   ::encode(stamp, bl);
4715   ENCODE_FINISH(bl);
4716 }
4717
4718 void osd_peer_stat_t::decode(bufferlist::iterator& bl)
4719 {
4720   DECODE_START(1, bl);
4721   ::decode(stamp, bl);
4722   DECODE_FINISH(bl);
4723 }
4724
4725 void osd_peer_stat_t::dump(Formatter *f) const
4726 {
4727   f->dump_stream("stamp") << stamp;
4728 }
4729
4730 void osd_peer_stat_t::generate_test_instances(list<osd_peer_stat_t*>& o)
4731 {
4732   o.push_back(new osd_peer_stat_t);
4733   o.push_back(new osd_peer_stat_t);
4734   o.back()->stamp = utime_t(1, 2);
4735 }
4736
4737 ostream& operator<<(ostream& out, const osd_peer_stat_t &stat)
4738 {
4739   return out << "stat(" << stat.stamp << ")";
4740 }
4741
4742
4743 // -- OSDSuperblock --
4744
4745 void OSDSuperblock::encode(bufferlist &bl) const
4746 {
4747   ENCODE_START(8, 5, bl);
4748   ::encode(cluster_fsid, bl);
4749   ::encode(whoami, bl);
4750   ::encode(current_epoch, bl);
4751   ::encode(oldest_map, bl);
4752   ::encode(newest_map, bl);
4753   ::encode(weight, bl);
4754   compat_features.encode(bl);
4755   ::encode(clean_thru, bl);
4756   ::encode(mounted, bl);
4757   ::encode(osd_fsid, bl);
4758   ::encode((epoch_t)0, bl);  // epoch_t last_epoch_marked_full
4759   ::encode((uint32_t)0, bl);  // map<int64_t,epoch_t> pool_last_epoch_marked_full
4760   ENCODE_FINISH(bl);
4761 }
4762
4763 void OSDSuperblock::decode(bufferlist::iterator &bl)
4764 {
4765   DECODE_START_LEGACY_COMPAT_LEN(8, 5, 5, bl);
4766   if (struct_v < 3) {
4767     string magic;
4768     ::decode(magic, bl);
4769   }
4770   ::decode(cluster_fsid, bl);
4771   ::decode(whoami, bl);
4772   ::decode(current_epoch, bl);
4773   ::decode(oldest_map, bl);
4774   ::decode(newest_map, bl);
4775   ::decode(weight, bl);
4776   if (struct_v >= 2) {
4777     compat_features.decode(bl);
4778   } else { //upgrade it!
4779     compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
4780   }
4781   ::decode(clean_thru, bl);
4782   ::decode(mounted, bl);
4783   if (struct_v >= 4)
4784     ::decode(osd_fsid, bl);
4785   if (struct_v >= 6) {
4786     epoch_t last_map_marked_full;
4787     ::decode(last_map_marked_full, bl);
4788   }
4789   if (struct_v >= 7) {
4790     map<int64_t,epoch_t> pool_last_map_marked_full;
4791     ::decode(pool_last_map_marked_full, bl);
4792   }
4793   DECODE_FINISH(bl);
4794 }
4795
4796 void OSDSuperblock::dump(Formatter *f) const
4797 {
4798   f->dump_stream("cluster_fsid") << cluster_fsid;
4799   f->dump_stream("osd_fsid") << osd_fsid;
4800   f->dump_int("whoami", whoami);
4801   f->dump_int("current_epoch", current_epoch);
4802   f->dump_int("oldest_map", oldest_map);
4803   f->dump_int("newest_map", newest_map);
4804   f->dump_float("weight", weight);
4805   f->open_object_section("compat");
4806   compat_features.dump(f);
4807   f->close_section();
4808   f->dump_int("clean_thru", clean_thru);
4809   f->dump_int("last_epoch_mounted", mounted);
4810 }
4811
4812 void OSDSuperblock::generate_test_instances(list<OSDSuperblock*>& o)
4813 {
4814   OSDSuperblock z;
4815   o.push_back(new OSDSuperblock(z));
4816   memset(&z.cluster_fsid, 1, sizeof(z.cluster_fsid));
4817   memset(&z.osd_fsid, 2, sizeof(z.osd_fsid));
4818   z.whoami = 3;
4819   z.current_epoch = 4;
4820   z.oldest_map = 5;
4821   z.newest_map = 9;
4822   z.mounted = 8;
4823   z.clean_thru = 7;
4824   o.push_back(new OSDSuperblock(z));
4825   o.push_back(new OSDSuperblock(z));
4826 }
4827
4828 // -- SnapSet --
4829
4830 void SnapSet::encode(bufferlist& bl) const
4831 {
4832   ENCODE_START(3, 2, bl);
4833   ::encode(seq, bl);
4834   ::encode(head_exists, bl);
4835   ::encode(snaps, bl);
4836   ::encode(clones, bl);
4837   ::encode(clone_overlap, bl);
4838   ::encode(clone_size, bl);
4839   ::encode(clone_snaps, bl);
4840   ENCODE_FINISH(bl);
4841 }
4842
4843 void SnapSet::decode(bufferlist::iterator& bl)
4844 {
4845   DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
4846   ::decode(seq, bl);
4847   ::decode(head_exists, bl);
4848   ::decode(snaps, bl);
4849   ::decode(clones, bl);
4850   ::decode(clone_overlap, bl);
4851   ::decode(clone_size, bl);
4852   if (struct_v >= 3) {
4853     ::decode(clone_snaps, bl);
4854   } else {
4855     clone_snaps.clear();
4856   }
4857   DECODE_FINISH(bl);
4858 }
4859
4860 void SnapSet::dump(Formatter *f) const
4861 {
4862   SnapContext sc(seq, snaps);
4863   f->open_object_section("snap_context");
4864   sc.dump(f);
4865   f->close_section();
4866   f->dump_int("head_exists", head_exists);
4867   f->open_array_section("clones");
4868   for (vector<snapid_t>::const_iterator p = clones.begin(); p != clones.end(); ++p) {
4869     f->open_object_section("clone");
4870     f->dump_unsigned("snap", *p);
4871     f->dump_unsigned("size", clone_size.find(*p)->second);
4872     f->dump_stream("overlap") << clone_overlap.find(*p)->second;
4873     auto q = clone_snaps.find(*p);
4874     if (q != clone_snaps.end()) {
4875       f->open_array_section("snaps");
4876       for (auto s : q->second) {
4877         f->dump_unsigned("snap", s);
4878       }
4879       f->close_section();
4880     }
4881     f->close_section();
4882   }
4883   f->close_section();
4884 }
4885
4886 void SnapSet::generate_test_instances(list<SnapSet*>& o)
4887 {
4888   o.push_back(new SnapSet);
4889   o.push_back(new SnapSet);
4890   o.back()->head_exists = true;
4891   o.back()->seq = 123;
4892   o.back()->snaps.push_back(123);
4893   o.back()->snaps.push_back(12);
4894   o.push_back(new SnapSet);
4895   o.back()->head_exists = true;
4896   o.back()->seq = 123;
4897   o.back()->snaps.push_back(123);
4898   o.back()->snaps.push_back(12);
4899   o.back()->clones.push_back(12);
4900   o.back()->clone_size[12] = 12345;
4901   o.back()->clone_overlap[12];
4902   o.back()->clone_snaps[12] = {12, 10, 8};
4903 }
4904
4905 ostream& operator<<(ostream& out, const SnapSet& cs)
4906 {
4907   if (cs.is_legacy()) {
4908     out << cs.seq << "=" << cs.snaps << ":"
4909         << cs.clones
4910         << (cs.head_exists ? "+head":"");
4911     if (!cs.clone_snaps.empty()) {
4912       out << "+stray_clone_snaps=" << cs.clone_snaps;
4913     }
4914     return out;
4915   } else {
4916     return out << cs.seq << "=" << cs.snaps << ":"
4917                << cs.clone_snaps;
4918   }
4919 }
4920
4921 void SnapSet::from_snap_set(const librados::snap_set_t& ss, bool legacy)
4922 {
4923   // NOTE: our reconstruction of snaps (and the snapc) is not strictly
4924   // correct: it will not include snaps that still logically exist
4925   // but for which there was no clone that is defined.  For all
4926   // practical purposes this doesn't matter, since we only use that
4927   // information to clone on the OSD, and we have already moved
4928   // forward past that part of the object history.
4929
4930   seq = ss.seq;
4931   set<snapid_t> _snaps;
4932   set<snapid_t> _clones;
4933   head_exists = false;
4934   for (vector<librados::clone_info_t>::const_iterator p = ss.clones.begin();
4935        p != ss.clones.end();
4936        ++p) {
4937     if (p->cloneid == librados::SNAP_HEAD) {
4938       head_exists = true;
4939     } else {
4940       _clones.insert(p->cloneid);
4941       _snaps.insert(p->snaps.begin(), p->snaps.end());
4942       clone_size[p->cloneid] = p->size;
4943       clone_overlap[p->cloneid];  // the entry must exist, even if it's empty.
4944       for (vector<pair<uint64_t, uint64_t> >::const_iterator q =
4945              p->overlap.begin(); q != p->overlap.end(); ++q)
4946         clone_overlap[p->cloneid].insert(q->first, q->second);
4947       if (!legacy) {
4948         // p->snaps is ascending; clone_snaps is descending
4949         vector<snapid_t>& v = clone_snaps[p->cloneid];
4950         for (auto q = p->snaps.rbegin(); q != p->snaps.rend(); ++q) {
4951           v.push_back(*q);
4952         }
4953       }
4954     }
4955   }
4956
4957   // ascending
4958   clones.clear();
4959   clones.reserve(_clones.size());
4960   for (set<snapid_t>::iterator p = _clones.begin(); p != _clones.end(); ++p)
4961     clones.push_back(*p);
4962
4963   // descending
4964   snaps.clear();
4965   snaps.reserve(_snaps.size());
4966   for (set<snapid_t>::reverse_iterator p = _snaps.rbegin();
4967        p != _snaps.rend(); ++p)
4968     snaps.push_back(*p);
4969 }
4970
4971 uint64_t SnapSet::get_clone_bytes(snapid_t clone) const
4972 {
4973   assert(clone_size.count(clone));
4974   uint64_t size = clone_size.find(clone)->second;
4975   assert(clone_overlap.count(clone));
4976   const interval_set<uint64_t> &overlap = clone_overlap.find(clone)->second;
4977   for (interval_set<uint64_t>::const_iterator i = overlap.begin();
4978        i != overlap.end();
4979        ++i) {
4980     assert(size >= i.get_len());
4981     size -= i.get_len();
4982   }
4983   return size;
4984 }
4985
4986 void SnapSet::filter(const pg_pool_t &pinfo)
4987 {
4988   vector<snapid_t> oldsnaps;
4989   oldsnaps.swap(snaps);
4990   for (vector<snapid_t>::const_iterator i = oldsnaps.begin();
4991        i != oldsnaps.end();
4992        ++i) {
4993     if (!pinfo.is_removed_snap(*i))
4994       snaps.push_back(*i);
4995   }
4996 }
4997
4998 SnapSet SnapSet::get_filtered(const pg_pool_t &pinfo) const
4999 {
5000   SnapSet ss = *this;
5001   ss.filter(pinfo);
5002   return ss;
5003 }
5004
5005 // -- watch_info_t --
5006
5007 void watch_info_t::encode(bufferlist& bl, uint64_t features) const
5008 {
5009   ENCODE_START(4, 3, bl);
5010   ::encode(cookie, bl);
5011   ::encode(timeout_seconds, bl);
5012   ::encode(addr, bl, features);
5013   ENCODE_FINISH(bl);
5014 }
5015
5016 void watch_info_t::decode(bufferlist::iterator& bl)
5017 {
5018   DECODE_START_LEGACY_COMPAT_LEN(4, 3, 3, bl);
5019   ::decode(cookie, bl);
5020   if (struct_v < 2) {
5021     uint64_t ver;
5022     ::decode(ver, bl);
5023   }
5024   ::decode(timeout_seconds, bl);
5025   if (struct_v >= 4) {
5026     ::decode(addr, bl);
5027   }
5028   DECODE_FINISH(bl);
5029 }
5030
5031 void watch_info_t::dump(Formatter *f) const
5032 {
5033   f->dump_unsigned("cookie", cookie);
5034   f->dump_unsigned("timeout_seconds", timeout_seconds);
5035   f->open_object_section("addr");
5036   addr.dump(f);
5037   f->close_section();
5038 }
5039
5040 void watch_info_t::generate_test_instances(list<watch_info_t*>& o)
5041 {
5042   o.push_back(new watch_info_t);
5043   o.push_back(new watch_info_t);
5044   o.back()->cookie = 123;
5045   o.back()->timeout_seconds = 99;
5046   entity_addr_t ea;
5047   ea.set_type(entity_addr_t::TYPE_LEGACY);
5048   ea.set_nonce(1);
5049   ea.set_family(AF_INET);
5050   ea.set_in4_quad(0, 127);
5051   ea.set_in4_quad(1, 0);
5052   ea.set_in4_quad(2, 1);
5053   ea.set_in4_quad(3, 2);
5054   ea.set_port(2);
5055   o.back()->addr = ea;
5056 }
5057
5058 // -- object_manifest_t --
5059
5060 void object_manifest_t::encode(bufferlist& bl) const
5061 {
5062   ENCODE_START(1, 1, bl);
5063   ::encode(type, bl);
5064   switch (type) {
5065     case TYPE_NONE: break;
5066     case TYPE_REDIRECT: 
5067       ::encode(redirect_target, bl);
5068       break;
5069     default:
5070       ceph_abort();
5071   }
5072   ENCODE_FINISH(bl);
5073 }
5074
5075 void object_manifest_t::decode(bufferlist::iterator& bl)
5076 {
5077   DECODE_START(1, bl);
5078   ::decode(type, bl);
5079   switch (type) {
5080     case TYPE_NONE: break;
5081     case TYPE_REDIRECT: 
5082       ::decode(redirect_target, bl);
5083       break;
5084     default:
5085       ceph_abort();
5086   }
5087   DECODE_FINISH(bl);
5088 }
5089
5090 void object_manifest_t::dump(Formatter *f) const
5091 {
5092   f->dump_unsigned("type", type);
5093   f->open_object_section("redirect_target");
5094   redirect_target.dump(f);
5095   f->close_section();
5096 }
5097
5098 void object_manifest_t::generate_test_instances(list<object_manifest_t*>& o)
5099 {
5100   o.push_back(new object_manifest_t());
5101   o.back()->type = TYPE_REDIRECT;
5102 }
5103
5104 ostream& operator<<(ostream& out, const object_manifest_t& om)
5105 {
5106   return out << "type:" << om.type << " redirect_target:" << om.redirect_target;
5107 }
5108
5109 // -- object_info_t --
5110
5111 void object_info_t::copy_user_bits(const object_info_t& other)
5112 {
5113   // these bits are copied from head->clone.
5114   size = other.size;
5115   mtime = other.mtime;
5116   local_mtime = other.local_mtime;
5117   last_reqid = other.last_reqid;
5118   truncate_seq = other.truncate_seq;
5119   truncate_size = other.truncate_size;
5120   flags = other.flags;
5121   user_version = other.user_version;
5122   data_digest = other.data_digest;
5123   omap_digest = other.omap_digest;
5124 }
5125
5126 ps_t object_info_t::legacy_object_locator_to_ps(const object_t &oid, 
5127                                                 const object_locator_t &loc) {
5128   ps_t ps;
5129   if (loc.key.length())
5130     // Hack, we don't have the osd map, so we don't really know the hash...
5131     ps = ceph_str_hash(CEPH_STR_HASH_RJENKINS, loc.key.c_str(), 
5132                        loc.key.length());
5133   else
5134     ps = ceph_str_hash(CEPH_STR_HASH_RJENKINS, oid.name.c_str(),
5135                        oid.name.length());
5136   return ps;
5137 }
5138
5139 void object_info_t::encode(bufferlist& bl, uint64_t features) const
5140 {
5141   object_locator_t myoloc(soid);
5142   map<entity_name_t, watch_info_t> old_watchers;
5143   for (map<pair<uint64_t, entity_name_t>, watch_info_t>::const_iterator i =
5144          watchers.begin();
5145        i != watchers.end();
5146        ++i) {
5147     old_watchers.insert(make_pair(i->first.second, i->second));
5148   }
5149   ENCODE_START(17, 8, bl);
5150   ::encode(soid, bl);
5151   ::encode(myoloc, bl); //Retained for compatibility
5152   ::encode((__u32)0, bl); // was category, no longer used
5153   ::encode(version, bl);
5154   ::encode(prior_version, bl);
5155   ::encode(last_reqid, bl);
5156   ::encode(size, bl);
5157   ::encode(mtime, bl);
5158   if (soid.snap == CEPH_NOSNAP)
5159     ::encode(osd_reqid_t(), bl);  // used to be wrlock_by
5160   else
5161     ::encode(legacy_snaps, bl);
5162   ::encode(truncate_seq, bl);
5163   ::encode(truncate_size, bl);
5164   ::encode(is_lost(), bl);
5165   ::encode(old_watchers, bl, features);
5166   /* shenanigans to avoid breaking backwards compatibility in the disk format.
5167    * When we can, switch this out for simply putting the version_t on disk. */
5168   eversion_t user_eversion(0, user_version);
5169   ::encode(user_eversion, bl);
5170   ::encode(test_flag(FLAG_USES_TMAP), bl);
5171   ::encode(watchers, bl, features);
5172   __u32 _flags = flags;
5173   ::encode(_flags, bl);
5174   ::encode(local_mtime, bl);
5175   ::encode(data_digest, bl);
5176   ::encode(omap_digest, bl);
5177   ::encode(expected_object_size, bl);
5178   ::encode(expected_write_size, bl);
5179   ::encode(alloc_hint_flags, bl);
5180   if (has_manifest()) {
5181     ::encode(manifest, bl);
5182   }
5183   ENCODE_FINISH(bl);
5184 }
5185
5186 void object_info_t::decode(bufferlist::iterator& bl)
5187 {
5188   object_locator_t myoloc;
5189   DECODE_START_LEGACY_COMPAT_LEN(17, 8, 8, bl);
5190   map<entity_name_t, watch_info_t> old_watchers;
5191   ::decode(soid, bl);
5192   ::decode(myoloc, bl);
5193   {
5194     string category;
5195     ::decode(category, bl);  // no longer used
5196   }
5197   ::decode(version, bl);
5198   ::decode(prior_version, bl);
5199   ::decode(last_reqid, bl);
5200   ::decode(size, bl);
5201   ::decode(mtime, bl);
5202   if (soid.snap == CEPH_NOSNAP) {
5203     osd_reqid_t wrlock_by;
5204     ::decode(wrlock_by, bl);
5205   } else {
5206     ::decode(legacy_snaps, bl);
5207   }
5208   ::decode(truncate_seq, bl);
5209   ::decode(truncate_size, bl);
5210
5211   // if this is struct_v >= 13, we will overwrite this
5212   // below since this field is just here for backwards
5213   // compatibility
5214   __u8 lo;
5215   ::decode(lo, bl);
5216   flags = (flag_t)lo;
5217
5218   ::decode(old_watchers, bl);
5219   eversion_t user_eversion;
5220   ::decode(user_eversion, bl);
5221   user_version = user_eversion.version;
5222
5223   if (struct_v >= 9) {
5224     bool uses_tmap = false;
5225     ::decode(uses_tmap, bl);
5226     if (uses_tmap)
5227       set_flag(FLAG_USES_TMAP);
5228   } else {
5229     set_flag(FLAG_USES_TMAP);
5230   }
5231   if (struct_v < 10)
5232     soid.pool = myoloc.pool;
5233   if (struct_v >= 11) {
5234     ::decode(watchers, bl);
5235   } else {
5236     for (map<entity_name_t, watch_info_t>::iterator i = old_watchers.begin();
5237          i != old_watchers.end();
5238          ++i) {
5239       watchers.insert(
5240         make_pair(
5241           make_pair(i->second.cookie, i->first), i->second));
5242     }
5243   }
5244   if (struct_v >= 13) {
5245     __u32 _flags;
5246     ::decode(_flags, bl);
5247     flags = (flag_t)_flags;
5248   }
5249   if (struct_v >= 14) {
5250     ::decode(local_mtime, bl);
5251   } else {
5252     local_mtime = utime_t();
5253   }
5254   if (struct_v >= 15) {
5255     ::decode(data_digest, bl);
5256     ::decode(omap_digest, bl);
5257   } else {
5258     data_digest = omap_digest = -1;
5259     clear_flag(FLAG_DATA_DIGEST);
5260     clear_flag(FLAG_OMAP_DIGEST);
5261   }
5262   if (struct_v >= 16) {
5263     ::decode(expected_object_size, bl);
5264     ::decode(expected_write_size, bl);
5265     ::decode(alloc_hint_flags, bl);
5266   } else {
5267     expected_object_size = 0;
5268     expected_write_size = 0;
5269     alloc_hint_flags = 0;
5270   }
5271   if (struct_v >= 17) {
5272     if (has_manifest()) {
5273       ::decode(manifest, bl);
5274     }
5275   }
5276   DECODE_FINISH(bl);
5277 }
5278
5279 void object_info_t::dump(Formatter *f) const
5280 {
5281   f->open_object_section("oid");
5282   soid.dump(f);
5283   f->close_section();
5284   f->dump_stream("version") << version;
5285   f->dump_stream("prior_version") << prior_version;
5286   f->dump_stream("last_reqid") << last_reqid;
5287   f->dump_unsigned("user_version", user_version);
5288   f->dump_unsigned("size", size);
5289   f->dump_stream("mtime") << mtime;
5290   f->dump_stream("local_mtime") << local_mtime;
5291   f->dump_unsigned("lost", (int)is_lost());
5292   f->dump_unsigned("flags", (int)flags);
5293   f->open_array_section("legacy_snaps");
5294   for (auto s : legacy_snaps) {
5295     f->dump_unsigned("snap", s);
5296   }
5297   f->close_section();
5298   f->dump_unsigned("truncate_seq", truncate_seq);
5299   f->dump_unsigned("truncate_size", truncate_size);
5300   f->dump_unsigned("data_digest", data_digest);
5301   f->dump_unsigned("omap_digest", omap_digest);
5302   f->dump_unsigned("expected_object_size", expected_object_size);
5303   f->dump_unsigned("expected_write_size", expected_write_size);
5304   f->dump_unsigned("alloc_hint_flags", alloc_hint_flags);
5305   f->dump_object("manifest", manifest);
5306   f->open_object_section("watchers");
5307   for (map<pair<uint64_t, entity_name_t>,watch_info_t>::const_iterator p =
5308          watchers.begin(); p != watchers.end(); ++p) {
5309     stringstream ss;
5310     ss << p->first.second;
5311     f->open_object_section(ss.str().c_str());
5312     p->second.dump(f);
5313     f->close_section();
5314   }
5315   f->close_section();
5316 }
5317
5318 void object_info_t::generate_test_instances(list<object_info_t*>& o)
5319 {
5320   o.push_back(new object_info_t());
5321   
5322   // fixme
5323 }
5324
5325
5326 ostream& operator<<(ostream& out, const object_info_t& oi)
5327 {
5328   out << oi.soid << "(" << oi.version
5329       << " " << oi.last_reqid;
5330   if (oi.soid.snap != CEPH_NOSNAP && !oi.legacy_snaps.empty())
5331     out << " " << oi.legacy_snaps;
5332   if (oi.flags)
5333     out << " " << oi.get_flag_string();
5334   out << " s " << oi.size;
5335   out << " uv " << oi.user_version;
5336   if (oi.is_data_digest())
5337     out << " dd " << std::hex << oi.data_digest << std::dec;
5338   if (oi.is_omap_digest())
5339     out << " od " << std::hex << oi.omap_digest << std::dec;
5340   out << " alloc_hint [" << oi.expected_object_size
5341       << " " << oi.expected_write_size
5342       << " " << oi.alloc_hint_flags << "]";
5343   if (oi.has_manifest())
5344     out << " " << oi.manifest;
5345
5346   out << ")";
5347   return out;
5348 }
5349
5350 // -- ObjectRecovery --
5351 void ObjectRecoveryProgress::encode(bufferlist &bl) const
5352 {
5353   ENCODE_START(1, 1, bl);
5354   ::encode(first, bl);
5355   ::encode(data_complete, bl);
5356   ::encode(data_recovered_to, bl);
5357   ::encode(omap_recovered_to, bl);
5358   ::encode(omap_complete, bl);
5359   ENCODE_FINISH(bl);
5360 }
5361
5362 void ObjectRecoveryProgress::decode(bufferlist::iterator &bl)
5363 {
5364   DECODE_START(1, bl);
5365   ::decode(first, bl);
5366   ::decode(data_complete, bl);
5367   ::decode(data_recovered_to, bl);
5368   ::decode(omap_recovered_to, bl);
5369   ::decode(omap_complete, bl);
5370   DECODE_FINISH(bl);
5371 }
5372
5373 ostream &operator<<(ostream &out, const ObjectRecoveryProgress &prog)
5374 {
5375   return prog.print(out);
5376 }
5377
5378 void ObjectRecoveryProgress::generate_test_instances(
5379   list<ObjectRecoveryProgress*>& o)
5380 {
5381   o.push_back(new ObjectRecoveryProgress);
5382   o.back()->first = false;
5383   o.back()->data_complete = true;
5384   o.back()->omap_complete = true;
5385   o.back()->data_recovered_to = 100;
5386
5387   o.push_back(new ObjectRecoveryProgress);
5388   o.back()->first = true;
5389   o.back()->data_complete = false;
5390   o.back()->omap_complete = false;
5391   o.back()->data_recovered_to = 0;
5392 }
5393
5394 ostream &ObjectRecoveryProgress::print(ostream &out) const
5395 {
5396   return out << "ObjectRecoveryProgress("
5397              << ( first ? "" : "!" ) << "first, "
5398              << "data_recovered_to:" << data_recovered_to
5399              << ", data_complete:" << ( data_complete ? "true" : "false" )
5400              << ", omap_recovered_to:" << omap_recovered_to
5401              << ", omap_complete:" << ( omap_complete ? "true" : "false" )
5402              << ", error:" << ( error ? "true" : "false" )
5403              << ")";
5404 }
5405
5406 void ObjectRecoveryProgress::dump(Formatter *f) const
5407 {
5408   f->dump_int("first?", first);
5409   f->dump_int("data_complete?", data_complete);
5410   f->dump_unsigned("data_recovered_to", data_recovered_to);
5411   f->dump_int("omap_complete?", omap_complete);
5412   f->dump_string("omap_recovered_to", omap_recovered_to);
5413 }
5414
5415 void ObjectRecoveryInfo::encode(bufferlist &bl, uint64_t features) const
5416 {
5417   ENCODE_START(2, 1, bl);
5418   ::encode(soid, bl);
5419   ::encode(version, bl);
5420   ::encode(size, bl);
5421   ::encode(oi, bl, features);
5422   ::encode(ss, bl);
5423   ::encode(copy_subset, bl);
5424   ::encode(clone_subset, bl);
5425   ENCODE_FINISH(bl);
5426 }
5427
5428 void ObjectRecoveryInfo::decode(bufferlist::iterator &bl,
5429                                 int64_t pool)
5430 {
5431   DECODE_START(2, bl);
5432   ::decode(soid, bl);
5433   ::decode(version, bl);
5434   ::decode(size, bl);
5435   ::decode(oi, bl);
5436   ::decode(ss, bl);
5437   ::decode(copy_subset, bl);
5438   ::decode(clone_subset, bl);
5439   DECODE_FINISH(bl);
5440
5441   if (struct_v < 2) {
5442     if (!soid.is_max() && soid.pool == -1)
5443       soid.pool = pool;
5444     map<hobject_t, interval_set<uint64_t>> tmp;
5445     tmp.swap(clone_subset);
5446     for (map<hobject_t, interval_set<uint64_t>>::iterator i = tmp.begin();
5447          i != tmp.end();
5448          ++i) {
5449       hobject_t first(i->first);
5450       if (!first.is_max() && first.pool == -1)
5451         first.pool = pool;
5452       clone_subset[first].swap(i->second);
5453     }
5454   }
5455 }
5456
5457 void ObjectRecoveryInfo::generate_test_instances(
5458   list<ObjectRecoveryInfo*>& o)
5459 {
5460   o.push_back(new ObjectRecoveryInfo);
5461   o.back()->soid = hobject_t(sobject_t("key", CEPH_NOSNAP));
5462   o.back()->version = eversion_t(0,0);
5463   o.back()->size = 100;
5464 }
5465
5466
5467 void ObjectRecoveryInfo::dump(Formatter *f) const
5468 {
5469   f->dump_stream("object") << soid;
5470   f->dump_stream("at_version") << version;
5471   f->dump_stream("size") << size;
5472   {
5473     f->open_object_section("object_info");
5474     oi.dump(f);
5475     f->close_section();
5476   }
5477   {
5478     f->open_object_section("snapset");
5479     ss.dump(f);
5480     f->close_section();
5481   }
5482   f->dump_stream("copy_subset") << copy_subset;
5483   f->dump_stream("clone_subset") << clone_subset;
5484 }
5485
5486 ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf)
5487 {
5488   return inf.print(out);
5489 }
5490
5491 ostream &ObjectRecoveryInfo::print(ostream &out) const
5492 {
5493   return out << "ObjectRecoveryInfo("
5494              << soid << "@" << version
5495              << ", size: " << size
5496              << ", copy_subset: " << copy_subset
5497              << ", clone_subset: " << clone_subset
5498              << ", snapset: " << ss
5499              << ")";
5500 }
5501
5502 // -- PushReplyOp --
5503 void PushReplyOp::generate_test_instances(list<PushReplyOp*> &o)
5504 {
5505   o.push_back(new PushReplyOp);
5506   o.push_back(new PushReplyOp);
5507   o.back()->soid = hobject_t(sobject_t("asdf", 2));
5508   o.push_back(new PushReplyOp);
5509   o.back()->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP));
5510 }
5511
5512 void PushReplyOp::encode(bufferlist &bl) const
5513 {
5514   ENCODE_START(1, 1, bl);
5515   ::encode(soid, bl);
5516   ENCODE_FINISH(bl);
5517 }
5518
5519 void PushReplyOp::decode(bufferlist::iterator &bl)
5520 {
5521   DECODE_START(1, bl);
5522   ::decode(soid, bl);
5523   DECODE_FINISH(bl);
5524 }
5525
5526 void PushReplyOp::dump(Formatter *f) const
5527 {
5528   f->dump_stream("soid") << soid;
5529 }
5530
5531 ostream &PushReplyOp::print(ostream &out) const
5532 {
5533   return out
5534     << "PushReplyOp(" << soid
5535     << ")";
5536 }
5537
5538 ostream& operator<<(ostream& out, const PushReplyOp &op)
5539 {
5540   return op.print(out);
5541 }
5542
5543 uint64_t PushReplyOp::cost(CephContext *cct) const
5544 {
5545
5546   return cct->_conf->osd_push_per_object_cost +
5547     cct->_conf->osd_recovery_max_chunk;
5548 }
5549
5550 // -- PullOp --
5551 void PullOp::generate_test_instances(list<PullOp*> &o)
5552 {
5553   o.push_back(new PullOp);
5554   o.push_back(new PullOp);
5555   o.back()->soid = hobject_t(sobject_t("asdf", 2));
5556   o.back()->recovery_info.version = eversion_t(3, 10);
5557   o.push_back(new PullOp);
5558   o.back()->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP));
5559   o.back()->recovery_info.version = eversion_t(0, 0);
5560 }
5561
5562 void PullOp::encode(bufferlist &bl, uint64_t features) const
5563 {
5564   ENCODE_START(1, 1, bl);
5565   ::encode(soid, bl);
5566   ::encode(recovery_info, bl, features);
5567   ::encode(recovery_progress, bl);
5568   ENCODE_FINISH(bl);
5569 }
5570
5571 void PullOp::decode(bufferlist::iterator &bl)
5572 {
5573   DECODE_START(1, bl);
5574   ::decode(soid, bl);
5575   ::decode(recovery_info, bl);
5576   ::decode(recovery_progress, bl);
5577   DECODE_FINISH(bl);
5578 }
5579
5580 void PullOp::dump(Formatter *f) const
5581 {
5582   f->dump_stream("soid") << soid;
5583   {
5584     f->open_object_section("recovery_info");
5585     recovery_info.dump(f);
5586     f->close_section();
5587   }
5588   {
5589     f->open_object_section("recovery_progress");
5590     recovery_progress.dump(f);
5591     f->close_section();
5592   }
5593 }
5594
5595 ostream &PullOp::print(ostream &out) const
5596 {
5597   return out
5598     << "PullOp(" << soid
5599     << ", recovery_info: " << recovery_info
5600     << ", recovery_progress: " << recovery_progress
5601     << ")";
5602 }
5603
5604 ostream& operator<<(ostream& out, const PullOp &op)
5605 {
5606   return op.print(out);
5607 }
5608
5609 uint64_t PullOp::cost(CephContext *cct) const
5610 {
5611   return cct->_conf->osd_push_per_object_cost +
5612     cct->_conf->osd_recovery_max_chunk;
5613 }
5614
5615 // -- PushOp --
5616 void PushOp::generate_test_instances(list<PushOp*> &o)
5617 {
5618   o.push_back(new PushOp);
5619   o.push_back(new PushOp);
5620   o.back()->soid = hobject_t(sobject_t("asdf", 2));
5621   o.back()->version = eversion_t(3, 10);
5622   o.push_back(new PushOp);
5623   o.back()->soid = hobject_t(sobject_t("asdf", CEPH_NOSNAP));
5624   o.back()->version = eversion_t(0, 0);
5625 }
5626
5627 void PushOp::encode(bufferlist &bl, uint64_t features) const
5628 {
5629   ENCODE_START(1, 1, bl);
5630   ::encode(soid, bl);
5631   ::encode(version, bl);
5632   ::encode(data, bl);
5633   ::encode(data_included, bl);
5634   ::encode(omap_header, bl);
5635   ::encode(omap_entries, bl);
5636   ::encode(attrset, bl);
5637   ::encode(recovery_info, bl, features);
5638   ::encode(after_progress, bl);
5639   ::encode(before_progress, bl);
5640   ENCODE_FINISH(bl);
5641 }
5642
5643 void PushOp::decode(bufferlist::iterator &bl)
5644 {
5645   DECODE_START(1, bl);
5646   ::decode(soid, bl);
5647   ::decode(version, bl);
5648   ::decode(data, bl);
5649   ::decode(data_included, bl);
5650   ::decode(omap_header, bl);
5651   ::decode(omap_entries, bl);
5652   ::decode(attrset, bl);
5653   ::decode(recovery_info, bl);
5654   ::decode(after_progress, bl);
5655   ::decode(before_progress, bl);
5656   DECODE_FINISH(bl);
5657 }
5658
5659 void PushOp::dump(Formatter *f) const
5660 {
5661   f->dump_stream("soid") << soid;
5662   f->dump_stream("version") << version;
5663   f->dump_int("data_len", data.length());
5664   f->dump_stream("data_included") << data_included;
5665   f->dump_int("omap_header_len", omap_header.length());
5666   f->dump_int("omap_entries_len", omap_entries.size());
5667   f->dump_int("attrset_len", attrset.size());
5668   {
5669     f->open_object_section("recovery_info");
5670     recovery_info.dump(f);
5671     f->close_section();
5672   }
5673   {
5674     f->open_object_section("after_progress");
5675     after_progress.dump(f);
5676     f->close_section();
5677   }
5678   {
5679     f->open_object_section("before_progress");
5680     before_progress.dump(f);
5681     f->close_section();
5682   }
5683 }
5684
5685 ostream &PushOp::print(ostream &out) const
5686 {
5687   return out
5688     << "PushOp(" << soid
5689     << ", version: " << version
5690     << ", data_included: " << data_included
5691     << ", data_size: " << data.length()
5692     << ", omap_header_size: " << omap_header.length()
5693     << ", omap_entries_size: " << omap_entries.size()
5694     << ", attrset_size: " << attrset.size()
5695     << ", recovery_info: " << recovery_info
5696     << ", after_progress: " << after_progress
5697     << ", before_progress: " << before_progress
5698     << ")";
5699 }
5700
5701 ostream& operator<<(ostream& out, const PushOp &op)
5702 {
5703   return op.print(out);
5704 }
5705
5706 uint64_t PushOp::cost(CephContext *cct) const
5707 {
5708   uint64_t cost = data_included.size();
5709   for (map<string, bufferlist>::const_iterator i =
5710          omap_entries.begin();
5711        i != omap_entries.end();
5712        ++i) {
5713     cost += i->second.length();
5714   }
5715   cost += cct->_conf->osd_push_per_object_cost;
5716   return cost;
5717 }
5718
5719 // -- ScrubMap --
5720
5721 void ScrubMap::merge_incr(const ScrubMap &l)
5722 {
5723   assert(valid_through == l.incr_since);
5724   valid_through = l.valid_through;
5725
5726   for (map<hobject_t,object>::const_iterator p = l.objects.begin();
5727        p != l.objects.end();
5728        ++p){
5729     if (p->second.negative) {
5730       map<hobject_t,object>::iterator q = objects.find(p->first);
5731       if (q != objects.end()) {
5732         objects.erase(q);
5733       }
5734     } else {
5735       objects[p->first] = p->second;
5736     }
5737   }
5738 }          
5739
5740 void ScrubMap::encode(bufferlist& bl) const
5741 {
5742   ENCODE_START(3, 2, bl);
5743   ::encode(objects, bl);
5744   ::encode((__u32)0, bl); // used to be attrs; now deprecated
5745   bufferlist old_logbl;  // not used
5746   ::encode(old_logbl, bl);
5747   ::encode(valid_through, bl);
5748   ::encode(incr_since, bl);
5749   ENCODE_FINISH(bl);
5750 }
5751
5752 void ScrubMap::decode(bufferlist::iterator& bl, int64_t pool)
5753 {
5754   DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
5755   ::decode(objects, bl);
5756   {
5757     map<string,string> attrs;  // deprecated
5758     ::decode(attrs, bl);
5759   }
5760   bufferlist old_logbl;   // not used
5761   ::decode(old_logbl, bl);
5762   ::decode(valid_through, bl);
5763   ::decode(incr_since, bl);
5764   DECODE_FINISH(bl);
5765
5766   // handle hobject_t upgrade
5767   if (struct_v < 3) {
5768     map<hobject_t, object> tmp;
5769     tmp.swap(objects);
5770     for (map<hobject_t, object>::iterator i = tmp.begin();
5771          i != tmp.end();
5772          ++i) {
5773       hobject_t first(i->first);
5774       if (!first.is_max() && first.pool == -1)
5775         first.pool = pool;
5776       objects[first] = i->second;
5777     }
5778   }
5779 }
5780
5781 void ScrubMap::dump(Formatter *f) const
5782 {
5783   f->dump_stream("valid_through") << valid_through;
5784   f->dump_stream("incremental_since") << incr_since;
5785   f->open_array_section("objects");
5786   for (map<hobject_t,object>::const_iterator p = objects.begin(); p != objects.end(); ++p) {
5787     f->open_object_section("object");
5788     f->dump_string("name", p->first.oid.name);
5789     f->dump_unsigned("hash", p->first.get_hash());
5790     f->dump_string("key", p->first.get_key());
5791     f->dump_int("snapid", p->first.snap);
5792     p->second.dump(f);
5793     f->close_section();
5794   }
5795   f->close_section();
5796 }
5797
5798 void ScrubMap::generate_test_instances(list<ScrubMap*>& o)
5799 {
5800   o.push_back(new ScrubMap);
5801   o.push_back(new ScrubMap);
5802   o.back()->valid_through = eversion_t(1, 2);
5803   o.back()->incr_since = eversion_t(3, 4);
5804   list<object*> obj;
5805   object::generate_test_instances(obj);
5806   o.back()->objects[hobject_t(object_t("foo"), "fookey", 123, 456, 0, "")] = *obj.back();
5807   obj.pop_back();
5808   o.back()->objects[hobject_t(object_t("bar"), string(), 123, 456, 0, "")] = *obj.back();
5809 }
5810
5811 // -- ScrubMap::object --
5812
5813 void ScrubMap::object::encode(bufferlist& bl) const
5814 {
5815   bool compat_read_error = read_error || ec_hash_mismatch || ec_size_mismatch;
5816   ENCODE_START(8, 7, bl);
5817   ::encode(size, bl);
5818   ::encode(negative, bl);
5819   ::encode(attrs, bl);
5820   ::encode(digest, bl);
5821   ::encode(digest_present, bl);
5822   ::encode((uint32_t)0, bl);  // obsolete nlinks
5823   ::encode((uint32_t)0, bl);  // snapcolls
5824   ::encode(omap_digest, bl);
5825   ::encode(omap_digest_present, bl);
5826   ::encode(compat_read_error, bl);
5827   ::encode(stat_error, bl);
5828   ::encode(read_error, bl);
5829   ::encode(ec_hash_mismatch, bl);
5830   ::encode(ec_size_mismatch, bl);
5831   ENCODE_FINISH(bl);
5832 }
5833
5834 void ScrubMap::object::decode(bufferlist::iterator& bl)
5835 {
5836   DECODE_START(8, bl);
5837   ::decode(size, bl);
5838   bool tmp, compat_read_error = false;
5839   ::decode(tmp, bl);
5840   negative = tmp;
5841   ::decode(attrs, bl);
5842   ::decode(digest, bl);
5843   ::decode(tmp, bl);
5844   digest_present = tmp;
5845   {
5846     uint32_t nlinks;
5847     ::decode(nlinks, bl);
5848     set<snapid_t> snapcolls;
5849     ::decode(snapcolls, bl);
5850   }
5851   ::decode(omap_digest, bl);
5852   ::decode(tmp, bl);
5853   omap_digest_present = tmp;
5854   ::decode(compat_read_error, bl);
5855   ::decode(tmp, bl);
5856   stat_error = tmp;
5857   if (struct_v >= 8) {
5858     ::decode(tmp, bl);
5859     read_error = tmp;
5860     ::decode(tmp, bl);
5861     ec_hash_mismatch = tmp;
5862     ::decode(tmp, bl);
5863     ec_size_mismatch = tmp;
5864   }
5865   // If older encoder found a read_error, set read_error
5866   if (compat_read_error && !read_error && !ec_hash_mismatch && !ec_size_mismatch)
5867     read_error = true;
5868   DECODE_FINISH(bl);
5869 }
5870
5871 void ScrubMap::object::dump(Formatter *f) const
5872 {
5873   f->dump_int("size", size);
5874   f->dump_int("negative", negative);
5875   f->open_array_section("attrs");
5876   for (map<string,bufferptr>::const_iterator p = attrs.begin(); p != attrs.end(); ++p) {
5877     f->open_object_section("attr");
5878     f->dump_string("name", p->first);
5879     f->dump_int("length", p->second.length());
5880     f->close_section();
5881   }
5882   f->close_section();
5883 }
5884
5885 void ScrubMap::object::generate_test_instances(list<object*>& o)
5886 {
5887   o.push_back(new object);
5888   o.push_back(new object);
5889   o.back()->negative = true;
5890   o.push_back(new object);
5891   o.back()->size = 123;
5892   o.back()->attrs["foo"] = buffer::copy("foo", 3);
5893   o.back()->attrs["bar"] = buffer::copy("barval", 6);
5894 }
5895
5896 // -- OSDOp --
5897
5898 ostream& operator<<(ostream& out, const OSDOp& op)
5899 {
5900   out << ceph_osd_op_name(op.op.op);
5901   if (ceph_osd_op_type_data(op.op.op)) {
5902     // data extent
5903     switch (op.op.op) {
5904     case CEPH_OSD_OP_ASSERT_VER:
5905       out << " v" << op.op.assert_ver.ver;
5906       break;
5907     case CEPH_OSD_OP_TRUNCATE:
5908       out << " " << op.op.extent.offset;
5909       break;
5910     case CEPH_OSD_OP_MASKTRUNC:
5911     case CEPH_OSD_OP_TRIMTRUNC:
5912       out << " " << op.op.extent.truncate_seq << "@"
5913           << (int64_t)op.op.extent.truncate_size;
5914       break;
5915     case CEPH_OSD_OP_ROLLBACK:
5916       out << " " << snapid_t(op.op.snap.snapid);
5917       break;
5918     case CEPH_OSD_OP_WATCH:
5919       out << " " << ceph_osd_watch_op_name(op.op.watch.op)
5920           << " cookie " << op.op.watch.cookie;
5921       if (op.op.watch.gen)
5922         out << " gen " << op.op.watch.gen;
5923       break;
5924     case CEPH_OSD_OP_NOTIFY:
5925     case CEPH_OSD_OP_NOTIFY_ACK:
5926       out << " cookie " << op.op.notify.cookie;
5927       break;
5928     case CEPH_OSD_OP_COPY_GET:
5929       out << " max " << op.op.copy_get.max;
5930       break;
5931     case CEPH_OSD_OP_COPY_FROM:
5932       out << " ver " << op.op.copy_from.src_version;
5933       break;
5934     case CEPH_OSD_OP_SETALLOCHINT:
5935       out << " object_size " << op.op.alloc_hint.expected_object_size
5936           << " write_size " << op.op.alloc_hint.expected_write_size;
5937       break;
5938     case CEPH_OSD_OP_READ:
5939     case CEPH_OSD_OP_SPARSE_READ:
5940     case CEPH_OSD_OP_SYNC_READ:
5941     case CEPH_OSD_OP_WRITE:
5942     case CEPH_OSD_OP_WRITEFULL:
5943     case CEPH_OSD_OP_ZERO:
5944     case CEPH_OSD_OP_APPEND:
5945     case CEPH_OSD_OP_MAPEXT:
5946       out << " " << op.op.extent.offset << "~" << op.op.extent.length;
5947       if (op.op.extent.truncate_seq)
5948         out << " [" << op.op.extent.truncate_seq << "@"
5949             << (int64_t)op.op.extent.truncate_size << "]";
5950       if (op.op.flags)
5951         out << " [" << ceph_osd_op_flag_string(op.op.flags) << "]";
5952     default:
5953       // don't show any arg info
5954       break;
5955     }
5956   } else if (ceph_osd_op_type_attr(op.op.op)) {
5957     // xattr name
5958     if (op.op.xattr.name_len && op.indata.length()) {
5959       out << " ";
5960       op.indata.write(0, op.op.xattr.name_len, out);
5961     }
5962     if (op.op.xattr.value_len)
5963       out << " (" << op.op.xattr.value_len << ")";
5964     if (op.op.op == CEPH_OSD_OP_CMPXATTR)
5965       out << " op " << (int)op.op.xattr.cmp_op
5966           << " mode " << (int)op.op.xattr.cmp_mode;
5967   } else if (ceph_osd_op_type_exec(op.op.op)) {
5968     // class.method
5969     if (op.op.cls.class_len && op.indata.length()) {
5970       out << " ";
5971       op.indata.write(0, op.op.cls.class_len, out);
5972       out << ".";
5973       op.indata.write(op.op.cls.class_len, op.op.cls.method_len, out);
5974     }
5975   } else if (ceph_osd_op_type_pg(op.op.op)) {
5976     switch (op.op.op) {
5977     case CEPH_OSD_OP_PGLS:
5978     case CEPH_OSD_OP_PGLS_FILTER:
5979     case CEPH_OSD_OP_PGNLS:
5980     case CEPH_OSD_OP_PGNLS_FILTER:
5981       out << " start_epoch " << op.op.pgls.start_epoch;
5982       break;
5983     case CEPH_OSD_OP_PG_HITSET_LS:
5984       break;
5985     case CEPH_OSD_OP_PG_HITSET_GET:
5986       out << " " << utime_t(op.op.hit_set_get.stamp);
5987       break;
5988     case CEPH_OSD_OP_SCRUBLS:
5989       break;
5990     }
5991   }
5992   return out;
5993 }
5994
5995
5996 void OSDOp::split_osd_op_vector_in_data(vector<OSDOp>& ops, bufferlist& in)
5997 {
5998   bufferlist::iterator datap = in.begin();
5999   for (unsigned i = 0; i < ops.size(); i++) {
6000     if (ops[i].op.payload_len) {
6001       datap.copy(ops[i].op.payload_len, ops[i].indata);
6002     }
6003   }
6004 }
6005
6006 void OSDOp::merge_osd_op_vector_in_data(vector<OSDOp>& ops, bufferlist& out)
6007 {
6008   for (unsigned i = 0; i < ops.size(); i++) {
6009     if (ops[i].indata.length()) {
6010       ops[i].op.payload_len = ops[i].indata.length();
6011       out.append(ops[i].indata);
6012     }
6013   }
6014 }
6015
6016 void OSDOp::split_osd_op_vector_out_data(vector<OSDOp>& ops, bufferlist& in)
6017 {
6018   bufferlist::iterator datap = in.begin();
6019   for (unsigned i = 0; i < ops.size(); i++) {
6020     if (ops[i].op.payload_len) {
6021       datap.copy(ops[i].op.payload_len, ops[i].outdata);
6022     }
6023   }
6024 }
6025
6026 void OSDOp::merge_osd_op_vector_out_data(vector<OSDOp>& ops, bufferlist& out)
6027 {
6028   for (unsigned i = 0; i < ops.size(); i++) {
6029     if (ops[i].outdata.length()) {
6030       ops[i].op.payload_len = ops[i].outdata.length();
6031       out.append(ops[i].outdata);
6032     }
6033   }
6034 }
6035
6036 bool store_statfs_t::operator==(const store_statfs_t& other) const
6037 {
6038   return total == other.total
6039     && available == other.available
6040     && allocated == other.allocated
6041     && stored == other.stored
6042     && compressed == other.compressed
6043     && compressed_allocated == other.compressed_allocated
6044     && compressed_original == other.compressed_original;
6045 }
6046
6047 void store_statfs_t::dump(Formatter *f) const
6048 {
6049   f->dump_int("total", total);
6050   f->dump_int("available", available);
6051   f->dump_int("allocated", allocated);
6052   f->dump_int("stored", stored);
6053   f->dump_int("compressed", compressed);
6054   f->dump_int("compressed_allocated", compressed_allocated);
6055   f->dump_int("compressed_original", compressed_original);
6056 }
6057
6058 ostream& operator<<(ostream& out, const store_statfs_t &s)
6059 {
6060   out << std::hex
6061       << "store_statfs(0x" << s.available
6062       << "/0x"  << s.total
6063       << ", stored 0x" << s.stored
6064       << "/0x"  << s.allocated
6065       << ", compress 0x" << s.compressed
6066       << "/0x"  << s.compressed_allocated
6067       << "/0x"  << s.compressed_original
6068       << std::dec
6069       << ")";
6070   return out;
6071 }
6072
6073 void OSDOp::clear_data(vector<OSDOp>& ops)
6074 {
6075   for (unsigned i = 0; i < ops.size(); i++) {
6076     OSDOp& op = ops[i];
6077     op.outdata.clear();
6078     if (ceph_osd_op_type_attr(op.op.op) &&
6079         op.op.xattr.name_len &&
6080         op.indata.length() >= op.op.xattr.name_len) {
6081       bufferptr bp(op.op.xattr.name_len);
6082       bufferlist bl;
6083       bl.append(bp);
6084       bl.copy_in(0, op.op.xattr.name_len, op.indata);
6085       op.indata.claim(bl);
6086     } else if (ceph_osd_op_type_exec(op.op.op) &&
6087                op.op.cls.class_len &&
6088                op.indata.length() >
6089                  (op.op.cls.class_len + op.op.cls.method_len)) {
6090       __u8 len = op.op.cls.class_len + op.op.cls.method_len;
6091       bufferptr bp(len);
6092       bufferlist bl;
6093       bl.append(bp);
6094       bl.copy_in(0, len, op.indata);
6095       op.indata.claim(bl);
6096     } else {
6097       op.indata.clear();
6098     }
6099   }
6100 }
6101