Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_rados.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_RGWRADOS_H
5 #define CEPH_RGWRADOS_H
6
7 #include <functional>
8
9 #include "include/rados/librados.hpp"
10 #include "include/Context.h"
11 #include "common/RefCountedObj.h"
12 #include "common/RWLock.h"
13 #include "common/ceph_time.h"
14 #include "common/lru_map.h"
15 #include "rgw_common.h"
16 #include "cls/rgw/cls_rgw_types.h"
17 #include "cls/version/cls_version_types.h"
18 #include "cls/log/cls_log_types.h"
19 #include "cls/statelog/cls_statelog_types.h"
20 #include "cls/timeindex/cls_timeindex_types.h"
21 #include "rgw_log.h"
22 #include "rgw_metadata.h"
23 #include "rgw_meta_sync_status.h"
24 #include "rgw_period_puller.h"
25 #include "rgw_sync_module.h"
26
27 class RGWWatcher;
28 class SafeTimer;
29 class ACLOwner;
30 class RGWGC;
31 class RGWMetaNotifier;
32 class RGWDataNotifier;
33 class RGWLC;
34 class RGWObjectExpirer;
35 class RGWMetaSyncProcessorThread;
36 class RGWDataSyncProcessorThread;
37 class RGWSyncLogTrimThread;
38 class RGWRESTConn;
39 struct RGWZoneGroup;
40 struct RGWZoneParams;
41 class RGWReshard;
42 class RGWReshardWait;
43
44 /* flags for put_obj_meta() */
45 #define PUT_OBJ_CREATE      0x01
46 #define PUT_OBJ_EXCL        0x02
47 #define PUT_OBJ_CREATE_EXCL (PUT_OBJ_CREATE | PUT_OBJ_EXCL)
48
49 #define RGW_OBJ_NS_MULTIPART "multipart"
50 #define RGW_OBJ_NS_SHADOW    "shadow"
51
52 #define RGW_BUCKET_INSTANCE_MD_PREFIX ".bucket.meta."
53
54 #define RGW_NO_SHARD -1
55
56 #define RGW_SHARDS_PRIME_0 7877
57 #define RGW_SHARDS_PRIME_1 65521
58
59 static inline int rgw_shards_mod(unsigned hval, int max_shards)
60 {
61   if (max_shards <= RGW_SHARDS_PRIME_0) {
62     return hval % RGW_SHARDS_PRIME_0 % max_shards;
63   }
64   return hval % RGW_SHARDS_PRIME_1 % max_shards;
65 }
66
67 static inline int rgw_shards_hash(const string& key, int max_shards)
68 {
69   return rgw_shards_mod(ceph_str_hash_linux(key.c_str(), key.size()), max_shards);
70 }
71
72 static inline int rgw_shards_max()
73 {
74   return RGW_SHARDS_PRIME_1;
75 }
76
77 static inline void prepend_bucket_marker(const rgw_bucket& bucket, const string& orig_oid, string& oid)
78 {
79   if (bucket.marker.empty() || orig_oid.empty()) {
80     oid = orig_oid;
81   } else {
82     oid = bucket.marker;
83     oid.append("_");
84     oid.append(orig_oid);
85   }
86 }
87
88 static inline void get_obj_bucket_and_oid_loc(const rgw_obj& obj, string& oid, string& locator)
89 {
90   const rgw_bucket& bucket = obj.bucket;
91   prepend_bucket_marker(bucket, obj.get_oid(), oid);
92   const string& loc = obj.key.get_loc();
93   if (!loc.empty()) {
94     prepend_bucket_marker(bucket, loc, locator);
95   } else {
96     locator.clear();
97   }
98 }
99
100 int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool, librados::IoCtx& ioctx, bool create = false);
101
102 int rgw_policy_from_attrset(CephContext *cct, map<string, bufferlist>& attrset, RGWAccessControlPolicy *policy);
103
104 static inline bool rgw_raw_obj_to_obj(const rgw_bucket& bucket, const rgw_raw_obj& raw_obj, rgw_obj *obj)
105 {
106   ssize_t pos = raw_obj.oid.find('_');
107   if (pos < 0) {
108     return false;
109   }
110
111   if (!rgw_obj_key::parse_raw_oid(raw_obj.oid.substr(pos + 1), &obj->key)) {
112     return false;
113   }
114   obj->bucket = bucket;
115
116   return true;
117 }
118
119 struct rgw_bucket_placement {
120   string placement_rule;
121   rgw_bucket bucket;
122
123   void dump(Formatter *f) const;
124 };
125
126 class rgw_obj_select {
127   string placement_rule;
128   rgw_obj obj;
129   rgw_raw_obj raw_obj;
130   bool is_raw;
131
132 public:
133   rgw_obj_select() : is_raw(false) {}
134   rgw_obj_select(const rgw_obj& _obj) : obj(_obj), is_raw(false) {}
135   rgw_obj_select(const rgw_raw_obj& _raw_obj) : raw_obj(_raw_obj), is_raw(true) {}
136   rgw_obj_select(const rgw_obj_select& rhs) {
137     placement_rule = rhs.placement_rule;
138     is_raw = rhs.is_raw;
139     if (is_raw) {
140       raw_obj = rhs.raw_obj;
141     } else {
142       obj = rhs.obj;
143     }
144   }
145
146   rgw_raw_obj get_raw_obj(const RGWZoneGroup& zonegroup, const RGWZoneParams& zone_params) const;
147   rgw_raw_obj get_raw_obj(RGWRados *store) const;
148
149   rgw_obj_select& operator=(const rgw_obj& rhs) {
150     obj = rhs;
151     is_raw = false;
152     return *this;
153   }
154
155   rgw_obj_select& operator=(const rgw_raw_obj& rhs) {
156     raw_obj = rhs;
157     is_raw = true;
158     return *this;
159   }
160
161   void set_placement_rule(const string& rule) {
162     placement_rule = rule;
163   }
164 };
165
166 struct compression_block {
167   uint64_t old_ofs;
168   uint64_t new_ofs;
169   uint64_t len;
170
171   void encode(bufferlist& bl) const {
172     ENCODE_START(1, 1, bl);
173     ::encode(old_ofs, bl);
174     ::encode(new_ofs, bl);
175     ::encode(len, bl);
176     ENCODE_FINISH(bl);
177   }
178
179   void decode(bufferlist::iterator& bl) {
180      DECODE_START(1, bl);
181      ::decode(old_ofs, bl);
182      ::decode(new_ofs, bl);
183      ::decode(len, bl);
184      DECODE_FINISH(bl);
185   }
186 };
187 WRITE_CLASS_ENCODER(compression_block)
188
189 struct RGWCompressionInfo {
190   string compression_type;
191   uint64_t orig_size;
192   vector<compression_block> blocks;
193
194   RGWCompressionInfo() : compression_type("none"), orig_size(0) {}
195   RGWCompressionInfo(const RGWCompressionInfo& cs_info) : compression_type(cs_info.compression_type),
196                                                           orig_size(cs_info.orig_size),
197                                                           blocks(cs_info.blocks) {}
198
199   void encode(bufferlist& bl) const {
200     ENCODE_START(1, 1, bl);
201     ::encode(compression_type, bl);
202     ::encode(orig_size, bl);
203     ::encode(blocks, bl);
204     ENCODE_FINISH(bl);
205   }
206
207   void decode(bufferlist::iterator& bl) {
208      DECODE_START(1, bl);
209      ::decode(compression_type, bl);
210      ::decode(orig_size, bl);
211      ::decode(blocks, bl);
212      DECODE_FINISH(bl);
213   }
214 };
215 WRITE_CLASS_ENCODER(RGWCompressionInfo)
216
217 int rgw_compression_info_from_attrset(map<string, bufferlist>& attrs, bool& need_decompress, RGWCompressionInfo& cs_info);
218
219 struct RGWOLHInfo {
220   rgw_obj target;
221   bool removed;
222
223   RGWOLHInfo() : removed(false) {}
224
225   void encode(bufferlist& bl) const {
226     ENCODE_START(1, 1, bl);
227     ::encode(target, bl);
228     ::encode(removed, bl);
229     ENCODE_FINISH(bl);
230   }
231
232   void decode(bufferlist::iterator& bl) {
233      DECODE_START(1, bl);
234      ::decode(target, bl);
235      ::decode(removed, bl);
236      DECODE_FINISH(bl);
237   }
238   static void generate_test_instances(list<RGWOLHInfo*>& o);
239   void dump(Formatter *f) const;
240 };
241 WRITE_CLASS_ENCODER(RGWOLHInfo)
242
243 struct RGWOLHPendingInfo {
244   ceph::real_time time;
245
246   RGWOLHPendingInfo() {}
247
248   void encode(bufferlist& bl) const {
249     ENCODE_START(1, 1, bl);
250     ::encode(time, bl);
251     ENCODE_FINISH(bl);
252   }
253
254   void decode(bufferlist::iterator& bl) {
255      DECODE_START(1, bl);
256      ::decode(time, bl);
257      DECODE_FINISH(bl);
258   }
259
260   void dump(Formatter *f) const;
261 };
262 WRITE_CLASS_ENCODER(RGWOLHPendingInfo)
263
264 struct RGWUsageBatch {
265   map<ceph::real_time, rgw_usage_log_entry> m;
266
267   void insert(ceph::real_time& t, rgw_usage_log_entry& entry, bool *account) {
268     bool exists = m.find(t) != m.end();
269     *account = !exists;
270     m[t].aggregate(entry);
271   }
272 };
273
274 struct RGWUsageIter {
275   string read_iter;
276   uint32_t index;
277
278   RGWUsageIter() : index(0) {}
279 };
280
281 class RGWGetDataCB {
282 protected:
283   uint64_t extra_data_len;
284 public:
285   virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) = 0;
286   RGWGetDataCB() : extra_data_len(0) {}
287   virtual ~RGWGetDataCB() {}
288   virtual void set_extra_data_len(uint64_t len) {
289     extra_data_len = len;
290   }
291   /**
292    * Flushes any cached data. Used by RGWGetObjFilter.
293    * Return logic same as handle_data.
294    */
295   virtual int flush() {
296     return 0;
297   }
298   /**
299    * Allows to extend fetch range of RGW object. Used by RGWGetObjFilter.
300    */
301   virtual int fixup_range(off_t& bl_ofs, off_t& bl_end) {
302     return 0;
303   }
304 };
305
306 class RGWAccessListFilter {
307 public:
308   virtual ~RGWAccessListFilter() {}
309   virtual bool filter(string& name, string& key) = 0;
310 };
311
312 struct RGWCloneRangeInfo {
313   rgw_obj src;
314   off_t src_ofs;
315   off_t dst_ofs;
316   uint64_t len;
317 };
318
319 struct RGWObjManifestPart {
320   rgw_obj loc;   /* the object where the data is located */
321   uint64_t loc_ofs;  /* the offset at that object where the data is located */
322   uint64_t size;     /* the part size */
323
324   RGWObjManifestPart() : loc_ofs(0), size(0) {}
325
326   void encode(bufferlist& bl) const {
327     ENCODE_START(2, 2, bl);
328     ::encode(loc, bl);
329     ::encode(loc_ofs, bl);
330     ::encode(size, bl);
331     ENCODE_FINISH(bl);
332   }
333
334   void decode(bufferlist::iterator& bl) {
335      DECODE_START_LEGACY_COMPAT_LEN_32(2, 2, 2, bl);
336      ::decode(loc, bl);
337      ::decode(loc_ofs, bl);
338      ::decode(size, bl);
339      DECODE_FINISH(bl);
340   }
341
342   void dump(Formatter *f) const;
343   static void generate_test_instances(list<RGWObjManifestPart*>& o);
344 };
345 WRITE_CLASS_ENCODER(RGWObjManifestPart)
346
347 /*
348  The manifest defines a set of rules for structuring the object parts.
349  There are a few terms to note:
350      - head: the head part of the object, which is the part that contains
351        the first chunk of data. An object might not have a head (as in the
352        case of multipart-part objects).
353      - stripe: data portion of a single rgw object that resides on a single
354        rados object.
355      - part: a collection of stripes that make a contiguous part of an
356        object. A regular object will only have one part (although might have
357        many stripes), a multipart object might have many parts. Each part
358        has a fixed stripe size, although the last stripe of a part might
359        be smaller than that. Consecutive parts may be merged if their stripe
360        value is the same.
361 */
362
363 struct RGWObjManifestRule {
364   uint32_t start_part_num;
365   uint64_t start_ofs;
366   uint64_t part_size; /* each part size, 0 if there's no part size, meaning it's unlimited */
367   uint64_t stripe_max_size; /* underlying obj max size */
368   string override_prefix;
369
370   RGWObjManifestRule() : start_part_num(0), start_ofs(0), part_size(0), stripe_max_size(0) {}
371   RGWObjManifestRule(uint32_t _start_part_num, uint64_t _start_ofs, uint64_t _part_size, uint64_t _stripe_max_size) :
372                        start_part_num(_start_part_num), start_ofs(_start_ofs), part_size(_part_size), stripe_max_size(_stripe_max_size) {}
373
374   void encode(bufferlist& bl) const {
375     ENCODE_START(2, 1, bl);
376     ::encode(start_part_num, bl);
377     ::encode(start_ofs, bl);
378     ::encode(part_size, bl);
379     ::encode(stripe_max_size, bl);
380     ::encode(override_prefix, bl);
381     ENCODE_FINISH(bl);
382   }
383
384   void decode(bufferlist::iterator& bl) {
385     DECODE_START(2, bl);
386     ::decode(start_part_num, bl);
387     ::decode(start_ofs, bl);
388     ::decode(part_size, bl);
389     ::decode(stripe_max_size, bl);
390     if (struct_v >= 2)
391       ::decode(override_prefix, bl);
392     DECODE_FINISH(bl);
393   }
394   void dump(Formatter *f) const;
395 };
396 WRITE_CLASS_ENCODER(RGWObjManifestRule)
397
398 class RGWObjManifest {
399 protected:
400   bool explicit_objs; /* old manifest? */
401   map<uint64_t, RGWObjManifestPart> objs;
402
403   uint64_t obj_size;
404
405   rgw_obj obj;
406   uint64_t head_size;
407   string head_placement_rule;
408
409   uint64_t max_head_size;
410   string prefix;
411   rgw_bucket_placement tail_placement; /* might be different than the original bucket,
412                                        as object might have been copied across pools */
413   map<uint64_t, RGWObjManifestRule> rules;
414
415   string tail_instance; /* tail object's instance */
416
417   void convert_to_explicit(const RGWZoneGroup& zonegroup, const RGWZoneParams& zone_params);
418   int append_explicit(RGWObjManifest& m, const RGWZoneGroup& zonegroup, const RGWZoneParams& zone_params);
419   void append_rules(RGWObjManifest& m, map<uint64_t, RGWObjManifestRule>::iterator& iter, string *override_prefix);
420
421   void update_iterators() {
422     begin_iter.seek(0);
423     end_iter.seek(obj_size);
424   }
425 public:
426
427   RGWObjManifest() : explicit_objs(false), obj_size(0), head_size(0), max_head_size(0),
428                      begin_iter(this), end_iter(this) {}
429   RGWObjManifest(const RGWObjManifest& rhs) {
430     *this = rhs;
431   }
432   RGWObjManifest& operator=(const RGWObjManifest& rhs) {
433     explicit_objs = rhs.explicit_objs;
434     objs = rhs.objs;
435     obj_size = rhs.obj_size;
436     obj = rhs.obj;
437     head_size = rhs.head_size;
438     max_head_size = rhs.max_head_size;
439     prefix = rhs.prefix;
440     tail_placement = rhs.tail_placement;
441     rules = rhs.rules;
442     tail_instance = rhs.tail_instance;
443
444     begin_iter.set_manifest(this);
445     end_iter.set_manifest(this);
446
447     begin_iter.seek(rhs.begin_iter.get_ofs());
448     end_iter.seek(rhs.end_iter.get_ofs());
449
450     return *this;
451   }
452
453   map<uint64_t, RGWObjManifestPart>& get_explicit_objs() {
454     return objs;
455   }
456
457
458   void set_explicit(uint64_t _size, map<uint64_t, RGWObjManifestPart>& _objs) {
459     explicit_objs = true;
460     obj_size = _size;
461     objs.swap(_objs);
462   }
463
464   void get_implicit_location(uint64_t cur_part_id, uint64_t cur_stripe, uint64_t ofs, string *override_prefix, rgw_obj_select *location);
465
466   void set_trivial_rule(uint64_t tail_ofs, uint64_t stripe_max_size) {
467     RGWObjManifestRule rule(0, tail_ofs, 0, stripe_max_size);
468     rules[0] = rule;
469     max_head_size = tail_ofs;
470   }
471
472   void set_multipart_part_rule(uint64_t stripe_max_size, uint64_t part_num) {
473     RGWObjManifestRule rule(0, 0, 0, stripe_max_size);
474     rule.start_part_num = part_num;
475     rules[0] = rule;
476     max_head_size = 0;
477   }
478
479   void encode(bufferlist& bl) const {
480     ENCODE_START(7, 6, bl);
481     ::encode(obj_size, bl);
482     ::encode(objs, bl);
483     ::encode(explicit_objs, bl);
484     ::encode(obj, bl);
485     ::encode(head_size, bl);
486     ::encode(max_head_size, bl);
487     ::encode(prefix, bl);
488     ::encode(rules, bl);
489     bool encode_tail_bucket = !(tail_placement.bucket == obj.bucket);
490     ::encode(encode_tail_bucket, bl);
491     if (encode_tail_bucket) {
492       ::encode(tail_placement.bucket, bl);
493     }
494     bool encode_tail_instance = (tail_instance != obj.key.instance);
495     ::encode(encode_tail_instance, bl);
496     if (encode_tail_instance) {
497       ::encode(tail_instance, bl);
498     }
499     ::encode(head_placement_rule, bl);
500     ::encode(tail_placement.placement_rule, bl);
501     ENCODE_FINISH(bl);
502   }
503
504   void decode(bufferlist::iterator& bl) {
505     DECODE_START_LEGACY_COMPAT_LEN_32(7, 2, 2, bl);
506     ::decode(obj_size, bl);
507     ::decode(objs, bl);
508     if (struct_v >= 3) {
509       ::decode(explicit_objs, bl);
510       ::decode(obj, bl);
511       ::decode(head_size, bl);
512       ::decode(max_head_size, bl);
513       ::decode(prefix, bl);
514       ::decode(rules, bl);
515     } else {
516       explicit_objs = true;
517       if (!objs.empty()) {
518         map<uint64_t, RGWObjManifestPart>::iterator iter = objs.begin();
519         obj = iter->second.loc;
520         head_size = iter->second.size;
521         max_head_size = head_size;
522       }
523     }
524
525     if (explicit_objs && head_size > 0 && !objs.empty()) {
526       /* patch up manifest due to issue 16435:
527        * the first object in the explicit objs list might not be the one we need to access, use the
528        * head object instead if set. This would happen if we had an old object that was created
529        * when the explicit objs manifest was around, and it got copied.
530        */
531       rgw_obj& obj_0 = objs[0].loc;
532       if (!obj_0.get_oid().empty() && obj_0.key.ns.empty()) {
533         objs[0].loc = obj;
534         objs[0].size = head_size;
535       }
536     }
537
538     if (struct_v >= 4) {
539       if (struct_v < 6) {
540         ::decode(tail_placement.bucket, bl);
541       } else {
542         bool need_to_decode;
543         ::decode(need_to_decode, bl);
544         if (need_to_decode) {
545           ::decode(tail_placement.bucket, bl);
546         } else {
547           tail_placement.bucket = obj.bucket;
548         }
549       }
550     }
551
552     if (struct_v >= 5) {
553       if (struct_v < 6) {
554         ::decode(tail_instance, bl);
555       } else {
556         bool need_to_decode;
557         ::decode(need_to_decode, bl);
558         if (need_to_decode) {
559           ::decode(tail_instance, bl);
560         } else {
561           tail_instance = obj.key.instance;
562         }
563       }
564     } else { // old object created before 'tail_instance' field added to manifest
565       tail_instance = obj.key.instance;
566     }
567
568     if (struct_v >= 7) {
569       ::decode(head_placement_rule, bl);
570       ::decode(tail_placement.placement_rule, bl);
571     }
572
573     update_iterators();
574     DECODE_FINISH(bl);
575   }
576
577   void dump(Formatter *f) const;
578   static void generate_test_instances(list<RGWObjManifest*>& o);
579
580   int append(RGWObjManifest& m, RGWZoneGroup& zonegroup, RGWZoneParams& zone_params);
581   int append(RGWObjManifest& m, RGWRados *store);
582
583   bool get_rule(uint64_t ofs, RGWObjManifestRule *rule);
584
585   bool empty() {
586     if (explicit_objs)
587       return objs.empty();
588     return rules.empty();
589   }
590
591   bool has_explicit_objs() {
592     return explicit_objs;
593   }
594
595   bool has_tail() {
596     if (explicit_objs) {
597       if (objs.size() == 1) {
598         map<uint64_t, RGWObjManifestPart>::iterator iter = objs.begin();
599         rgw_obj& o = iter->second.loc;
600         return !(obj == o);
601       }
602       return (objs.size() >= 2);
603     }
604     return (obj_size > head_size);
605   }
606
607   void set_head(const string& placement_rule, const rgw_obj& _o, uint64_t _s) {
608     head_placement_rule = placement_rule;
609     obj = _o;
610     head_size = _s;
611
612     if (explicit_objs && head_size > 0) {
613       objs[0].loc = obj;
614       objs[0].size = head_size;
615     }
616   }
617
618   const rgw_obj& get_obj() {
619     return obj;
620   }
621
622   void set_tail_placement(const string& placement_rule, const rgw_bucket& _b) {
623     tail_placement.placement_rule = placement_rule;
624     tail_placement.bucket = _b;
625   }
626
627   const rgw_bucket_placement& get_tail_placement() {
628     return tail_placement;
629   }
630
631   const string& get_head_placement_rule() {
632     return head_placement_rule;
633   }
634
635   void set_prefix(const string& _p) {
636     prefix = _p;
637   }
638
639   const string& get_prefix() {
640     return prefix;
641   }
642
643   void set_tail_instance(const string& _ti) {
644     tail_instance = _ti;
645   }
646
647   const string& get_tail_instance() {
648     return tail_instance;
649   }
650
651   void set_head_size(uint64_t _s) {
652     head_size = _s;
653   }
654
655   void set_obj_size(uint64_t s) {
656     obj_size = s;
657
658     update_iterators();
659   }
660
661   uint64_t get_obj_size() {
662     return obj_size;
663   }
664
665   uint64_t get_head_size() {
666     return head_size;
667   }
668
669   void set_max_head_size(uint64_t s) {
670     max_head_size = s;
671   }
672
673   uint64_t get_max_head_size() {
674     return max_head_size;
675   }
676
677   class obj_iterator {
678     RGWObjManifest *manifest;
679     uint64_t part_ofs; /* where current part starts */
680     uint64_t stripe_ofs; /* where current stripe starts */
681     uint64_t ofs;       /* current position within the object */
682     uint64_t stripe_size;      /* current part size */
683
684     int cur_part_id;
685     int cur_stripe;
686     string cur_override_prefix;
687
688     rgw_obj_select location;
689
690     map<uint64_t, RGWObjManifestRule>::iterator rule_iter;
691     map<uint64_t, RGWObjManifestRule>::iterator next_rule_iter;
692
693     map<uint64_t, RGWObjManifestPart>::iterator explicit_iter;
694
695     void init() {
696       part_ofs = 0;
697       stripe_ofs = 0;
698       ofs = 0;
699       stripe_size = 0;
700       cur_part_id = 0;
701       cur_stripe = 0;
702     }
703
704     void update_explicit_pos();
705
706
707   protected:
708
709     void set_manifest(RGWObjManifest *m) {
710       manifest = m;
711     }
712
713   public:
714     obj_iterator() : manifest(NULL) {
715       init();
716     }
717     explicit obj_iterator(RGWObjManifest *_m) : manifest(_m) {
718       init();
719       if (!manifest->empty()) {
720         seek(0);
721       }
722     }
723     obj_iterator(RGWObjManifest *_m, uint64_t _ofs) : manifest(_m) {
724       init();
725       if (!manifest->empty()) {
726         seek(_ofs);
727       }
728     }
729     void seek(uint64_t ofs);
730
731     void operator++();
732     bool operator==(const obj_iterator& rhs) {
733       return (ofs == rhs.ofs);
734     }
735     bool operator!=(const obj_iterator& rhs) {
736       return (ofs != rhs.ofs);
737     }
738     const rgw_obj_select& get_location() {
739       return location;
740     }
741
742     /* start of current stripe */
743     uint64_t get_stripe_ofs() {
744       if (manifest->explicit_objs) {
745         return explicit_iter->first;
746       }
747       return stripe_ofs;
748     }
749
750     /* current ofs relative to start of rgw object */
751     uint64_t get_ofs() const {
752       return ofs;
753     }
754
755     /* stripe number */
756     int get_cur_stripe() const {
757       return cur_stripe;
758     }
759
760     /* current stripe size */
761     uint64_t get_stripe_size() {
762       if (manifest->explicit_objs) {
763         return explicit_iter->second.size;
764       }
765       return stripe_size;
766     }
767
768     /* offset where data starts within current stripe */
769     uint64_t location_ofs() {
770       if (manifest->explicit_objs) {
771         return explicit_iter->second.loc_ofs;
772       }
773       return 0; /* all stripes start at zero offset */
774     }
775
776     void update_location();
777
778     friend class RGWObjManifest;
779   };
780
781   const obj_iterator& obj_begin();
782   const obj_iterator& obj_end();
783   obj_iterator obj_find(uint64_t ofs);
784
785   obj_iterator begin_iter;
786   obj_iterator end_iter;
787
788   /*
789    * simple object generator. Using a simple single rule manifest.
790    */
791   class generator {
792     RGWObjManifest *manifest;
793     uint64_t last_ofs;
794     uint64_t cur_part_ofs;
795     int cur_part_id;
796     int cur_stripe;
797     uint64_t cur_stripe_size;
798     string cur_oid;
799     
800     string oid_prefix;
801
802     rgw_obj_select cur_obj;
803
804     RGWObjManifestRule rule;
805
806   public:
807     generator() : manifest(NULL), last_ofs(0), cur_part_ofs(0), cur_part_id(0), 
808                   cur_stripe(0), cur_stripe_size(0) {}
809     int create_begin(CephContext *cct, RGWObjManifest *manifest, const string& placement_rule, rgw_bucket& bucket, rgw_obj& obj);
810
811     int create_next(uint64_t ofs);
812
813     rgw_raw_obj get_cur_obj(RGWZoneGroup& zonegroup, RGWZoneParams& zone_params) { return cur_obj.get_raw_obj(zonegroup, zone_params); }
814     rgw_raw_obj get_cur_obj(RGWRados *store) { return cur_obj.get_raw_obj(store); }
815
816     /* total max size of current stripe (including head obj) */
817     uint64_t cur_stripe_max_size() {
818       return cur_stripe_size;
819     }
820   };
821 };
822 WRITE_CLASS_ENCODER(RGWObjManifest)
823
824 struct RGWUploadPartInfo {
825   uint32_t num;
826   uint64_t size;
827   uint64_t accounted_size{0};
828   string etag;
829   ceph::real_time modified;
830   RGWObjManifest manifest;
831   RGWCompressionInfo cs_info;
832
833   RGWUploadPartInfo() : num(0), size(0) {}
834
835   void encode(bufferlist& bl) const {
836     ENCODE_START(4, 2, bl);
837     ::encode(num, bl);
838     ::encode(size, bl);
839     ::encode(etag, bl);
840     ::encode(modified, bl);
841     ::encode(manifest, bl);
842     ::encode(cs_info, bl);
843     ::encode(accounted_size, bl);
844     ENCODE_FINISH(bl);
845   }
846   void decode(bufferlist::iterator& bl) {
847     DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl);
848     ::decode(num, bl);
849     ::decode(size, bl);
850     ::decode(etag, bl);
851     ::decode(modified, bl);
852     if (struct_v >= 3)
853       ::decode(manifest, bl);
854     if (struct_v >= 4) {
855       ::decode(cs_info, bl);
856       ::decode(accounted_size, bl);
857     } else {
858       accounted_size = size;
859     }
860     DECODE_FINISH(bl);
861   }
862   void dump(Formatter *f) const;
863   static void generate_test_instances(list<RGWUploadPartInfo*>& o);
864 };
865 WRITE_CLASS_ENCODER(RGWUploadPartInfo)
866
867 struct RGWObjState {
868   rgw_obj obj;
869   bool is_atomic;
870   bool has_attrs;
871   bool exists;
872   uint64_t size; //< size of raw object
873   uint64_t accounted_size{0}; //< size before compression, encryption
874   ceph::real_time mtime;
875   uint64_t epoch;
876   bufferlist obj_tag;
877   bufferlist tail_tag;
878   string write_tag;
879   bool fake_tag;
880   RGWObjManifest manifest;
881   bool has_manifest;
882   string shadow_obj;
883   bool has_data;
884   bufferlist data;
885   bool prefetch_data;
886   bool keep_tail;
887   bool is_olh;
888   bufferlist olh_tag;
889   uint64_t pg_ver;
890   uint32_t zone_short_id;
891
892   /* important! don't forget to update copy constructor */
893
894   RGWObjVersionTracker objv_tracker;
895
896   map<string, bufferlist> attrset;
897   RGWObjState() : is_atomic(false), has_attrs(0), exists(false),
898                   size(0), epoch(0), fake_tag(false), has_manifest(false),
899                   has_data(false), prefetch_data(false), keep_tail(false), is_olh(false),
900                   pg_ver(0), zone_short_id(0) {}
901   RGWObjState(const RGWObjState& rhs) : obj (rhs.obj) {
902     is_atomic = rhs.is_atomic;
903     has_attrs = rhs.has_attrs;
904     exists = rhs.exists;
905     size = rhs.size;
906     accounted_size = rhs.accounted_size;
907     mtime = rhs.mtime;
908     epoch = rhs.epoch;
909     if (rhs.obj_tag.length()) {
910       obj_tag = rhs.obj_tag;
911     }
912     if (rhs.tail_tag.length()) {
913       tail_tag = rhs.tail_tag;
914     }
915     write_tag = rhs.write_tag;
916     fake_tag = rhs.fake_tag;
917     if (rhs.has_manifest) {
918       manifest = rhs.manifest;
919     }
920     has_manifest = rhs.has_manifest;
921     shadow_obj = rhs.shadow_obj;
922     has_data = rhs.has_data;
923     if (rhs.data.length()) {
924       data = rhs.data;
925     }
926     prefetch_data = rhs.prefetch_data;
927     keep_tail = rhs.keep_tail;
928     is_olh = rhs.is_olh;
929     objv_tracker = rhs.objv_tracker;
930     pg_ver = rhs.pg_ver;
931   }
932
933   bool get_attr(string name, bufferlist& dest) {
934     map<string, bufferlist>::iterator iter = attrset.find(name);
935     if (iter != attrset.end()) {
936       dest = iter->second;
937       return true;
938     }
939     return false;
940   }
941 };
942
943 struct RGWRawObjState {
944   rgw_raw_obj obj;
945   bool has_attrs{false};
946   bool exists{false};
947   uint64_t size{0};
948   ceph::real_time mtime;
949   uint64_t epoch;
950   bufferlist obj_tag;
951   bool has_data{false};
952   bufferlist data;
953   bool prefetch_data{false};
954   uint64_t pg_ver{0};
955
956   /* important! don't forget to update copy constructor */
957
958   RGWObjVersionTracker objv_tracker;
959
960   map<string, bufferlist> attrset;
961   RGWRawObjState() {}
962   RGWRawObjState(const RGWRawObjState& rhs) : obj (rhs.obj) {
963     has_attrs = rhs.has_attrs;
964     exists = rhs.exists;
965     size = rhs.size;
966     mtime = rhs.mtime;
967     epoch = rhs.epoch;
968     if (rhs.obj_tag.length()) {
969       obj_tag = rhs.obj_tag;
970     }
971     has_data = rhs.has_data;
972     if (rhs.data.length()) {
973       data = rhs.data;
974     }
975     prefetch_data = rhs.prefetch_data;
976     pg_ver = rhs.pg_ver;
977     objv_tracker = rhs.objv_tracker;
978   }
979 };
980
981 struct RGWPoolIterCtx {
982   librados::IoCtx io_ctx;
983   librados::NObjectIterator iter;
984 };
985
986 struct RGWListRawObjsCtx {
987   bool initialized;
988   RGWPoolIterCtx iter_ctx;
989
990   RGWListRawObjsCtx() : initialized(false) {}
991 };
992
993 struct RGWDefaultSystemMetaObjInfo {
994   string default_id;
995
996   void encode(bufferlist& bl) const {
997     ENCODE_START(1, 1, bl);
998     ::encode(default_id, bl);
999     ENCODE_FINISH(bl);
1000   }
1001
1002   void decode(bufferlist::iterator& bl) {
1003     DECODE_START(1, bl);
1004     ::decode(default_id, bl);
1005     DECODE_FINISH(bl);
1006   }
1007
1008   void dump(Formatter *f) const;
1009   void decode_json(JSONObj *obj);
1010 };
1011 WRITE_CLASS_ENCODER(RGWDefaultSystemMetaObjInfo)
1012
1013 struct RGWNameToId {
1014   string obj_id;
1015
1016   void encode(bufferlist& bl) const {
1017     ENCODE_START(1, 1, bl);
1018     ::encode(obj_id, bl);
1019     ENCODE_FINISH(bl);
1020   }
1021
1022   void decode(bufferlist::iterator& bl) {
1023     DECODE_START(1, bl);
1024     ::decode(obj_id, bl);
1025     DECODE_FINISH(bl);
1026   }
1027
1028   void dump(Formatter *f) const;
1029   void decode_json(JSONObj *obj);
1030 };
1031 WRITE_CLASS_ENCODER(RGWNameToId)
1032
1033 class RGWSystemMetaObj {
1034 protected:
1035   string id;
1036   string name;
1037
1038   CephContext *cct;
1039   RGWRados *store;
1040
1041   int store_name(bool exclusive);
1042   int store_info(bool exclusive);
1043   int read_info(const string& obj_id, bool old_format = false);
1044   int read_id(const string& obj_name, string& obj_id);
1045   int read_default(RGWDefaultSystemMetaObjInfo& default_info,
1046                    const string& oid);
1047   /* read and use default id */
1048   int use_default(bool old_format = false);
1049
1050 public:
1051   RGWSystemMetaObj() : cct(NULL), store(NULL) {}
1052   RGWSystemMetaObj(const string& _name): name(_name), cct(NULL), store(NULL)  {}
1053   RGWSystemMetaObj(const string& _id, const string& _name) : id(_id), name(_name), cct(NULL), store(NULL) {}
1054   RGWSystemMetaObj(CephContext *_cct, RGWRados *_store): cct(_cct), store(_store){}
1055   RGWSystemMetaObj(const string& _name, CephContext *_cct, RGWRados *_store): name(_name), cct(_cct), store(_store){}
1056   const string& get_name() const { return name; }
1057   const string& get_id() const { return id; }
1058
1059   void set_name(const string& _name) { name = _name;}
1060   void set_id(const string& _id) { id = _id;}
1061   void clear_id() { id.clear(); }
1062
1063   virtual ~RGWSystemMetaObj() {}
1064
1065   virtual void encode(bufferlist& bl) const {
1066     ENCODE_START(1, 1, bl);
1067     ::encode(id, bl);
1068     ::encode(name, bl);
1069     ENCODE_FINISH(bl);
1070   }
1071
1072   virtual void decode(bufferlist::iterator& bl) {
1073     DECODE_START(1, bl);
1074     ::decode(id, bl);
1075     ::decode(name, bl);
1076     DECODE_FINISH(bl);
1077   }
1078
1079   void reinit_instance(CephContext *_cct, RGWRados *_store) {
1080     cct = _cct;
1081     store = _store;
1082   }
1083   int init(CephContext *_cct, RGWRados *_store, bool setup_obj = true, bool old_format = false);
1084   virtual int read_default_id(string& default_id, bool old_format = false);
1085   virtual int set_as_default(bool exclusive = false);
1086   int delete_default();
1087   virtual int create(bool exclusive = true);
1088   int delete_obj(bool old_format = false);
1089   int rename(const string& new_name);
1090   int update() { return store_info(false);}
1091   int update_name() { return store_name(false);}
1092   int read();
1093   int write(bool exclusive);
1094
1095   virtual rgw_pool get_pool(CephContext *cct) = 0;
1096   virtual const string get_default_oid(bool old_format = false) = 0;
1097   virtual const string& get_names_oid_prefix() = 0;
1098   virtual const string& get_info_oid_prefix(bool old_format = false) = 0;
1099   virtual const string& get_predefined_name(CephContext *cct) = 0;
1100
1101   void dump(Formatter *f) const;
1102   void decode_json(JSONObj *obj);
1103 };
1104 WRITE_CLASS_ENCODER(RGWSystemMetaObj)
1105
1106 struct RGWZonePlacementInfo {
1107   rgw_pool index_pool;
1108   rgw_pool data_pool;
1109   rgw_pool data_extra_pool; /* if not set we should use data_pool */
1110   RGWBucketIndexType index_type;
1111   std::string compression_type;
1112
1113   RGWZonePlacementInfo() : index_type(RGWBIType_Normal) {}
1114
1115   void encode(bufferlist& bl) const {
1116     ENCODE_START(6, 1, bl);
1117     ::encode(index_pool.to_str(), bl);
1118     ::encode(data_pool.to_str(), bl);
1119     ::encode(data_extra_pool.to_str(), bl);
1120     ::encode((uint32_t)index_type, bl);
1121     ::encode(compression_type, bl);
1122     ENCODE_FINISH(bl);
1123   }
1124
1125   void decode(bufferlist::iterator& bl) {
1126     DECODE_START(6, bl);
1127     string index_pool_str;
1128     string data_pool_str;
1129     ::decode(index_pool_str, bl);
1130     index_pool = rgw_pool(index_pool_str);
1131     ::decode(data_pool_str, bl);
1132     data_pool = rgw_pool(data_pool_str);
1133     if (struct_v >= 4) {
1134       string data_extra_pool_str;
1135       ::decode(data_extra_pool_str, bl);
1136       data_extra_pool = rgw_pool(data_extra_pool_str);
1137     }
1138     if (struct_v >= 5) {
1139       uint32_t it;
1140       ::decode(it, bl);
1141       index_type = (RGWBucketIndexType)it;
1142     }
1143     if (struct_v >= 6) {
1144       ::decode(compression_type, bl);
1145     }
1146     DECODE_FINISH(bl);
1147   }
1148   const rgw_pool& get_data_extra_pool() const {
1149     if (data_extra_pool.empty()) {
1150       return data_pool;
1151     }
1152     return data_extra_pool;
1153   }
1154   void dump(Formatter *f) const;
1155   void decode_json(JSONObj *obj);
1156 };
1157 WRITE_CLASS_ENCODER(RGWZonePlacementInfo)
1158
1159 struct RGWZoneParams : RGWSystemMetaObj {
1160   rgw_pool domain_root;
1161   rgw_pool metadata_heap;
1162   rgw_pool control_pool;
1163   rgw_pool gc_pool;
1164   rgw_pool lc_pool;
1165   rgw_pool log_pool;
1166   rgw_pool intent_log_pool;
1167   rgw_pool usage_log_pool;
1168
1169   rgw_pool user_keys_pool;
1170   rgw_pool user_email_pool;
1171   rgw_pool user_swift_pool;
1172   rgw_pool user_uid_pool;
1173   rgw_pool roles_pool;
1174   rgw_pool reshard_pool;
1175
1176   RGWAccessKey system_key;
1177
1178   map<string, RGWZonePlacementInfo> placement_pools;
1179
1180   string realm_id;
1181
1182   map<string, string, ltstr_nocase> tier_config;
1183
1184   RGWZoneParams() : RGWSystemMetaObj() {}
1185   RGWZoneParams(const string& name) : RGWSystemMetaObj(name){}
1186   RGWZoneParams(const string& id, const string& name) : RGWSystemMetaObj(id, name) {}
1187   RGWZoneParams(const string& id, const string& name, const string& _realm_id)
1188     : RGWSystemMetaObj(id, name), realm_id(_realm_id) {}
1189
1190   rgw_pool get_pool(CephContext *cct);
1191   const string get_default_oid(bool old_format = false) override;
1192   const string& get_names_oid_prefix() override;
1193   const string& get_info_oid_prefix(bool old_format = false) override;
1194   const string& get_predefined_name(CephContext *cct) override;
1195
1196   int init(CephContext *_cct, RGWRados *_store, bool setup_obj = true,
1197            bool old_format = false);
1198   using RGWSystemMetaObj::init;
1199   int read_default_id(string& default_id, bool old_format = false) override;
1200   int set_as_default(bool exclusive = false) override;
1201   int create_default(bool old_format = false);
1202   int create(bool exclusive = true) override;
1203   int fix_pool_names();
1204
1205   const string& get_compression_type(const string& placement_rule) const;
1206   
1207   void encode(bufferlist& bl) const override {
1208     ENCODE_START(10, 1, bl);
1209     ::encode(domain_root, bl);
1210     ::encode(control_pool, bl);
1211     ::encode(gc_pool, bl);
1212     ::encode(log_pool, bl);
1213     ::encode(intent_log_pool, bl);
1214     ::encode(usage_log_pool, bl);
1215     ::encode(user_keys_pool, bl);
1216     ::encode(user_email_pool, bl);
1217     ::encode(user_swift_pool, bl);
1218     ::encode(user_uid_pool, bl);
1219     RGWSystemMetaObj::encode(bl);
1220     ::encode(system_key, bl);
1221     ::encode(placement_pools, bl);
1222     ::encode(metadata_heap, bl);
1223     ::encode(realm_id, bl);
1224     ::encode(lc_pool, bl);
1225     ::encode(tier_config, bl);
1226     ::encode(roles_pool, bl);
1227     ::encode(reshard_pool, bl);
1228     ENCODE_FINISH(bl);
1229   }
1230
1231   void decode(bufferlist::iterator& bl) override {
1232     DECODE_START(10, bl);
1233     ::decode(domain_root, bl);
1234     ::decode(control_pool, bl);
1235     ::decode(gc_pool, bl);
1236     ::decode(log_pool, bl);
1237     ::decode(intent_log_pool, bl);
1238     ::decode(usage_log_pool, bl);
1239     ::decode(user_keys_pool, bl);
1240     ::decode(user_email_pool, bl);
1241     ::decode(user_swift_pool, bl);
1242     ::decode(user_uid_pool, bl);
1243     if (struct_v >= 6) {
1244       RGWSystemMetaObj::decode(bl);
1245     } else if (struct_v >= 2) {
1246       ::decode(name, bl);
1247       id = name;
1248     }
1249     if (struct_v >= 3)
1250       ::decode(system_key, bl);
1251     if (struct_v >= 4)
1252       ::decode(placement_pools, bl);
1253     if (struct_v >= 5)
1254       ::decode(metadata_heap, bl);
1255     if (struct_v >= 6) {
1256       ::decode(realm_id, bl);
1257     }
1258     if (struct_v >= 7) {
1259       ::decode(lc_pool, bl);
1260     } else {
1261       lc_pool = log_pool.name + ":lc";
1262     }
1263     if (struct_v >= 8) {
1264       ::decode(tier_config, bl);
1265     }
1266     if (struct_v >= 9) {
1267       ::decode(roles_pool, bl);
1268     } else {
1269       roles_pool = name + ".rgw.meta:roles";
1270     }
1271     if (struct_v >= 10) {
1272       ::decode(reshard_pool, bl);
1273     } else {
1274       reshard_pool = log_pool.name + ":reshard";
1275     }
1276     DECODE_FINISH(bl);
1277   }
1278   void dump(Formatter *f) const;
1279   void decode_json(JSONObj *obj);
1280   static void generate_test_instances(list<RGWZoneParams*>& o);
1281
1282   bool get_placement(const string& placement_id, RGWZonePlacementInfo *placement) const {
1283     auto iter = placement_pools.find(placement_id);
1284     if (iter == placement_pools.end()) {
1285       return false;
1286     }
1287     *placement = iter->second;
1288     return true;
1289   }
1290
1291   /*
1292    * return data pool of the head object
1293    */
1294   bool get_head_data_pool(const string& placement_id, const rgw_obj& obj, rgw_pool *pool) const {
1295     const rgw_data_placement_target& explicit_placement = obj.bucket.explicit_placement;
1296     if (!explicit_placement.data_pool.empty()) {
1297       if (!obj.in_extra_data) {
1298         *pool = explicit_placement.data_pool;
1299       } else {
1300         *pool = explicit_placement.get_data_extra_pool();
1301       }
1302       return true;
1303     }
1304     if (placement_id.empty()) {
1305       return false;
1306     }
1307     auto iter = placement_pools.find(placement_id);
1308     if (iter == placement_pools.end()) {
1309       return false;
1310     }
1311     if (!obj.in_extra_data) {
1312       *pool = iter->second.data_pool;
1313     } else {
1314       *pool = iter->second.get_data_extra_pool();
1315     }
1316     return true;
1317   }
1318 };
1319 WRITE_CLASS_ENCODER(RGWZoneParams)
1320
1321 struct RGWZone {
1322   string id;
1323   string name;
1324   list<string> endpoints;
1325   bool log_meta;
1326   bool log_data;
1327   bool read_only;
1328   string tier_type;
1329
1330 /**
1331  * Represents the number of shards for the bucket index object, a value of zero
1332  * indicates there is no sharding. By default (no sharding, the name of the object
1333  * is '.dir.{marker}', with sharding, the name is '.dir.{marker}.{sharding_id}',
1334  * sharding_id is zero-based value. It is not recommended to set a too large value
1335  * (e.g. thousand) as it increases the cost for bucket listing.
1336  */
1337   uint32_t bucket_index_max_shards;
1338
1339   bool sync_from_all;
1340   set<string> sync_from; /* list of zones to sync from */
1341
1342   RGWZone() : log_meta(false), log_data(false), read_only(false), bucket_index_max_shards(0),
1343               sync_from_all(true) {}
1344
1345   void encode(bufferlist& bl) const {
1346     ENCODE_START(6, 1, bl);
1347     ::encode(name, bl);
1348     ::encode(endpoints, bl);
1349     ::encode(log_meta, bl);
1350     ::encode(log_data, bl);
1351     ::encode(bucket_index_max_shards, bl);
1352     ::encode(id, bl);
1353     ::encode(read_only, bl);
1354     ::encode(tier_type, bl);
1355     ::encode(sync_from_all, bl);
1356     ::encode(sync_from, bl);
1357     ENCODE_FINISH(bl);
1358   }
1359
1360   void decode(bufferlist::iterator& bl) {
1361     DECODE_START(6, bl);
1362     ::decode(name, bl);
1363     if (struct_v < 4) {
1364       id = name;
1365     }
1366     ::decode(endpoints, bl);
1367     if (struct_v >= 2) {
1368       ::decode(log_meta, bl);
1369       ::decode(log_data, bl);
1370     }
1371     if (struct_v >= 3) {
1372       ::decode(bucket_index_max_shards, bl);
1373     }
1374     if (struct_v >= 4) {
1375       ::decode(id, bl);
1376       ::decode(read_only, bl);
1377     }
1378     if (struct_v >= 5) {
1379       ::decode(tier_type, bl);
1380     }
1381     if (struct_v >= 6) {
1382       ::decode(sync_from_all, bl);
1383       ::decode(sync_from, bl);
1384     }
1385     DECODE_FINISH(bl);
1386   }
1387   void dump(Formatter *f) const;
1388   void decode_json(JSONObj *obj);
1389   static void generate_test_instances(list<RGWZone*>& o);
1390
1391   bool is_read_only() { return read_only; }
1392
1393   bool syncs_from(const string& zone_id) {
1394     return (sync_from_all || sync_from.find(zone_id) != sync_from.end());
1395   }
1396 };
1397 WRITE_CLASS_ENCODER(RGWZone)
1398
1399 struct RGWDefaultZoneGroupInfo {
1400   string default_zonegroup;
1401
1402   void encode(bufferlist& bl) const {
1403     ENCODE_START(1, 1, bl);
1404     ::encode(default_zonegroup, bl);
1405     ENCODE_FINISH(bl);
1406   }
1407
1408   void decode(bufferlist::iterator& bl) {
1409     DECODE_START(1, bl);
1410     ::decode(default_zonegroup, bl);
1411     DECODE_FINISH(bl);
1412   }
1413   void dump(Formatter *f) const;
1414   void decode_json(JSONObj *obj);
1415   //todo: implement ceph-dencoder
1416 };
1417 WRITE_CLASS_ENCODER(RGWDefaultZoneGroupInfo)
1418
1419 struct RGWZoneGroupPlacementTarget {
1420   string name;
1421   set<string> tags;
1422
1423   bool user_permitted(list<string>& user_tags) const {
1424     if (tags.empty()) {
1425       return true;
1426     }
1427     for (auto& rule : user_tags) {
1428       if (tags.find(rule) != tags.end()) {
1429         return true;
1430       }
1431     }
1432     return false;
1433   }
1434
1435   void encode(bufferlist& bl) const {
1436     ENCODE_START(1, 1, bl);
1437     ::encode(name, bl);
1438     ::encode(tags, bl);
1439     ENCODE_FINISH(bl);
1440   }
1441
1442   void decode(bufferlist::iterator& bl) {
1443     DECODE_START(1, bl);
1444     ::decode(name, bl);
1445     ::decode(tags, bl);
1446     DECODE_FINISH(bl);
1447   }
1448   void dump(Formatter *f) const;
1449   void decode_json(JSONObj *obj);
1450 };
1451 WRITE_CLASS_ENCODER(RGWZoneGroupPlacementTarget)
1452
1453
1454 struct RGWZoneGroup : public RGWSystemMetaObj {
1455   string api_name;
1456   list<string> endpoints;
1457   bool is_master;
1458
1459   string master_zone;
1460   map<string, RGWZone> zones;
1461
1462   map<string, RGWZoneGroupPlacementTarget> placement_targets;
1463   string default_placement;
1464
1465   list<string> hostnames;
1466   list<string> hostnames_s3website;
1467   // TODO: Maybe convert hostnames to a map<string,list<string>> for
1468   // endpoint_type->hostnames
1469 /*
1470 20:05 < _robbat21irssi> maybe I do someting like: if (hostname_map.empty()) { populate all map keys from hostnames; };
1471 20:05 < _robbat21irssi> but that's a later compatability migration planning bit
1472 20:06 < yehudasa> more like if (!hostnames.empty()) {
1473 20:06 < yehudasa> for (list<string>::iterator iter = hostnames.begin(); iter != hostnames.end(); ++iter) {
1474 20:06 < yehudasa> hostname_map["s3"].append(iter->second);
1475 20:07 < yehudasa> hostname_map["s3website"].append(iter->second);
1476 20:07 < yehudasa> s/append/push_back/g
1477 20:08 < _robbat21irssi> inner loop over APIs
1478 20:08 < yehudasa> yeah, probably
1479 20:08 < _robbat21irssi> s3, s3website, swift, swith_auth, swift_website
1480 */
1481   map<string, list<string> > api_hostname_map;
1482   map<string, list<string> > api_endpoints_map;
1483
1484   string realm_id;
1485
1486   RGWZoneGroup(): is_master(false){}
1487   RGWZoneGroup(const std::string &id, const std::string &name):RGWSystemMetaObj(id, name) {}
1488   RGWZoneGroup(const std::string &_name):RGWSystemMetaObj(_name) {}
1489   RGWZoneGroup(const std::string &_name, bool _is_master, CephContext *cct, RGWRados* store,
1490                const string& _realm_id, const list<string>& _endpoints)
1491     : RGWSystemMetaObj(_name, cct , store), endpoints(_endpoints), is_master(_is_master),
1492       realm_id(_realm_id) {}
1493
1494   bool is_master_zonegroup() const { return is_master;}
1495   void update_master(bool _is_master) {
1496     is_master = _is_master;
1497     post_process_params();
1498   }
1499   void post_process_params();
1500
1501   void encode(bufferlist& bl) const override {
1502     ENCODE_START(4, 1, bl);
1503     ::encode(name, bl);
1504     ::encode(api_name, bl);
1505     ::encode(is_master, bl);
1506     ::encode(endpoints, bl);
1507     ::encode(master_zone, bl);
1508     ::encode(zones, bl);
1509     ::encode(placement_targets, bl);
1510     ::encode(default_placement, bl);
1511     ::encode(hostnames, bl);
1512     ::encode(hostnames_s3website, bl);
1513     RGWSystemMetaObj::encode(bl);
1514     ::encode(realm_id, bl);
1515     ENCODE_FINISH(bl);
1516   }
1517
1518   void decode(bufferlist::iterator& bl) override {
1519     DECODE_START(4, bl);
1520     ::decode(name, bl);
1521     ::decode(api_name, bl);
1522     ::decode(is_master, bl);
1523     ::decode(endpoints, bl);
1524     ::decode(master_zone, bl);
1525     ::decode(zones, bl);
1526     ::decode(placement_targets, bl);
1527     ::decode(default_placement, bl);
1528     if (struct_v >= 2) {
1529       ::decode(hostnames, bl);
1530     }
1531     if (struct_v >= 3) {
1532       ::decode(hostnames_s3website, bl);
1533     }
1534     if (struct_v >= 4) {
1535       RGWSystemMetaObj::decode(bl);
1536       ::decode(realm_id, bl);
1537     } else {
1538       id = name;
1539     }
1540     DECODE_FINISH(bl);
1541   }
1542
1543   int read_default_id(string& default_id, bool old_format = false) override;
1544   int set_as_default(bool exclusive = false) override;
1545   int create_default(bool old_format = false);
1546   int equals(const string& other_zonegroup) const;
1547   int add_zone(const RGWZoneParams& zone_params, bool *is_master, bool *read_only,
1548                const list<string>& endpoints, const string *ptier_type,
1549                bool *psync_from_all, list<string>& sync_from, list<string>& sync_from_rm);
1550   int remove_zone(const std::string& zone_id);
1551   int rename_zone(const RGWZoneParams& zone_params);
1552   rgw_pool get_pool(CephContext *cct);
1553   const string get_default_oid(bool old_region_format = false) override;
1554   const string& get_info_oid_prefix(bool old_region_format = false) override;
1555   const string& get_names_oid_prefix() override;
1556   const string& get_predefined_name(CephContext *cct) override;
1557
1558   void dump(Formatter *f) const;
1559   void decode_json(JSONObj *obj);
1560   static void generate_test_instances(list<RGWZoneGroup*>& o);
1561 };
1562 WRITE_CLASS_ENCODER(RGWZoneGroup)
1563
1564 struct RGWPeriodMap
1565 {
1566   string id;
1567   map<string, RGWZoneGroup> zonegroups;
1568   map<string, RGWZoneGroup> zonegroups_by_api;
1569   map<string, uint32_t> short_zone_ids;
1570
1571   string master_zonegroup;
1572
1573   void encode(bufferlist& bl) const;
1574   void decode(bufferlist::iterator& bl);
1575
1576   int update(const RGWZoneGroup& zonegroup, CephContext *cct);
1577
1578   void dump(Formatter *f) const;
1579   void decode_json(JSONObj *obj);
1580
1581   void reset() {
1582     zonegroups.clear();
1583     zonegroups_by_api.clear();
1584     master_zonegroup.clear();
1585   }
1586
1587   uint32_t get_zone_short_id(const string& zone_id) const;
1588 };
1589 WRITE_CLASS_ENCODER(RGWPeriodMap)
1590
1591 struct RGWPeriodConfig
1592 {
1593   RGWQuotaInfo bucket_quota;
1594   RGWQuotaInfo user_quota;
1595
1596   void encode(bufferlist& bl) const {
1597     ENCODE_START(1, 1, bl);
1598     ::encode(bucket_quota, bl);
1599     ::encode(user_quota, bl);
1600     ENCODE_FINISH(bl);
1601   }
1602
1603   void decode(bufferlist::iterator& bl) {
1604     DECODE_START(1, bl);
1605     ::decode(bucket_quota, bl);
1606     ::decode(user_quota, bl);
1607     DECODE_FINISH(bl);
1608   }
1609
1610   void dump(Formatter *f) const;
1611   void decode_json(JSONObj *obj);
1612
1613   // the period config must be stored in a local object outside of the period,
1614   // so that it can be used in a default configuration where no realm/period
1615   // exists
1616   int read(RGWRados *store, const std::string& realm_id);
1617   int write(RGWRados *store, const std::string& realm_id);
1618
1619   static std::string get_oid(const std::string& realm_id);
1620   static rgw_pool get_pool(CephContext *cct);
1621 };
1622 WRITE_CLASS_ENCODER(RGWPeriodConfig)
1623
1624 /* for backward comaptability */
1625 struct RGWRegionMap {
1626
1627   map<string, RGWZoneGroup> regions;
1628
1629   string master_region;
1630
1631   RGWQuotaInfo bucket_quota;
1632   RGWQuotaInfo user_quota;
1633
1634   void encode(bufferlist& bl) const;
1635   void decode(bufferlist::iterator& bl);
1636
1637   void dump(Formatter *f) const;
1638   void decode_json(JSONObj *obj);
1639 };
1640 WRITE_CLASS_ENCODER(RGWRegionMap)
1641
1642 struct RGWZoneGroupMap {
1643
1644   map<string, RGWZoneGroup> zonegroups;
1645   map<string, RGWZoneGroup> zonegroups_by_api;
1646
1647   string master_zonegroup;
1648
1649   RGWQuotaInfo bucket_quota;
1650   RGWQuotaInfo user_quota;
1651
1652   /* constract the map */
1653   int read(CephContext *cct, RGWRados *store);
1654
1655   void encode(bufferlist& bl) const;
1656   void decode(bufferlist::iterator& bl);
1657
1658   void dump(Formatter *f) const;
1659   void decode_json(JSONObj *obj);
1660 };
1661 WRITE_CLASS_ENCODER(RGWZoneGroupMap)
1662
1663 class RGWRealm;
1664
1665 struct objexp_hint_entry {
1666   string tenant;
1667   string bucket_name;
1668   string bucket_id;
1669   rgw_obj_key obj_key;
1670   ceph::real_time exp_time;
1671
1672   void encode(bufferlist& bl) const {
1673     ENCODE_START(2, 1, bl);
1674     ::encode(bucket_name, bl);
1675     ::encode(bucket_id, bl);
1676     ::encode(obj_key, bl);
1677     ::encode(exp_time, bl);
1678     ::encode(tenant, bl);
1679     ENCODE_FINISH(bl);
1680   }
1681
1682   void decode(bufferlist::iterator& bl) {
1683     // XXX Do we want DECODE_START_LEGACY_COMPAT_LEN(2, 1, 1, bl); ?
1684     DECODE_START(2, bl);
1685     ::decode(bucket_name, bl);
1686     ::decode(bucket_id, bl);
1687     ::decode(obj_key, bl);
1688     ::decode(exp_time, bl);
1689     if (struct_v >= 2) {
1690       ::decode(tenant, bl);
1691     } else {
1692       tenant.clear();
1693     }
1694     DECODE_FINISH(bl);
1695   }
1696 };
1697 WRITE_CLASS_ENCODER(objexp_hint_entry)
1698
1699 class RGWPeriod;
1700
1701 class RGWRealm : public RGWSystemMetaObj
1702 {
1703   string current_period;
1704   epoch_t epoch{0}; //< realm epoch, incremented for each new period
1705
1706   int create_control(bool exclusive);
1707   int delete_control();
1708 public:
1709   RGWRealm() {}
1710   RGWRealm(const string& _id, const string& _name = "") : RGWSystemMetaObj(_id, _name) {}
1711   RGWRealm(CephContext *_cct, RGWRados *_store): RGWSystemMetaObj(_cct, _store) {}
1712   RGWRealm(const string& _name, CephContext *_cct, RGWRados *_store): RGWSystemMetaObj(_name, _cct, _store){}
1713
1714   void encode(bufferlist& bl) const override {
1715     ENCODE_START(1, 1, bl);
1716     RGWSystemMetaObj::encode(bl);
1717     ::encode(current_period, bl);
1718     ::encode(epoch, bl);
1719     ENCODE_FINISH(bl);
1720   }
1721
1722   void decode(bufferlist::iterator& bl) override {
1723     DECODE_START(1, bl);
1724     RGWSystemMetaObj::decode(bl);
1725     ::decode(current_period, bl);
1726     ::decode(epoch, bl);
1727     DECODE_FINISH(bl);
1728   }
1729
1730   int create(bool exclusive = true) override;
1731   int delete_obj();
1732   rgw_pool get_pool(CephContext *cct);
1733   const string get_default_oid(bool old_format = false) override;
1734   const string& get_names_oid_prefix() override;
1735   const string& get_info_oid_prefix(bool old_format = false) override;
1736   const string& get_predefined_name(CephContext *cct) override;
1737
1738   using RGWSystemMetaObj::read_id; // expose as public for radosgw-admin
1739
1740   void dump(Formatter *f) const;
1741   void decode_json(JSONObj *obj);
1742
1743   const string& get_current_period() const {
1744     return current_period;
1745   }
1746   int set_current_period(RGWPeriod& period);
1747   void clear_current_period_and_epoch() {
1748     current_period.clear();
1749     epoch = 0;
1750   }
1751   epoch_t get_epoch() const { return epoch; }
1752
1753   string get_control_oid();
1754   /// send a notify on the realm control object
1755   int notify_zone(bufferlist& bl);
1756   /// notify the zone of a new period
1757   int notify_new_period(const RGWPeriod& period);
1758 };
1759 WRITE_CLASS_ENCODER(RGWRealm)
1760
1761 struct RGWPeriodLatestEpochInfo {
1762   epoch_t epoch;
1763
1764   void encode(bufferlist& bl) const {
1765     ENCODE_START(1, 1, bl);
1766     ::encode(epoch, bl);
1767     ENCODE_FINISH(bl);
1768   }
1769
1770   void decode(bufferlist::iterator& bl) {
1771     DECODE_START(1, bl);
1772     ::decode(epoch, bl);
1773     DECODE_FINISH(bl);
1774   }
1775
1776   void dump(Formatter *f) const;
1777   void decode_json(JSONObj *obj);
1778 };
1779 WRITE_CLASS_ENCODER(RGWPeriodLatestEpochInfo)
1780
1781 class RGWPeriod
1782 {
1783   string id;
1784   epoch_t epoch;
1785   string predecessor_uuid;
1786   std::vector<std::string> sync_status;
1787   RGWPeriodMap period_map;
1788   RGWPeriodConfig period_config;
1789   string master_zonegroup;
1790   string master_zone;
1791
1792   string realm_id;
1793   string realm_name;
1794   epoch_t realm_epoch{1}; //< realm epoch when period was made current
1795
1796   CephContext *cct;
1797   RGWRados *store;
1798
1799   int read_info();
1800   int read_latest_epoch(RGWPeriodLatestEpochInfo& epoch_info,
1801                         RGWObjVersionTracker *objv = nullptr);
1802   int use_latest_epoch();
1803   int use_current_period();
1804
1805   const string get_period_oid();
1806   const string get_period_oid_prefix();
1807
1808   // gather the metadata sync status for each shard; only for use on master zone
1809   int update_sync_status(const RGWPeriod &current_period,
1810                          std::ostream& error_stream, bool force_if_stale);
1811
1812 public:
1813   RGWPeriod() : epoch(0), cct(NULL), store(NULL) {}
1814
1815   RGWPeriod(const string& period_id, epoch_t _epoch = 0)
1816     : id(period_id), epoch(_epoch),
1817       cct(NULL), store(NULL) {}
1818
1819   const string& get_id() const { return id; }
1820   epoch_t get_epoch() const { return epoch; }
1821   epoch_t get_realm_epoch() const { return realm_epoch; }
1822   const string& get_predecessor() const { return predecessor_uuid; }
1823   const string& get_master_zone() const { return master_zone; }
1824   const string& get_master_zonegroup() const { return master_zonegroup; }
1825   const string& get_realm() const { return realm_id; }
1826   const RGWPeriodMap& get_map() const { return period_map; }
1827   RGWPeriodConfig& get_config() { return period_config; }
1828   const RGWPeriodConfig& get_config() const { return period_config; }
1829   const std::vector<std::string>& get_sync_status() const { return sync_status; }
1830   rgw_pool get_pool(CephContext *cct);
1831   const string& get_latest_epoch_oid();
1832   const string& get_info_oid_prefix();
1833
1834   void set_user_quota(RGWQuotaInfo& user_quota) {
1835     period_config.user_quota = user_quota;
1836   }
1837
1838   void set_bucket_quota(RGWQuotaInfo& bucket_quota) {
1839     period_config.bucket_quota = bucket_quota;
1840   }
1841
1842   void set_id(const string& id) {
1843     this->id = id;
1844     period_map.id = id;
1845   }
1846   void set_epoch(epoch_t epoch) { this->epoch = epoch; }
1847   void set_realm_epoch(epoch_t epoch) { realm_epoch = epoch; }
1848
1849   void set_predecessor(const string& predecessor)
1850   {
1851     predecessor_uuid = predecessor;
1852   }
1853
1854   void set_realm_id(const string& _realm_id) {
1855     realm_id = _realm_id;
1856   }
1857
1858   int reflect();
1859
1860   int get_zonegroup(RGWZoneGroup& zonegroup,
1861                     const string& zonegroup_id);
1862
1863   bool is_single_zonegroup() const
1864   {
1865       return (period_map.zonegroups.size() == 1);
1866   }
1867
1868   /*
1869     returns true if there are several zone groups with a least one zone
1870    */
1871   bool is_multi_zonegroups_with_zones()
1872   {
1873     int count = 0;
1874     for (const auto& zg:  period_map.zonegroups) {
1875       if (zg.second.zones.size() > 0) {
1876         if (count++ > 0) {
1877           return true;
1878         }
1879       }
1880     }
1881     return false;
1882   }
1883
1884   int get_latest_epoch(epoch_t& epoch);
1885   int set_latest_epoch(epoch_t epoch, bool exclusive = false,
1886                        RGWObjVersionTracker *objv = nullptr);
1887   // update latest_epoch if the given epoch is higher, else return -EEXIST
1888   int update_latest_epoch(epoch_t epoch);
1889
1890   int init(CephContext *_cct, RGWRados *_store, const string &period_realm_id, const string &period_realm_name = "",
1891            bool setup_obj = true);
1892   int init(CephContext *_cct, RGWRados *_store, bool setup_obj = true);  
1893
1894   int create(bool exclusive = true);
1895   int delete_obj();
1896   int store_info(bool exclusive);
1897   int add_zonegroup(const RGWZoneGroup& zonegroup);
1898
1899   void fork();
1900   int update();
1901
1902   // commit a staging period; only for use on master zone
1903   int commit(RGWRealm& realm, const RGWPeriod &current_period,
1904              std::ostream& error_stream, bool force_if_stale = false);
1905
1906   void encode(bufferlist& bl) const {
1907     ENCODE_START(1, 1, bl);
1908     ::encode(id, bl);
1909     ::encode(epoch, bl);
1910     ::encode(realm_epoch, bl);
1911     ::encode(predecessor_uuid, bl);
1912     ::encode(sync_status, bl);
1913     ::encode(period_map, bl);
1914     ::encode(master_zone, bl);
1915     ::encode(master_zonegroup, bl);
1916     ::encode(period_config, bl);
1917     ::encode(realm_id, bl);
1918     ::encode(realm_name, bl);
1919     ENCODE_FINISH(bl);
1920   }
1921
1922   void decode(bufferlist::iterator& bl) {
1923     DECODE_START(1, bl);
1924     ::decode(id, bl);
1925     ::decode(epoch, bl);
1926     ::decode(realm_epoch, bl);
1927     ::decode(predecessor_uuid, bl);
1928     ::decode(sync_status, bl);
1929     ::decode(period_map, bl);
1930     ::decode(master_zone, bl);
1931     ::decode(master_zonegroup, bl);
1932     ::decode(period_config, bl);
1933     ::decode(realm_id, bl);
1934     ::decode(realm_name, bl);
1935     DECODE_FINISH(bl);
1936   }
1937   void dump(Formatter *f) const;
1938   void decode_json(JSONObj *obj);
1939
1940   static string get_staging_id(const string& realm_id) {
1941     return realm_id + ":staging";
1942   }
1943 };
1944 WRITE_CLASS_ENCODER(RGWPeriod)
1945
1946 class RGWDataChangesLog;
1947 class RGWMetaSyncStatusManager;
1948 class RGWDataSyncStatusManager;
1949 class RGWReplicaLogger;
1950 class RGWCoroutinesManagerRegistry;
1951   
1952 class RGWStateLog {
1953   RGWRados *store;
1954   int num_shards;
1955   string module_name;
1956
1957   void oid_str(int shard, string& oid);
1958   int get_shard_num(const string& object);
1959   string get_oid(const string& object);
1960   int open_ioctx(librados::IoCtx& ioctx);
1961
1962   struct list_state {
1963     int cur_shard;
1964     int max_shard;
1965     string marker;
1966     string client_id;
1967     string op_id;
1968     string object;
1969
1970     list_state() : cur_shard(0), max_shard(0) {}
1971   };
1972
1973 protected:
1974   virtual bool dump_entry_internal(const cls_statelog_entry& entry, Formatter *f) {
1975     return false;
1976   }
1977
1978 public:
1979   RGWStateLog(RGWRados *_store, int _num_shards, const string& _module_name) :
1980               store(_store), num_shards(_num_shards), module_name(_module_name) {}
1981   virtual ~RGWStateLog() {}
1982
1983   int store_entry(const string& client_id, const string& op_id, const string& object,
1984                   uint32_t state, bufferlist *bl, uint32_t *check_state);
1985
1986   int remove_entry(const string& client_id, const string& op_id, const string& object);
1987
1988   void init_list_entries(const string& client_id, const string& op_id, const string& object,
1989                          void **handle);
1990
1991   int list_entries(void *handle, int max_entries, list<cls_statelog_entry>& entries, bool *done);
1992
1993   void finish_list_entries(void *handle);
1994
1995   virtual void dump_entry(const cls_statelog_entry& entry, Formatter *f);
1996 };
1997
1998 /*
1999  * state transitions:
2000  *
2001  * unknown -> in-progress -> complete
2002  *                        -> error
2003  *
2004  * user can try setting the 'abort' state, and it can only succeed if state is
2005  * in-progress.
2006  *
2007  * state renewal cannot switch state (stays in the same state)
2008  *
2009  * rgw can switch from in-progress to complete
2010  * rgw can switch from in-progress to error
2011  *
2012  * rgw can switch from abort to cancelled
2013  *
2014  */
2015
2016 class RGWOpState : public RGWStateLog {
2017 protected:
2018   bool dump_entry_internal(const cls_statelog_entry& entry, Formatter *f) override;
2019 public:
2020
2021   enum OpState {
2022     OPSTATE_UNKNOWN     = 0,
2023     OPSTATE_IN_PROGRESS = 1,
2024     OPSTATE_COMPLETE    = 2,
2025     OPSTATE_ERROR       = 3,
2026     OPSTATE_ABORT       = 4,
2027     OPSTATE_CANCELLED   = 5,
2028   };
2029
2030   explicit RGWOpState(RGWRados *_store);
2031
2032   int state_from_str(const string& s, OpState *state);
2033   int set_state(const string& client_id, const string& op_id, const string& object, OpState state);
2034   int renew_state(const string& client_id, const string& op_id, const string& object, OpState state);
2035 };
2036
2037 class RGWOpStateSingleOp
2038 {
2039   RGWOpState os;
2040   string client_id;
2041   string op_id;
2042   string object;
2043
2044   CephContext *cct;
2045
2046   RGWOpState::OpState cur_state;
2047   ceph::real_time last_update;
2048
2049 public:
2050   RGWOpStateSingleOp(RGWRados *store, const string& cid, const string& oid, const string& obj);
2051
2052   int set_state(RGWOpState::OpState state);
2053   int renew_state();
2054 };
2055
2056 class RGWGetBucketStats_CB : public RefCountedObject {
2057 protected:
2058   rgw_bucket bucket;
2059   map<RGWObjCategory, RGWStorageStats> *stats;
2060 public:
2061   explicit RGWGetBucketStats_CB(const rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {}
2062   ~RGWGetBucketStats_CB() override {}
2063   virtual void handle_response(int r) = 0;
2064   virtual void set_response(map<RGWObjCategory, RGWStorageStats> *_stats) {
2065     stats = _stats;
2066   }
2067 };
2068
2069 class RGWGetUserStats_CB : public RefCountedObject {
2070 protected:
2071   rgw_user user;
2072   RGWStorageStats stats;
2073 public:
2074   explicit RGWGetUserStats_CB(const rgw_user& _user) : user(_user) {}
2075   ~RGWGetUserStats_CB() override {}
2076   virtual void handle_response(int r) = 0;
2077   virtual void set_response(RGWStorageStats& _stats) {
2078     stats = _stats;
2079   }
2080 };
2081
2082 class RGWGetDirHeader_CB;
2083 class RGWGetUserHeader_CB;
2084
2085 struct rgw_rados_ref {
2086   rgw_pool pool;
2087   string oid;
2088   string key;
2089   librados::IoCtx ioctx;
2090 };
2091
2092 class RGWChainedCache {
2093 public:
2094   virtual ~RGWChainedCache() {}
2095   virtual void chain_cb(const string& key, void *data) = 0;
2096   virtual void invalidate(const string& key) = 0;
2097   virtual void invalidate_all() = 0;
2098
2099   struct Entry {
2100     RGWChainedCache *cache;
2101     const string& key;
2102     void *data;
2103
2104     Entry(RGWChainedCache *_c, const string& _k, void *_d) : cache(_c), key(_k), data(_d) {}
2105   };
2106 };
2107
2108 template <class T, class S>
2109 class RGWObjectCtxImpl {
2110   RGWRados *store;
2111   std::map<T, S> objs_state;
2112   RWLock lock;
2113
2114 public:
2115   RGWObjectCtxImpl(RGWRados *_store) : store(_store), lock("RGWObjectCtxImpl") {}
2116
2117   S *get_state(const T& obj) {
2118     S *result;
2119     typename std::map<T, S>::iterator iter;
2120     lock.get_read();
2121     assert (!obj.empty());
2122     iter = objs_state.find(obj);
2123     if (iter != objs_state.end()) {
2124       result = &iter->second;
2125       lock.unlock();
2126     } else {
2127       lock.unlock();
2128       lock.get_write();
2129       result = &objs_state[obj];
2130       lock.unlock();
2131     }
2132     return result;
2133   }
2134
2135   void set_atomic(T& obj) {
2136     RWLock::WLocker wl(lock);
2137     assert (!obj.empty());
2138     objs_state[obj].is_atomic = true;
2139   }
2140   void set_prefetch_data(T& obj) {
2141     RWLock::WLocker wl(lock);
2142     assert (!obj.empty());
2143     objs_state[obj].prefetch_data = true;
2144   }
2145   void invalidate(T& obj) {
2146     RWLock::WLocker wl(lock);
2147     auto iter = objs_state.find(obj);
2148     if (iter == objs_state.end()) {
2149       return;
2150     }
2151     bool is_atomic = iter->second.is_atomic;
2152     bool prefetch_data = iter->second.prefetch_data;
2153   
2154     objs_state.erase(iter);
2155
2156     if (is_atomic || prefetch_data) {
2157       auto& s = objs_state[obj];
2158       s.is_atomic = is_atomic;
2159       s.prefetch_data = prefetch_data;
2160     }
2161   }
2162 };
2163
2164 template<>
2165 void RGWObjectCtxImpl<rgw_obj, RGWObjState>::invalidate(rgw_obj& obj);
2166
2167 template<>
2168 void RGWObjectCtxImpl<rgw_raw_obj, RGWRawObjState>::invalidate(rgw_raw_obj& obj);
2169
2170 struct RGWObjectCtx {
2171   RGWRados *store;
2172   void *user_ctx;
2173
2174   RGWObjectCtxImpl<rgw_obj, RGWObjState> obj;
2175   RGWObjectCtxImpl<rgw_raw_obj, RGWRawObjState> raw;
2176
2177   explicit RGWObjectCtx(RGWRados *_store) : store(_store), user_ctx(NULL), obj(store), raw(store) { }
2178   RGWObjectCtx(RGWRados *_store, void *_user_ctx) : store(_store), user_ctx(_user_ctx), obj(store), raw(store) { }
2179 };
2180
2181 class Finisher;
2182 class RGWAsyncRadosProcessor;
2183
2184 template <class T>
2185 class RGWChainedCacheImpl;
2186
2187 struct bucket_info_entry {
2188   RGWBucketInfo info;
2189   real_time mtime;
2190   map<string, bufferlist> attrs;
2191 };
2192
2193 struct tombstone_entry {
2194   ceph::real_time mtime;
2195   uint32_t zone_short_id;
2196   uint64_t pg_ver;
2197
2198   tombstone_entry() = default;
2199   tombstone_entry(const RGWObjState& state)
2200     : mtime(state.mtime), zone_short_id(state.zone_short_id),
2201       pg_ver(state.pg_ver) {}
2202 };
2203
2204 class RGWIndexCompletionManager;
2205
2206 class RGWRados
2207 {
2208   friend class RGWGC;
2209   friend class RGWMetaNotifier;
2210   friend class RGWDataNotifier;
2211   friend class RGWLC;
2212   friend class RGWObjectExpirer;
2213   friend class RGWMetaSyncProcessorThread;
2214   friend class RGWDataSyncProcessorThread;
2215   friend class RGWStateLog;
2216   friend class RGWReplicaLogger;
2217   friend class RGWReshard;
2218   friend class RGWBucketReshard;
2219   friend class BucketIndexLockGuard;
2220   friend class RGWCompleteMultipart;
2221
2222   /** Open the pool used as root for this gateway */
2223   int open_root_pool_ctx();
2224   int open_gc_pool_ctx();
2225   int open_lc_pool_ctx();
2226   int open_objexp_pool_ctx();
2227   int open_reshard_pool_ctx();
2228
2229   int open_pool_ctx(const rgw_pool& pool, librados::IoCtx&  io_ctx);
2230   int open_bucket_index_ctx(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx);
2231   int open_bucket_index(const RGWBucketInfo& bucket_info, librados::IoCtx&  index_ctx, string& bucket_oid);
2232   int open_bucket_index_base(const RGWBucketInfo& bucket_info, librados::IoCtx&  index_ctx,
2233       string& bucket_oid_base);
2234   int open_bucket_index_shard(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx,
2235       const string& obj_key, string *bucket_obj, int *shard_id);
2236   int open_bucket_index_shard(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx,
2237                               int shard_id, string *bucket_obj);
2238   int open_bucket_index(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx,
2239       map<int, string>& bucket_objs, int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
2240   template<typename T>
2241   int open_bucket_index(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx,
2242                         map<int, string>& oids, map<int, T>& bucket_objs,
2243                         int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
2244   void build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
2245       string *marker);
2246
2247   void get_bucket_instance_ids(const RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
2248
2249   std::atomic<int64_t> max_req_id = { 0 };
2250   Mutex lock;
2251   Mutex watchers_lock;
2252   SafeTimer *timer;
2253
2254   RGWGC *gc;
2255   RGWLC *lc;
2256   RGWObjectExpirer *obj_expirer;
2257   bool use_gc_thread;
2258   bool use_lc_thread;
2259   bool quota_threads;
2260   bool run_sync_thread;
2261   bool run_reshard_thread;
2262
2263   RGWAsyncRadosProcessor* async_rados;
2264
2265   RGWMetaNotifier *meta_notifier;
2266   RGWDataNotifier *data_notifier;
2267   RGWMetaSyncProcessorThread *meta_sync_processor_thread;
2268   map<string, RGWDataSyncProcessorThread *> data_sync_processor_threads;
2269
2270   RGWSyncLogTrimThread *sync_log_trimmer{nullptr};
2271
2272   Mutex meta_sync_thread_lock;
2273   Mutex data_sync_thread_lock;
2274
2275   int num_watchers;
2276   RGWWatcher **watchers;
2277   std::set<int> watchers_set;
2278   librados::IoCtx root_pool_ctx;      // .rgw
2279   librados::IoCtx control_pool_ctx;   // .rgw.control
2280   bool watch_initialized;
2281
2282   friend class RGWWatcher;
2283
2284   Mutex bucket_id_lock;
2285
2286   // This field represents the number of bucket index object shards
2287   uint32_t bucket_index_max_shards;
2288
2289   int get_obj_head_ioctx(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx *ioctx);
2290   int get_obj_head_ref(const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_rados_ref *ref);
2291   int get_system_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref);
2292   uint64_t max_bucket_id;
2293
2294   int get_olh_target_state(RGWObjectCtx& rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
2295                            RGWObjState *olh_state, RGWObjState **target_state);
2296   int get_system_obj_state_impl(RGWObjectCtx *rctx, rgw_raw_obj& obj, RGWRawObjState **state, RGWObjVersionTracker *objv_tracker);
2297   int get_obj_state_impl(RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWObjState **state,
2298                          bool follow_olh, bool assume_noent = false);
2299   int append_atomic_test(RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
2300                          librados::ObjectOperation& op, RGWObjState **state);
2301
2302   int update_placement_map();
2303   int store_bucket_info(RGWBucketInfo& info, map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker, bool exclusive);
2304
2305   void remove_rgw_head_obj(librados::ObjectWriteOperation& op);
2306   void cls_obj_check_prefix_exist(librados::ObjectOperation& op, const string& prefix, bool fail_if_exist);
2307   void cls_obj_check_mtime(librados::ObjectOperation& op, const real_time& mtime, bool high_precision_time, RGWCheckMTimeType type);
2308 protected:
2309   CephContext *cct;
2310
2311   std::vector<librados::Rados> rados;
2312   uint32_t next_rados_handle;
2313   RWLock handle_lock;
2314   std::map<pthread_t, int> rados_map;
2315
2316   using RGWChainedCacheImpl_bucket_info_entry = RGWChainedCacheImpl<bucket_info_entry>;
2317   RGWChainedCacheImpl_bucket_info_entry *binfo_cache;
2318
2319   using tombstone_cache_t = lru_map<rgw_obj, tombstone_entry>;
2320   tombstone_cache_t *obj_tombstone_cache;
2321
2322   librados::IoCtx gc_pool_ctx;        // .rgw.gc
2323   librados::IoCtx lc_pool_ctx;        // .rgw.lc
2324   librados::IoCtx objexp_pool_ctx;
2325   librados::IoCtx reshard_pool_ctx;
2326
2327   bool pools_initialized;
2328
2329   string trans_id_suffix;
2330
2331   RGWQuotaHandler *quota_handler;
2332
2333   Finisher *finisher;
2334
2335   RGWCoroutinesManagerRegistry *cr_registry;
2336
2337   RGWSyncModulesManager *sync_modules_manager{nullptr};
2338   RGWSyncModuleInstanceRef sync_module;
2339   bool writeable_zone{false};
2340
2341   RGWZoneGroup zonegroup;
2342   RGWZone zone_public_config; /* external zone params, e.g., entrypoints, log flags, etc. */  
2343   RGWZoneParams zone_params; /* internal zone params, e.g., rados pools */
2344   uint32_t zone_short_id;
2345
2346   RGWPeriod current_period;
2347
2348   RGWIndexCompletionManager *index_completion_manager{nullptr};
2349 public:
2350   RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
2351                gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
2352                run_sync_thread(false), run_reshard_thread(false), async_rados(nullptr), meta_notifier(NULL),
2353                data_notifier(NULL), meta_sync_processor_thread(NULL),
2354                meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
2355                num_watchers(0), watchers(NULL),
2356                watch_initialized(false),
2357                bucket_id_lock("rados_bucket_id"),
2358                bucket_index_max_shards(0),
2359                max_bucket_id(0), cct(NULL),
2360                next_rados_handle(0),
2361                handle_lock("rados_handle_lock"),
2362                binfo_cache(NULL), obj_tombstone_cache(nullptr),
2363                pools_initialized(false),
2364                quota_handler(NULL),
2365                finisher(NULL),
2366                cr_registry(NULL),
2367                zone_short_id(0),
2368                rest_master_conn(NULL),
2369                meta_mgr(NULL), data_log(NULL), reshard(NULL) {}
2370
2371   uint64_t get_new_req_id() {
2372     return ++max_req_id;
2373   }
2374
2375   librados::IoCtx* get_lc_pool_ctx() {
2376     return &lc_pool_ctx;
2377   }
2378   void set_context(CephContext *_cct) {
2379     cct = _cct;
2380   }
2381
2382   /**
2383    * AmazonS3 errors contain a HostId string, but is an opaque base64 blob; we
2384    * try to be more transparent. This has a wrapper so we can update it when zonegroup/zone are changed.
2385    */
2386   void init_host_id() {
2387     /* uint64_t needs 16, two '-' separators and a trailing null */
2388     const string& zone_name = get_zone().name;
2389     const string& zonegroup_name = zonegroup.get_name();
2390     char charbuf[16 + zone_name.size() + zonegroup_name.size() + 2 + 1];
2391     snprintf(charbuf, sizeof(charbuf), "%llx-%s-%s", (unsigned long long)instance_id(), zone_name.c_str(), zonegroup_name.c_str());
2392     string s(charbuf);
2393     host_id = s;
2394   }
2395
2396   string host_id;
2397
2398   RGWRealm realm;
2399
2400   RGWRESTConn *rest_master_conn;
2401   map<string, RGWRESTConn *> zone_conn_map;
2402   map<string, RGWRESTConn *> zone_data_sync_from_map;
2403   map<string, RGWRESTConn *> zone_data_notify_to_map;
2404   map<string, RGWRESTConn *> zonegroup_conn_map;
2405
2406   map<string, string> zone_id_by_name;
2407   map<string, RGWZone> zone_by_id;
2408
2409   RGWRESTConn *get_zone_conn_by_id(const string& id) {
2410     auto citer = zone_conn_map.find(id);
2411     if (citer == zone_conn_map.end()) {
2412       return NULL;
2413     }
2414
2415     return citer->second;
2416   }
2417
2418   RGWRESTConn *get_zone_conn_by_name(const string& name) {
2419     auto i = zone_id_by_name.find(name);
2420     if (i == zone_id_by_name.end()) {
2421       return NULL;
2422     }
2423
2424     return get_zone_conn_by_id(i->second);
2425   }
2426
2427   bool find_zone_id_by_name(const string& name, string *id) {
2428     auto i = zone_id_by_name.find(name);
2429     if (i == zone_id_by_name.end()) {
2430       return false;
2431     }
2432     *id = i->second; 
2433     return true;
2434   }
2435
2436   int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) {
2437     int ret = 0;
2438     if (id == get_zonegroup().get_id()) {
2439       zonegroup = get_zonegroup();
2440     } else if (!current_period.get_id().empty()) {
2441       ret = current_period.get_zonegroup(zonegroup, id);
2442     }
2443     return ret;
2444   }
2445
2446   RGWRealm& get_realm() {
2447     return realm;
2448   }
2449
2450   RGWZoneParams& get_zone_params() { return zone_params; }
2451   RGWZoneGroup& get_zonegroup() {
2452     return zonegroup;
2453   }
2454   RGWZone& get_zone() {
2455     return zone_public_config;
2456   }
2457
2458   bool zone_is_writeable() {
2459     return writeable_zone && !get_zone().is_read_only();
2460   }
2461
2462   uint32_t get_zone_short_id() const {
2463     return zone_short_id;
2464   }
2465
2466   bool zone_syncs_from(RGWZone& target_zone, RGWZone& source_zone);
2467
2468   const RGWQuotaInfo& get_bucket_quota() {
2469     return current_period.get_config().bucket_quota;
2470   }
2471
2472   const RGWQuotaInfo& get_user_quota() {
2473     return current_period.get_config().user_quota;
2474   }
2475
2476   const string& get_current_period_id() {
2477     return current_period.get_id();
2478   }
2479
2480   bool has_zonegroup_api(const std::string& api) const {
2481     if (!current_period.get_id().empty()) {
2482       const auto& zonegroups_by_api = current_period.get_map().zonegroups_by_api;
2483       if (zonegroups_by_api.find(api) != zonegroups_by_api.end())
2484         return true;
2485     }
2486     return false;
2487   }
2488
2489   // pulls missing periods for period_history
2490   std::unique_ptr<RGWPeriodPuller> period_puller;
2491   // maintains a connected history of periods
2492   std::unique_ptr<RGWPeriodHistory> period_history;
2493
2494   RGWAsyncRadosProcessor* get_async_rados() const { return async_rados; };
2495
2496   RGWMetadataManager *meta_mgr;
2497
2498   RGWDataChangesLog *data_log;
2499
2500   RGWReshard *reshard;
2501   std::shared_ptr<RGWReshardWait> reshard_wait;
2502
2503   virtual ~RGWRados() = default;
2504
2505   tombstone_cache_t *get_tombstone_cache() {
2506     return obj_tombstone_cache;
2507   }
2508
2509   RGWSyncModulesManager *get_sync_modules_manager() {
2510     return sync_modules_manager;
2511   }
2512   const RGWSyncModuleInstanceRef& get_sync_module() {
2513     return sync_module;
2514   }
2515
2516   int get_required_alignment(const rgw_pool& pool, uint64_t *alignment);
2517   int get_max_chunk_size(const rgw_pool& pool, uint64_t *max_chunk_size);
2518   int get_max_chunk_size(const string& placement_rule, const rgw_obj& obj, uint64_t *max_chunk_size);
2519
2520   uint32_t get_max_bucket_shards() {
2521     return rgw_shards_max();
2522   }
2523
2524
2525   int get_raw_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref);
2526
2527   int list_raw_objects_init(const rgw_pool& pool, const string& marker, RGWListRawObjsCtx *ctx);
2528   int list_raw_objects_next(const string& prefix_filter, int max,
2529                             RGWListRawObjsCtx& ctx, list<string>& oids,
2530                             bool *is_truncated);
2531   int list_raw_objects(const rgw_pool& pool, const string& prefix_filter, int max,
2532                        RGWListRawObjsCtx& ctx, list<string>& oids,
2533                        bool *is_truncated);
2534   string list_raw_objs_get_cursor(RGWListRawObjsCtx& ctx);
2535
2536   int list_raw_prefixed_objs(const rgw_pool& pool, const string& prefix, list<string>& result);
2537   int list_zonegroups(list<string>& zonegroups);
2538   int list_regions(list<string>& regions);
2539   int list_zones(list<string>& zones);
2540   int list_realms(list<string>& realms);
2541   int list_periods(list<string>& periods);
2542   int list_periods(const string& current_period, list<string>& periods);
2543   void tick();
2544
2545   CephContext *ctx() { return cct; }
2546   /** do all necessary setup of the storage device */
2547   int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads, bool _run_sync_thread, bool _run_reshard_thread) {
2548     set_context(_cct);
2549     use_gc_thread = _use_gc_thread;
2550     use_lc_thread = _use_lc_thread;
2551     quota_threads = _quota_threads;
2552     run_sync_thread = _run_sync_thread;
2553     run_reshard_thread = _run_reshard_thread;
2554     return initialize();
2555   }
2556   /** Initialize the RADOS instance and prepare to do other ops */
2557   virtual int init_rados();
2558   int init_zg_from_period(bool *initialized);
2559   int init_zg_from_local(bool *creating_defaults);
2560   int init_complete();
2561   int replace_region_with_zonegroup();
2562   int convert_regionmap();
2563   int initialize();
2564   void finalize();
2565
2566   int register_to_service_map(const string& daemon_type, const map<string, string>& meta);
2567
2568   void schedule_context(Context *c);
2569
2570   /** set up a bucket listing. handle is filled in. */
2571   int list_buckets_init(RGWAccessHandle *handle);
2572   /** 
2573    * get the next bucket in the listing. obj is filled in,
2574    * handle is updated.
2575    */
2576   int list_buckets_next(rgw_bucket_dir_entry& obj, RGWAccessHandle *handle);
2577
2578   /// list logs
2579   int log_list_init(const string& prefix, RGWAccessHandle *handle);
2580   int log_list_next(RGWAccessHandle handle, string *name);
2581
2582   /// remove log
2583   int log_remove(const string& name);
2584
2585   /// show log
2586   int log_show_init(const string& name, RGWAccessHandle *handle);
2587   int log_show_next(RGWAccessHandle handle, rgw_log_entry *entry);
2588
2589   // log bandwidth info
2590   int log_usage(map<rgw_user_bucket, RGWUsageBatch>& usage_info);
2591   int read_usage(const rgw_user& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
2592                  bool *is_truncated, RGWUsageIter& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage);
2593   int trim_usage(rgw_user& user, uint64_t start_epoch, uint64_t end_epoch);
2594
2595   int create_pool(const rgw_pool& pool);
2596
2597   int init_bucket_index(RGWBucketInfo& bucket_info, int num_shards);
2598   int select_bucket_placement(RGWUserInfo& user_info, const string& zonegroup_id, const string& rule,
2599                               string *pselected_rule_name, RGWZonePlacementInfo *rule_info);
2600   int select_legacy_bucket_placement(RGWZonePlacementInfo *rule_info);
2601   int select_new_bucket_location(RGWUserInfo& user_info, const string& zonegroup_id, const string& rule,
2602                                  string *pselected_rule_name, RGWZonePlacementInfo *rule_info);
2603   int select_bucket_location_by_rule(const string& location_rule, RGWZonePlacementInfo *rule_info);
2604   void create_bucket_id(string *bucket_id);
2605
2606   bool get_obj_data_pool(const string& placement_rule, const rgw_obj& obj, rgw_pool *pool);
2607   bool obj_to_raw(const string& placement_rule, const rgw_obj& obj, rgw_raw_obj *raw_obj);
2608
2609   int create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
2610                             const string& zonegroup_id,
2611                             const string& placement_rule,
2612                             const string& swift_ver_location,
2613                             const RGWQuotaInfo * pquota_info,
2614                             map<std::string,bufferlist>& attrs,
2615                             RGWBucketInfo& bucket_info,
2616                             obj_version *pobjv,
2617                             obj_version *pep_objv,
2618                             ceph::real_time creation_time,
2619                             rgw_bucket *master_bucket,
2620                             uint32_t *master_num_shards,
2621                             bool exclusive = true);
2622   int add_bucket_placement(const rgw_pool& new_pool);
2623   int remove_bucket_placement(const rgw_pool& new_pool);
2624   int list_placement_set(set<rgw_pool>& names);
2625   int create_pools(vector<rgw_pool>& pools, vector<int>& retcodes);
2626
2627   RGWCoroutinesManagerRegistry *get_cr_registry() { return cr_registry; }
2628
2629   class SystemObject {
2630     RGWRados *store;
2631     RGWObjectCtx& ctx;
2632     rgw_raw_obj obj;
2633
2634     RGWObjState *state;
2635
2636   protected:
2637     int get_state(RGWRawObjState **pstate, RGWObjVersionTracker *objv_tracker);
2638
2639   public:
2640     SystemObject(RGWRados *_store, RGWObjectCtx& _ctx, rgw_raw_obj& _obj) : store(_store), ctx(_ctx), obj(_obj), state(NULL) {}
2641
2642     void invalidate_state();
2643
2644     RGWRados *get_store() { return store; }
2645     rgw_raw_obj& get_obj() { return obj; }
2646     RGWObjectCtx& get_ctx() { return ctx; }
2647
2648     struct Read {
2649       RGWRados::SystemObject *source;
2650
2651       struct GetObjState {
2652         rgw_rados_ref ref;
2653         bool has_ref{false};
2654         uint64_t last_ver{0};
2655
2656         GetObjState() {}
2657
2658         int get_ref(RGWRados *store, rgw_raw_obj& obj, rgw_rados_ref **pref);
2659       } state;
2660       
2661       struct StatParams {
2662         ceph::real_time *lastmod;
2663         uint64_t *obj_size;
2664         map<string, bufferlist> *attrs;
2665
2666         StatParams() : lastmod(NULL), obj_size(NULL), attrs(NULL) {}
2667       } stat_params;
2668
2669       struct ReadParams {
2670         rgw_cache_entry_info *cache_info{nullptr};
2671         map<string, bufferlist> *attrs;
2672
2673         ReadParams() : attrs(NULL) {}
2674       } read_params;
2675
2676       explicit Read(RGWRados::SystemObject *_source) : source(_source) {}
2677
2678       int stat(RGWObjVersionTracker *objv_tracker);
2679       int read(int64_t ofs, int64_t end, bufferlist& bl, RGWObjVersionTracker *objv_tracker);
2680       int get_attr(const char *name, bufferlist& dest);
2681     };
2682   };
2683
2684   struct BucketShard {
2685     RGWRados *store;
2686     rgw_bucket bucket;
2687     int shard_id;
2688     librados::IoCtx index_ctx;
2689     string bucket_obj;
2690
2691     explicit BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
2692     int init(const rgw_bucket& _bucket, const rgw_obj& obj);
2693     int init(const rgw_bucket& _bucket, int sid);
2694   };
2695
2696   class Object {
2697     RGWRados *store;
2698     RGWBucketInfo bucket_info;
2699     RGWObjectCtx& ctx;
2700     rgw_obj obj;
2701
2702     BucketShard bs;
2703
2704     RGWObjState *state;
2705
2706     bool versioning_disabled;
2707
2708     bool bs_initialized;
2709
2710   protected:
2711     int get_state(RGWObjState **pstate, bool follow_olh, bool assume_noent = false);
2712     void invalidate_state();
2713
2714     int prepare_atomic_modification(librados::ObjectWriteOperation& op, bool reset_obj, const string *ptag,
2715                                     const char *ifmatch, const char *ifnomatch, bool removal_op, bool modify_tail);
2716     int complete_atomic_modification();
2717
2718   public:
2719     Object(RGWRados *_store, const RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, const rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
2720                                                                                                ctx(_ctx), obj(_obj), bs(store),
2721                                                                                                state(NULL), versioning_disabled(false),
2722                                                                                                bs_initialized(false) {}
2723
2724     RGWRados *get_store() { return store; }
2725     rgw_obj& get_obj() { return obj; }
2726     RGWObjectCtx& get_ctx() { return ctx; }
2727     RGWBucketInfo& get_bucket_info() { return bucket_info; }
2728     int get_manifest(RGWObjManifest **pmanifest);
2729
2730     int get_bucket_shard(BucketShard **pbs) {
2731       if (!bs_initialized) {
2732         int r = bs.init(bucket_info.bucket, obj);
2733         if (r < 0) {
2734           return r;
2735         }
2736         bs_initialized = true;
2737       }
2738       *pbs = &bs;
2739       return 0;
2740     }
2741
2742     void set_versioning_disabled(bool status) {
2743       versioning_disabled = status;
2744     }
2745
2746     bool versioning_enabled() {
2747       return (!versioning_disabled && bucket_info.versioning_enabled());
2748     }
2749
2750     struct Read {
2751       RGWRados::Object *source;
2752
2753       struct GetObjState {
2754         librados::IoCtx io_ctx;
2755         rgw_obj obj;
2756         rgw_raw_obj head_obj;
2757       } state;
2758       
2759       struct ConditionParams {
2760         const ceph::real_time *mod_ptr;
2761         const ceph::real_time *unmod_ptr;
2762         bool high_precision_time;
2763         uint32_t mod_zone_id;
2764         uint64_t mod_pg_ver;
2765         const char *if_match;
2766         const char *if_nomatch;
2767         
2768         ConditionParams() : 
2769                  mod_ptr(NULL), unmod_ptr(NULL), high_precision_time(false), mod_zone_id(0), mod_pg_ver(0),
2770                  if_match(NULL), if_nomatch(NULL) {}
2771       } conds;
2772
2773       struct Params {
2774         ceph::real_time *lastmod;
2775         uint64_t *obj_size;
2776         map<string, bufferlist> *attrs;
2777
2778         Params() : lastmod(NULL), obj_size(NULL), attrs(NULL) {}
2779       } params;
2780
2781       explicit Read(RGWRados::Object *_source) : source(_source) {}
2782
2783       int prepare();
2784       static int range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end);
2785       int read(int64_t ofs, int64_t end, bufferlist& bl);
2786       int iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb);
2787       int get_attr(const char *name, bufferlist& dest);
2788     };
2789
2790     struct Write {
2791       RGWRados::Object *target;
2792       
2793       struct MetaParams {
2794         ceph::real_time *mtime;
2795         map<std::string, bufferlist>* rmattrs;
2796         const bufferlist *data;
2797         RGWObjManifest *manifest;
2798         const string *ptag;
2799         list<rgw_obj_index_key> *remove_objs;
2800         ceph::real_time set_mtime;
2801         rgw_user owner;
2802         RGWObjCategory category;
2803         int flags;
2804         const char *if_match;
2805         const char *if_nomatch;
2806         uint64_t olh_epoch;
2807         ceph::real_time delete_at;
2808         bool canceled;
2809         const string *user_data;
2810         rgw_zone_set *zones_trace;
2811         bool modify_tail;
2812         bool completeMultipart;
2813
2814         MetaParams() : mtime(NULL), rmattrs(NULL), data(NULL), manifest(NULL), ptag(NULL),
2815                  remove_objs(NULL), category(RGW_OBJ_CATEGORY_MAIN), flags(0),
2816                  if_match(NULL), if_nomatch(NULL), olh_epoch(0), canceled(false), user_data(nullptr), zones_trace(nullptr),
2817                  modify_tail(false),  completeMultipart(false) {}
2818       } meta;
2819
2820       explicit Write(RGWRados::Object *_target) : target(_target) {}
2821
2822       int _do_write_meta(uint64_t size, uint64_t accounted_size,
2823                      map<std::string, bufferlist>& attrs,
2824                      bool modify_tail, bool assume_noent,
2825                      void *index_op);
2826       int write_meta(uint64_t size, uint64_t accounted_size,
2827                      map<std::string, bufferlist>& attrs);
2828       int write_data(const char *data, uint64_t ofs, uint64_t len, bool exclusive);
2829     };
2830
2831     struct Delete {
2832       RGWRados::Object *target;
2833
2834       struct DeleteParams {
2835         rgw_user bucket_owner;
2836         int versioning_status;
2837         ACLOwner obj_owner; /* needed for creation of deletion marker */
2838         uint64_t olh_epoch;
2839         string marker_version_id;
2840         uint32_t bilog_flags;
2841         list<rgw_obj_index_key> *remove_objs;
2842         ceph::real_time expiration_time;
2843         ceph::real_time unmod_since;
2844         ceph::real_time mtime; /* for setting delete marker mtime */
2845         bool high_precision_time;
2846         rgw_zone_set *zones_trace;
2847
2848         DeleteParams() : versioning_status(0), olh_epoch(0), bilog_flags(0), remove_objs(NULL), high_precision_time(false), zones_trace(nullptr) {}
2849       } params;
2850
2851       struct DeleteResult {
2852         bool delete_marker;
2853         string version_id;
2854
2855         DeleteResult() : delete_marker(false) {}
2856       } result;
2857       
2858       explicit Delete(RGWRados::Object *_target) : target(_target) {}
2859
2860       int delete_obj();
2861     };
2862
2863     struct Stat {
2864       RGWRados::Object *source;
2865
2866       struct Result {
2867         rgw_obj obj;
2868         RGWObjManifest manifest;
2869         bool has_manifest;
2870         uint64_t size;
2871         struct timespec mtime;
2872         map<string, bufferlist> attrs;
2873
2874         Result() : has_manifest(false), size(0) {}
2875       } result;
2876
2877       struct State {
2878         librados::IoCtx io_ctx;
2879         librados::AioCompletion *completion;
2880         int ret;
2881
2882         State() : completion(NULL), ret(0) {}
2883       } state;
2884
2885
2886       explicit Stat(RGWRados::Object *_source) : source(_source) {}
2887
2888       int stat_async();
2889       int wait();
2890       int stat();
2891     private:
2892       int finish();
2893     };
2894   };
2895
2896   class Bucket {
2897     RGWRados *store;
2898     RGWBucketInfo bucket_info;
2899     rgw_bucket& bucket;
2900     int shard_id;
2901
2902   public:
2903     Bucket(RGWRados *_store, const RGWBucketInfo& _bucket_info) : store(_store), bucket_info(_bucket_info), bucket(bucket_info.bucket),
2904                                                             shard_id(RGW_NO_SHARD) {}
2905     RGWRados *get_store() { return store; }
2906     rgw_bucket& get_bucket() { return bucket; }
2907     RGWBucketInfo& get_bucket_info() { return bucket_info; }
2908
2909     int update_bucket_id(const string& new_bucket_id);
2910
2911     int get_shard_id() { return shard_id; }
2912     void set_shard_id(int id) {
2913       shard_id = id;
2914     }
2915
2916     class UpdateIndex {
2917       RGWRados::Bucket *target;
2918       string optag;
2919       rgw_obj obj;
2920       uint16_t bilog_flags{0};
2921       BucketShard bs;
2922       bool bs_initialized{false};
2923       bool blind;
2924       bool prepared{false};
2925       rgw_zone_set *zones_trace{nullptr};
2926
2927       int init_bs() {
2928         int r = bs.init(target->get_bucket(), obj);
2929         if (r < 0) {
2930           return r;
2931         }
2932         bs_initialized = true;
2933         return 0;
2934       }
2935
2936       void invalidate_bs() {
2937         bs_initialized = false;
2938       }
2939
2940       int guard_reshard(BucketShard **pbs, std::function<int(BucketShard *)> call);
2941     public:
2942
2943       UpdateIndex(RGWRados::Bucket *_target, const rgw_obj& _obj) : target(_target), obj(_obj),
2944                                                               bs(target->get_store()) {
2945                                                                 blind = (target->get_bucket_info().index_type == RGWBIType_Indexless);
2946                                                               }
2947
2948       int get_bucket_shard(BucketShard **pbs) {
2949         if (!bs_initialized) {
2950           int r = init_bs();
2951           if (r < 0) {
2952             return r;
2953           }
2954         }
2955         *pbs = &bs;
2956         return 0;
2957       }
2958
2959       void set_bilog_flags(uint16_t flags) {
2960         bilog_flags = flags;
2961       }
2962       
2963       void set_zones_trace(rgw_zone_set *_zones_trace) {
2964         zones_trace = _zones_trace;
2965       }
2966
2967       int prepare(RGWModifyOp, const string *write_tag);
2968       int complete(int64_t poolid, uint64_t epoch, uint64_t size,
2969                    uint64_t accounted_size, ceph::real_time& ut,
2970                    const string& etag, const string& content_type,
2971                    bufferlist *acl_bl, RGWObjCategory category,
2972                    list<rgw_obj_index_key> *remove_objs, const string *user_data = nullptr);
2973       int complete_del(int64_t poolid, uint64_t epoch,
2974                        ceph::real_time& removed_mtime, /* mtime of removed object */
2975                        list<rgw_obj_index_key> *remove_objs);
2976       int cancel();
2977
2978       const string *get_optag() { return &optag; }
2979
2980       bool is_prepared() { return prepared; }
2981     };
2982
2983     struct List {
2984       RGWRados::Bucket *target;
2985       rgw_obj_key next_marker;
2986
2987       struct Params {
2988         string prefix;
2989         string delim;
2990         rgw_obj_key marker;
2991         rgw_obj_key end_marker;
2992         string ns;
2993         bool enforce_ns;
2994         RGWAccessListFilter *filter;
2995         bool list_versions;
2996
2997         Params() : enforce_ns(true), filter(NULL), list_versions(false) {}
2998       } params;
2999
3000     public:
3001       explicit List(RGWRados::Bucket *_target) : target(_target) {}
3002
3003       int list_objects(int64_t max, vector<rgw_bucket_dir_entry> *result, map<string, bool> *common_prefixes, bool *is_truncated);
3004       rgw_obj_key& get_next_marker() {
3005         return next_marker;
3006       }
3007     };
3008   };
3009
3010   /** Write/overwrite an object to the bucket storage. */
3011   virtual int put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, ceph::real_time *mtime,
3012               map<std::string, bufferlist>& attrs, int flags,
3013               bufferlist& data,
3014               RGWObjVersionTracker *objv_tracker,
3015               ceph::real_time set_mtime /* 0 for don't set */);
3016
3017   virtual int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl,
3018               off_t ofs, bool exclusive,
3019               RGWObjVersionTracker *objv_tracker = nullptr);
3020   int aio_put_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl,
3021                         off_t ofs, bool exclusive, void **handle);
3022
3023   int put_system_obj(void *ctx, rgw_raw_obj& obj, const char *data, size_t len, bool exclusive,
3024               ceph::real_time *mtime, map<std::string, bufferlist>& attrs, RGWObjVersionTracker *objv_tracker,
3025               ceph::real_time set_mtime) {
3026     bufferlist bl;
3027     bl.append(data, len);
3028     int flags = PUT_OBJ_CREATE;
3029     if (exclusive)
3030       flags |= PUT_OBJ_EXCL;
3031
3032     return put_system_obj_impl(obj, len, mtime, attrs, flags, bl, objv_tracker, set_mtime);
3033   }
3034   int aio_wait(void *handle);
3035   bool aio_completed(void *handle);
3036
3037   int on_last_entry_in_listing(RGWBucketInfo& bucket_info,
3038                                const std::string& obj_prefix,
3039                                const std::string& obj_delim,
3040                                std::function<int(const rgw_bucket_dir_entry&)> handler);
3041
3042   bool swift_versioning_enabled(const RGWBucketInfo& bucket_info) const {
3043     return bucket_info.has_swift_versioning() &&
3044         bucket_info.swift_ver_location.size();
3045   }
3046
3047   int swift_versioning_copy(RGWObjectCtx& obj_ctx,              /* in/out */
3048                             const rgw_user& user,               /* in */
3049                             RGWBucketInfo& bucket_info,         /* in */
3050                             rgw_obj& obj);                      /* in */
3051   int swift_versioning_restore(RGWObjectCtx& obj_ctx,           /* in/out */
3052                                const rgw_user& user,            /* in */
3053                                RGWBucketInfo& bucket_info,      /* in */
3054                                rgw_obj& obj,                    /* in */
3055                                bool& restored);                 /* out */
3056   int copy_obj_to_remote_dest(RGWObjState *astate,
3057                               map<string, bufferlist>& src_attrs,
3058                               RGWRados::Object::Read& read_op,
3059                               const rgw_user& user_id,
3060                               rgw_obj& dest_obj,
3061                               ceph::real_time *mtime);
3062
3063   enum AttrsMod {
3064     ATTRSMOD_NONE    = 0,
3065     ATTRSMOD_REPLACE = 1,
3066     ATTRSMOD_MERGE   = 2
3067   };
3068
3069   int rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj);
3070
3071   int stat_remote_obj(RGWObjectCtx& obj_ctx,
3072                const rgw_user& user_id,
3073                const string& client_id,
3074                req_info *info,
3075                const string& source_zone,
3076                rgw_obj& src_obj,
3077                RGWBucketInfo& src_bucket_info,
3078                real_time *src_mtime,
3079                uint64_t *psize,
3080                const real_time *mod_ptr,
3081                const real_time *unmod_ptr,
3082                bool high_precision_time,
3083                const char *if_match,
3084                const char *if_nomatch,
3085                map<string, bufferlist> *pattrs,
3086                string *version_id,
3087                string *ptag,
3088                string *petag);
3089
3090   int fetch_remote_obj(RGWObjectCtx& obj_ctx,
3091                        const rgw_user& user_id,
3092                        const string& client_id,
3093                        const string& op_id,
3094                        bool record_op_state,
3095                        req_info *info,
3096                        const string& source_zone,
3097                        rgw_obj& dest_obj,
3098                        rgw_obj& src_obj,
3099                        RGWBucketInfo& dest_bucket_info,
3100                        RGWBucketInfo& src_bucket_info,
3101                        ceph::real_time *src_mtime,
3102                        ceph::real_time *mtime,
3103                        const ceph::real_time *mod_ptr,
3104                        const ceph::real_time *unmod_ptr,
3105                        bool high_precision_time,
3106                        const char *if_match,
3107                        const char *if_nomatch,
3108                        AttrsMod attrs_mod,
3109                        bool copy_if_newer,
3110                        map<string, bufferlist>& attrs,
3111                        RGWObjCategory category,
3112                        uint64_t olh_epoch,
3113                        ceph::real_time delete_at,
3114                        string *version_id,
3115                        string *ptag,
3116                        ceph::buffer::list *petag,
3117                        void (*progress_cb)(off_t, void *),
3118                        void *progress_data,
3119                        rgw_zone_set *zones_trace= nullptr);
3120   /**
3121    * Copy an object.
3122    * dest_obj: the object to copy into
3123    * src_obj: the object to copy from
3124    * attrs: usage depends on attrs_mod parameter
3125    * attrs_mod: the modification mode of the attrs, may have the following values:
3126    *            ATTRSMOD_NONE - the attributes of the source object will be
3127    *                            copied without modifications, attrs parameter is ignored;
3128    *            ATTRSMOD_REPLACE - new object will have the attributes provided by attrs
3129    *                               parameter, source object attributes are not copied;
3130    *            ATTRSMOD_MERGE - any conflicting meta keys on the source object's attributes
3131    *                             are overwritten by values contained in attrs parameter.
3132    * Returns: 0 on success, -ERR# otherwise.
3133    */
3134   int copy_obj(RGWObjectCtx& obj_ctx,
3135                const rgw_user& user_id,
3136                const string& client_id,
3137                const string& op_id,
3138                req_info *info,
3139                const string& source_zone,
3140                rgw_obj& dest_obj,
3141                rgw_obj& src_obj,
3142                RGWBucketInfo& dest_bucket_info,
3143                RGWBucketInfo& src_bucket_info,
3144                ceph::real_time *src_mtime,
3145                ceph::real_time *mtime,
3146                const ceph::real_time *mod_ptr,
3147                const ceph::real_time *unmod_ptr,
3148                bool high_precision_time,
3149                const char *if_match,
3150                const char *if_nomatch,
3151                AttrsMod attrs_mod,
3152                bool copy_if_newer,
3153                map<std::string, bufferlist>& attrs,
3154                RGWObjCategory category,
3155                uint64_t olh_epoch,
3156                ceph::real_time delete_at,
3157                string *version_id,
3158                string *ptag,
3159                ceph::buffer::list *petag,
3160                void (*progress_cb)(off_t, void *),
3161                void *progress_data);
3162
3163   int copy_obj_data(RGWObjectCtx& obj_ctx,
3164                RGWBucketInfo& dest_bucket_info,
3165                RGWRados::Object::Read& read_op, off_t end,
3166                rgw_obj& dest_obj,
3167                rgw_obj& src_obj,
3168                uint64_t max_chunk_size,
3169                ceph::real_time *mtime,
3170                ceph::real_time set_mtime,
3171                map<string, bufferlist>& attrs,
3172                RGWObjCategory category,
3173                uint64_t olh_epoch,
3174                ceph::real_time delete_at,
3175                string *version_id,
3176                string *ptag,
3177                ceph::buffer::list *petag);
3178   
3179   int check_bucket_empty(RGWBucketInfo& bucket_info);
3180
3181   /**
3182    * Delete a bucket.
3183    * bucket: the name of the bucket to delete
3184    * Returns 0 on success, -ERR# otherwise.
3185    */
3186   int delete_bucket(RGWBucketInfo& bucket_info, RGWObjVersionTracker& objv_tracker, bool check_empty = true);
3187
3188   bool is_meta_master();
3189
3190   /**
3191    * Check to see if the bucket metadata is synced
3192    */
3193   bool is_syncing_bucket_meta(const rgw_bucket& bucket);
3194   void wakeup_meta_sync_shards(set<int>& shard_ids);
3195   void wakeup_data_sync_shards(const string& source_zone, map<int, set<string> >& shard_ids);
3196
3197   RGWMetaSyncStatusManager* get_meta_sync_manager();
3198   RGWDataSyncStatusManager* get_data_sync_manager(const std::string& source_zone);
3199
3200   int set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner);
3201   int set_buckets_enabled(std::vector<rgw_bucket>& buckets, bool enabled);
3202   int bucket_suspended(rgw_bucket& bucket, bool *suspended);
3203
3204   /** Delete an object.*/
3205   int delete_obj(RGWObjectCtx& obj_ctx,
3206                          const RGWBucketInfo& bucket_owner,
3207                          const rgw_obj& src_obj,
3208                          int versioning_status,
3209                          uint16_t bilog_flags = 0,
3210                          const ceph::real_time& expiration_time = ceph::real_time(),
3211                          rgw_zone_set *zones_trace = nullptr);
3212
3213   /** Delete a raw object.*/
3214   int delete_raw_obj(const rgw_raw_obj& obj);
3215
3216   /* Delete a system object */
3217   virtual int delete_system_obj(rgw_raw_obj& src_obj, RGWObjVersionTracker *objv_tracker = NULL);
3218
3219   /** Remove an object from the bucket index */
3220   int delete_obj_index(const rgw_obj& obj);
3221
3222   /**
3223    * Get an attribute for a system object.
3224    * obj: the object to get attr
3225    * name: name of the attr to retrieve
3226    * dest: bufferlist to store the result in
3227    * Returns: 0 on success, -ERR# otherwise.
3228    */
3229   virtual int system_obj_get_attr(rgw_raw_obj& obj, const char *name, bufferlist& dest);
3230
3231   int system_obj_set_attr(void *ctx, rgw_raw_obj& obj, const char *name, bufferlist& bl,
3232                           RGWObjVersionTracker *objv_tracker);
3233   virtual int system_obj_set_attrs(void *ctx, rgw_raw_obj& obj,
3234                                    map<string, bufferlist>& attrs,
3235                                    map<string, bufferlist>* rmattrs,
3236                                    RGWObjVersionTracker *objv_tracker);
3237
3238   /**
3239    * Set an attr on an object.
3240    * bucket: name of the bucket holding the object
3241    * obj: name of the object to set the attr on
3242    * name: the attr to set
3243    * bl: the contents of the attr
3244    * Returns: 0 on success, -ERR# otherwise.
3245    */
3246   int set_attr(void *ctx, const RGWBucketInfo& bucket_info, rgw_obj& obj, const char *name, bufferlist& bl);
3247
3248   int set_attrs(void *ctx, const RGWBucketInfo& bucket_info, rgw_obj& obj,
3249                         map<string, bufferlist>& attrs,
3250                         map<string, bufferlist>* rmattrs);
3251
3252   int get_system_obj_state(RGWObjectCtx *rctx, rgw_raw_obj& obj, RGWRawObjState **state, RGWObjVersionTracker *objv_tracker);
3253   int get_obj_state(RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWObjState **state,
3254                     bool follow_olh, bool assume_noent = false);
3255   int get_obj_state(RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWObjState **state) {
3256     return get_obj_state(rctx, bucket_info, obj, state, true);
3257   }
3258
3259   virtual int stat_system_obj(RGWObjectCtx& obj_ctx,
3260                               RGWRados::SystemObject::Read::GetObjState& state,
3261                               rgw_raw_obj& obj,
3262                               map<string, bufferlist> *attrs,
3263                               ceph::real_time *lastmod,
3264                               uint64_t *obj_size,
3265                               RGWObjVersionTracker *objv_tracker);
3266
3267   virtual int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
3268                              RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
3269                              bufferlist& bl, off_t ofs, off_t end,
3270                              map<string, bufferlist> *attrs,
3271                              rgw_cache_entry_info *cache_info);
3272
3273   virtual void register_chained_cache(RGWChainedCache *cache) {}
3274   virtual bool chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_entries, RGWChainedCache::Entry *chained_entry) { return false; }
3275
3276   int iterate_obj(RGWObjectCtx& ctx,
3277                   const RGWBucketInfo& bucket_info, const rgw_obj& obj,
3278                   off_t ofs, off_t end,
3279                   uint64_t max_chunk_size,
3280                   int (*iterate_obj_cb)(const RGWBucketInfo& bucket_info, const rgw_obj& obj, const rgw_raw_obj&, off_t, off_t, off_t, bool, RGWObjState *, void *),
3281                   void *arg);
3282
3283   int flush_read_list(struct get_obj_data *d);
3284
3285   int get_obj_iterate_cb(RGWObjectCtx *ctx, RGWObjState *astate,
3286                          const RGWBucketInfo& bucket_info, const rgw_obj& obj,
3287                          const rgw_raw_obj& read_obj,
3288                          off_t obj_ofs, off_t read_ofs, off_t len,
3289                          bool is_head_obj, void *arg);
3290
3291   void get_obj_aio_completion_cb(librados::completion_t cb, void *arg);
3292
3293   /**
3294    * a simple object read without keeping state
3295    */
3296
3297   virtual int raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, ceph::real_time *pmtime, uint64_t *epoch,
3298                        map<string, bufferlist> *attrs, bufferlist *first_chunk,
3299                        RGWObjVersionTracker *objv_tracker);
3300
3301   int obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectWriteOperation *op);
3302   int obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectReadOperation *op);
3303
3304   int guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call);
3305   int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
3306
3307   void bucket_index_guard_olh_op(RGWObjState& olh_state, librados::ObjectOperation& op);
3308   int olh_init_modification(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);
3309   int olh_init_modification_impl(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);
3310   int bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjState& olh_state,
3311                             const rgw_obj& obj_instance, bool delete_marker,
3312                             const string& op_tag, struct rgw_bucket_dir_entry_meta *meta,
3313                             uint64_t olh_epoch,
3314                             ceph::real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace = nullptr);
3315   int bucket_index_unlink_instance(const RGWBucketInfo& bucket_info, const rgw_obj& obj_instance, const string& op_tag, const string& olh_tag, uint64_t olh_epoch, rgw_zone_set *zones_trace = nullptr);
3316   int bucket_index_read_olh_log(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& obj_instance, uint64_t ver_marker,
3317                                 map<uint64_t, vector<rgw_bucket_olh_log_entry> > *log, bool *is_truncated);
3318   int bucket_index_trim_olh_log(const RGWBucketInfo& bucket_info, RGWObjState& obj_state, const rgw_obj& obj_instance, uint64_t ver);
3319   int bucket_index_clear_olh(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& obj_instance);
3320   int apply_olh_log(RGWObjectCtx& ctx, RGWObjState& obj_state, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
3321                     bufferlist& obj_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
3322                     uint64_t *plast_ver, rgw_zone_set *zones_trace = nullptr);
3323   int update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace = nullptr);
3324   int set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
3325               uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace = nullptr);
3326   int unlink_obj_instance(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj,
3327                           uint64_t olh_epoch, rgw_zone_set *zones_trace = nullptr);
3328
3329   void check_pending_olh_entries(map<string, bufferlist>& pending_entries, map<string, bufferlist> *rm_pending_entries);
3330   int remove_olh_pending_entries(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, map<string, bufferlist>& pending_attrs);
3331   int follow_olh(const RGWBucketInfo& bucket_info, RGWObjectCtx& ctx, RGWObjState *state, const rgw_obj& olh_obj, rgw_obj *target);
3332   int get_olh(const RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWOLHInfo *olh);
3333
3334   void gen_rand_obj_instance_name(rgw_obj *target);
3335
3336   int omap_get_vals(rgw_raw_obj& obj, bufferlist& header, const std::string& marker, uint64_t count, std::map<string, bufferlist>& m);
3337   int omap_get_all(rgw_raw_obj& obj, bufferlist& header, std::map<string, bufferlist>& m);
3338   int omap_set(rgw_raw_obj& obj, const std::string& key, bufferlist& bl);
3339   int omap_set(rgw_raw_obj& obj, map<std::string, bufferlist>& m);
3340   int omap_del(rgw_raw_obj& obj, const std::string& key);
3341   int update_containers_stats(map<string, RGWBucketEnt>& m);
3342   int append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl);
3343
3344   int watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
3345   int unwatch(uint64_t watch_handle);
3346   void add_watcher(int i);
3347   void remove_watcher(int i);
3348   virtual bool need_watch_notify() { return false; }
3349   int init_watch();
3350   void finalize_watch();
3351   int distribute(const string& key, bufferlist& bl);
3352   virtual int watch_cb(uint64_t notify_id,
3353                        uint64_t cookie,
3354                        uint64_t notifier_id,
3355                        bufferlist& bl) { return 0; }
3356   void pick_control_oid(const string& key, string& notify_oid);
3357
3358   virtual void set_cache_enabled(bool state) {}
3359
3360   void set_atomic(void *ctx, rgw_obj& obj) {
3361     RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
3362     rctx->obj.set_atomic(obj);
3363   }
3364   void set_prefetch_data(void *ctx, rgw_obj& obj) {
3365     RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
3366     rctx->obj.set_prefetch_data(obj);
3367   }
3368   void set_prefetch_data(void *ctx, rgw_raw_obj& obj) {
3369     RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
3370     rctx->raw.set_prefetch_data(obj);
3371   }
3372
3373   int decode_policy(bufferlist& bl, ACLOwner *owner);
3374   int get_bucket_stats(RGWBucketInfo& bucket_info, int shard_id, string *bucket_ver, string *master_ver,
3375       map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker, bool* syncstopped = NULL);
3376   int get_bucket_stats_async(RGWBucketInfo& bucket_info, int shard_id, RGWGetBucketStats_CB *cb);
3377   int get_user_stats(const rgw_user& user, RGWStorageStats& stats);
3378   int get_user_stats_async(const rgw_user& user, RGWGetUserStats_CB *cb);
3379   void get_bucket_instance_obj(const rgw_bucket& bucket, rgw_raw_obj& obj);
3380   void get_bucket_meta_oid(const rgw_bucket& bucket, string& oid);
3381
3382   int put_bucket_entrypoint_info(const string& tenant_name, const string& bucket_name, RGWBucketEntryPoint& entry_point,
3383                                  bool exclusive, RGWObjVersionTracker& objv_tracker, ceph::real_time mtime,
3384                                  map<string, bufferlist> *pattrs);
3385   int put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, map<string, bufferlist> *pattrs);
3386   int get_bucket_entrypoint_info(RGWObjectCtx& obj_ctx, const string& tenant_name, const string& bucket_name,
3387                                  RGWBucketEntryPoint& entry_point, RGWObjVersionTracker *objv_tracker,
3388                                  ceph::real_time *pmtime, map<string, bufferlist> *pattrs, rgw_cache_entry_info *cache_info = NULL);
3389   int get_bucket_instance_info(RGWObjectCtx& obj_ctx, const string& meta_key, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs);
3390   int get_bucket_instance_info(RGWObjectCtx& obj_ctx, const rgw_bucket& bucket, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs);
3391   int get_bucket_instance_from_oid(RGWObjectCtx& obj_ctx, const string& oid, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs,
3392                                    rgw_cache_entry_info *cache_info = NULL);
3393
3394   int convert_old_bucket_info(RGWObjectCtx& obj_ctx, const string& tenant_name, const string& bucket_name);
3395   static void make_bucket_entry_name(const string& tenant_name, const string& bucket_name, string& bucket_entry);
3396   int get_bucket_info(RGWObjectCtx& obj_ctx,
3397                               const string& tenant_name, const string& bucket_name,
3398                               RGWBucketInfo& info,
3399                               ceph::real_time *pmtime, map<string, bufferlist> *pattrs = NULL);
3400   int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, obj_version *pep_objv,
3401                                      map<string, bufferlist> *pattrs, bool create_entry_point);
3402
3403   int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
3404   int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
3405   int cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
3406                           rgw_bucket_dir_entry& ent, RGWObjCategory category, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
3407   int cls_obj_complete_add(BucketShard& bs, const rgw_obj& obj, string& tag, int64_t pool, uint64_t epoch, rgw_bucket_dir_entry& ent,
3408                            RGWObjCategory category, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
3409   int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, rgw_obj& obj,
3410                            ceph::real_time& removed_mtime, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
3411   int cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
3412   int cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_t timeout);
3413   int cls_bucket_list(RGWBucketInfo& bucket_info, int shard_id, rgw_obj_index_key& start, const string& prefix,
3414                       uint32_t num_entries, bool list_versions, map<string, rgw_bucket_dir_entry>& m,
3415                       bool *is_truncated, rgw_obj_index_key *last_entry,
3416                       bool (*force_check_filter)(const string&  name) = NULL);
3417   int cls_bucket_head(const RGWBucketInfo& bucket_info, int shard_id, map<string, struct rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids = NULL);
3418   int cls_bucket_head_async(const RGWBucketInfo& bucket_info, int shard_id, RGWGetDirHeader_CB *ctx, int *num_aio);
3419   int list_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
3420   int trim_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, string& marker, string& end_marker);
3421   int resync_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id);
3422   int stop_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id);
3423   int get_bi_log_status(RGWBucketInfo& bucket_info, int shard_id, map<int, string>& max_marker);
3424
3425   int bi_get_instance(const RGWBucketInfo& bucket_info, rgw_obj& obj, rgw_bucket_dir_entry *dirent);
3426   int bi_get(rgw_bucket& bucket, rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry);
3427   void bi_put(librados::ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry);
3428   int bi_put(BucketShard& bs, rgw_cls_bi_entry& entry);
3429   int bi_put(rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry);
3430   int bi_list(rgw_bucket& bucket, int shard_id, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated);
3431   int bi_list(BucketShard& bs, const string& filter_obj, const string& marker, uint32_t max, list<rgw_cls_bi_entry> *entries, bool *is_truncated);
3432   int bi_list(rgw_bucket& bucket, const string& obj_name, const string& marker, uint32_t max,
3433               list<rgw_cls_bi_entry> *entries, bool *is_truncated);
3434   int bi_remove(BucketShard& bs);
3435
3436   int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info);
3437   int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
3438                              string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage, bool *is_truncated);
3439   int cls_obj_usage_log_trim(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch);
3440
3441   int key_to_shard_id(const string& key, int max_shards);
3442   void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id);
3443   void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
3444   void shard_name(const string& prefix, unsigned shard_id, string& name);
3445   int get_target_shard_id(const RGWBucketInfo& bucket_info, const string& obj_key, int *shard_id);
3446   void time_log_prepare_entry(cls_log_entry& entry, const ceph::real_time& ut, const string& section, const string& key, bufferlist& bl);
3447   int time_log_add_init(librados::IoCtx& io_ctx);
3448   int time_log_add(const string& oid, list<cls_log_entry>& entries,
3449                    librados::AioCompletion *completion, bool monotonic_inc = true);
3450   int time_log_add(const string& oid, const ceph::real_time& ut, const string& section, const string& key, bufferlist& bl);
3451   int time_log_list(const string& oid, const ceph::real_time& start_time, const ceph::real_time& end_time,
3452                     int max_entries, list<cls_log_entry>& entries,
3453                     const string& marker, string *out_marker, bool *truncated);
3454   int time_log_info(const string& oid, cls_log_header *header);
3455   int time_log_info_async(librados::IoCtx& io_ctx, const string& oid, cls_log_header *header, librados::AioCompletion *completion);
3456   int time_log_trim(const string& oid, const ceph::real_time& start_time, const ceph::real_time& end_time,
3457                     const string& from_marker, const string& to_marker,
3458                     librados::AioCompletion *completion = nullptr);
3459
3460   string objexp_hint_get_shardname(int shard_num);
3461   int objexp_key_shard(const rgw_obj_index_key& key);
3462   void objexp_get_shard(int shard_num,
3463                         string& shard);                       /* out */
3464   int objexp_hint_add(const ceph::real_time& delete_at,
3465                       const string& tenant_name,
3466                       const string& bucket_name,
3467                       const string& bucket_id,
3468                       const rgw_obj_index_key& obj_key);
3469   int objexp_hint_list(const string& oid,
3470                        const ceph::real_time& start_time,
3471                        const ceph::real_time& end_time,
3472                        const int max_entries,
3473                        const string& marker,
3474                        list<cls_timeindex_entry>& entries, /* out */
3475                        string *out_marker,                 /* out */
3476                        bool *truncated);                   /* out */
3477   int objexp_hint_parse(cls_timeindex_entry &ti_entry,
3478                         objexp_hint_entry& hint_entry);    /* out */
3479   int objexp_hint_trim(const string& oid,
3480                        const ceph::real_time& start_time,
3481                        const ceph::real_time& end_time,
3482                        const string& from_marker = std::string(),
3483                        const string& to_marker   = std::string());
3484
3485   int lock_exclusive(rgw_pool& pool, const string& oid, ceph::timespan& duration, string& zone_id, string& owner_id);
3486   int unlock(rgw_pool& pool, const string& oid, string& zone_id, string& owner_id);
3487
3488   void update_gc_chain(rgw_obj& head_obj, RGWObjManifest& manifest, cls_rgw_obj_chain *chain);
3489   int send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag, bool sync);
3490   int gc_operate(string& oid, librados::ObjectWriteOperation *op);
3491   int gc_aio_operate(string& oid, librados::ObjectWriteOperation *op);
3492   int gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl);
3493
3494   int list_gc_objs(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated);
3495   int process_gc();
3496   int process_expire_objects();
3497   int defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj);
3498
3499   int process_lc();
3500   int list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map);
3501   
3502   int bucket_check_index(RGWBucketInfo& bucket_info,
3503                          map<RGWObjCategory, RGWStorageStats> *existing_stats,
3504                          map<RGWObjCategory, RGWStorageStats> *calculated_stats);
3505   int bucket_rebuild_index(RGWBucketInfo& bucket_info);
3506   int bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry);
3507   int remove_objs_from_index(RGWBucketInfo& bucket_info, list<rgw_obj_index_key>& oid_list);
3508   int move_rados_obj(librados::IoCtx& src_ioctx,
3509                      const string& src_oid, const string& src_locator,
3510                      librados::IoCtx& dst_ioctx,
3511                      const string& dst_oid, const string& dst_locator);
3512   int fix_head_obj_locator(const RGWBucketInfo& bucket_info, bool copy_obj, bool remove_bad, rgw_obj_key& key);
3513   int fix_tail_obj_locator(const RGWBucketInfo& bucket_info, rgw_obj_key& key, bool fix, bool *need_fix);
3514
3515   int cls_user_get_header(const string& user_id, cls_user_header *header);
3516   int cls_user_get_header_async(const string& user_id, RGWGetUserHeader_CB *ctx);
3517   int cls_user_sync_bucket_stats(rgw_raw_obj& user_obj, const RGWBucketInfo& bucket_info);
3518   int cls_user_list_buckets(rgw_raw_obj& obj,
3519                             const string& in_marker,
3520                             const string& end_marker,
3521                             int max_entries,
3522                             list<cls_user_bucket_entry>& entries,
3523                             string *out_marker,
3524                             bool *truncated);
3525   int cls_user_add_bucket(rgw_raw_obj& obj, const cls_user_bucket_entry& entry);
3526   int cls_user_update_buckets(rgw_raw_obj& obj, list<cls_user_bucket_entry>& entries, bool add);
3527   int cls_user_complete_stats_sync(rgw_raw_obj& obj);
3528   int complete_sync_user_stats(const rgw_user& user_id);
3529   int cls_user_add_bucket(rgw_raw_obj& obj, list<cls_user_bucket_entry>& entries);
3530   int cls_user_remove_bucket(rgw_raw_obj& obj, const cls_user_bucket& bucket);
3531   int cls_user_get_bucket_stats(const rgw_bucket& bucket, cls_user_bucket_entry& entry);
3532
3533   int check_quota(const rgw_user& bucket_owner, rgw_bucket& bucket,
3534                   RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size);
3535
3536   int check_bucket_shards(const RGWBucketInfo& bucket_info, const rgw_bucket& bucket,
3537                           RGWQuotaInfo& bucket_quota);
3538
3539   int add_bucket_to_reshard(const RGWBucketInfo& bucket_info, uint32_t new_num_shards);
3540
3541   uint64_t instance_id();
3542   const string& zone_name() {
3543     return get_zone_params().get_name();
3544   }
3545   const string& zone_id() {
3546     return get_zone_params().get_id();
3547   }
3548   string unique_id(uint64_t unique_num) {
3549     char buf[32];
3550     snprintf(buf, sizeof(buf), ".%llu.%llu", (unsigned long long)instance_id(), (unsigned long long)unique_num);
3551     string s = get_zone_params().get_id() + buf;
3552     return s;
3553   }
3554
3555   void init_unique_trans_id_deps() {
3556     char buf[16 + 2 + 1]; /* uint64_t needs 16, 2 hyphens add further 2 */
3557
3558     snprintf(buf, sizeof(buf), "-%llx-", (unsigned long long)instance_id());
3559     url_encode(string(buf) + get_zone_params().get_name(), trans_id_suffix);
3560   }
3561
3562   /* In order to preserve compability with Swift API, transaction ID
3563    * should contain at least 32 characters satisfying following spec:
3564    *  - first 21 chars must be in range [0-9a-f]. Swift uses this
3565    *    space for storing fragment of UUID obtained through a call to
3566    *    uuid4() function of Python's uuid module;
3567    *  - char no. 22 must be a hyphen;
3568    *  - at least 10 next characters constitute hex-formatted timestamp
3569    *    padded with zeroes if necessary. All bytes must be in [0-9a-f]
3570    *    range;
3571    *  - last, optional part of transaction ID is any url-encoded string
3572    *    without restriction on length. */
3573   string unique_trans_id(const uint64_t unique_num) {
3574     char buf[41]; /* 2 + 21 + 1 + 16 (timestamp can consume up to 16) + 1 */
3575     time_t timestamp = time(NULL);
3576
3577     snprintf(buf, sizeof(buf), "tx%021llx-%010llx",
3578              (unsigned long long)unique_num,
3579              (unsigned long long)timestamp);
3580
3581     return string(buf) + trans_id_suffix;
3582   }
3583
3584   void get_log_pool(rgw_pool& pool) {
3585     pool = get_zone_params().log_pool;
3586   }
3587
3588   bool need_to_log_data() {
3589     return get_zone().log_data;
3590   }
3591
3592   bool need_to_log_metadata() {
3593     return is_meta_master() &&
3594       (get_zonegroup().zones.size() > 1 || current_period.is_multi_zonegroups_with_zones());
3595   }
3596
3597   bool can_reshard() const {
3598     return current_period.get_id().empty() ||
3599       (zonegroup.zones.size() == 1 && current_period.is_single_zonegroup());
3600   }
3601
3602   librados::Rados* get_rados_handle();
3603
3604   int delete_raw_obj_aio(const rgw_raw_obj& obj, list<librados::AioCompletion *>& handles);
3605   int delete_obj_aio(const rgw_obj& obj, RGWBucketInfo& info, RGWObjState *astate,
3606                      list<librados::AioCompletion *>& handles, bool keep_index_consistent);
3607  private:
3608   /**
3609    * This is a helper method, it generates a list of bucket index objects with the given
3610    * bucket base oid and number of shards.
3611    *
3612    * bucket_oid_base [in] - base name of the bucket index object;
3613    * num_shards [in] - number of bucket index object shards.
3614    * bucket_objs [out] - filled by this method, a list of bucket index objects.
3615    */
3616   void get_bucket_index_objects(const string& bucket_oid_base, uint32_t num_shards,
3617       map<int, string>& bucket_objs, int shard_id = -1);
3618
3619   /**
3620    * Get the bucket index object with the given base bucket index object and object key,
3621    * and the number of bucket index shards.
3622    *
3623    * bucket_oid_base [in] - bucket object base name.
3624    * obj_key [in] - object key.
3625    * num_shards [in] - number of bucket index shards.
3626    * hash_type [in] - type of hash to find the shard ID.
3627    * bucket_obj [out] - the bucket index object for the given object.
3628    *
3629    * Return 0 on success, a failure code otherwise.
3630    */
3631   int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
3632       uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard);
3633
3634   void get_bucket_index_object(const string& bucket_oid_base, uint32_t num_shards,
3635                                int shard_id, string *bucket_obj);
3636
3637   /**
3638    * Check the actual on-disk state of the object specified
3639    * by list_state, and fill in the time and size of object.
3640    * Then append any changes to suggested_updates for
3641    * the rgw class' dir_suggest_changes function.
3642    *
3643    * Note that this can maul list_state; don't use it afterwards. Also
3644    * it expects object to already be filled in from list_state; it only
3645    * sets the size and mtime.
3646    *
3647    * Returns 0 on success, -ENOENT if the object doesn't exist on disk,
3648    * and -errno on other failures. (-ENOENT is not a failure, and it
3649    * will encode that info as a suggested update.)
3650    */
3651   int check_disk_state(librados::IoCtx io_ctx,
3652                        const RGWBucketInfo& bucket_info,
3653                        rgw_bucket_dir_entry& list_state,
3654                        rgw_bucket_dir_entry& object,
3655                        bufferlist& suggested_updates);
3656
3657   /**
3658    * Init pool iteration
3659    * pool: pool to use for the ctx initialization
3660    * ctx: context object to use for the iteration
3661    * Returns: 0 on success, -ERR# otherwise.
3662    */
3663   int pool_iterate_begin(const rgw_pool& pool, RGWPoolIterCtx& ctx);
3664
3665   /**
3666    * Init pool iteration
3667    * pool: pool to use
3668    * cursor: position to start iteration
3669    * ctx: context object to use for the iteration
3670    * Returns: 0 on success, -ERR# otherwise.
3671    */
3672   int pool_iterate_begin(const rgw_pool& pool, const string& cursor, RGWPoolIterCtx& ctx);
3673
3674   /**
3675    * Get pool iteration position
3676    * ctx: context object to use for the iteration
3677    * Returns: string representation of position
3678    */
3679   string pool_iterate_get_cursor(RGWPoolIterCtx& ctx);
3680
3681   /**
3682    * Iterate over pool return object names, use optional filter
3683    * ctx: iteration context, initialized with pool_iterate_begin()
3684    * num: max number of objects to return
3685    * objs: a vector that the results will append into
3686    * is_truncated: if not NULL, will hold true iff iteration is complete
3687    * filter: if not NULL, will be used to filter returned objects
3688    * Returns: 0 on success, -ERR# otherwise.
3689    */
3690   int pool_iterate(RGWPoolIterCtx& ctx, uint32_t num, vector<rgw_bucket_dir_entry>& objs,
3691                    bool *is_truncated, RGWAccessListFilter *filter);
3692
3693   uint64_t next_bucket_id();
3694 };
3695
3696 class RGWStoreManager {
3697 public:
3698   RGWStoreManager() {}
3699   static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread) {
3700     RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread,
3701                                             run_reshard_thread);
3702     return store;
3703   }
3704   static RGWRados *get_raw_storage(CephContext *cct) {
3705     RGWRados *store = init_raw_storage_provider(cct);
3706     return store;
3707   }
3708   static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread);
3709   static RGWRados *init_raw_storage_provider(CephContext *cct);
3710   static void close_storage(RGWRados *store);
3711
3712 };
3713
3714 template <class T>
3715 class RGWChainedCacheImpl : public RGWChainedCache {
3716   RWLock lock;
3717
3718   map<string, T> entries;
3719
3720 public:
3721   RGWChainedCacheImpl() : lock("RGWChainedCacheImpl::lock") {}
3722
3723   void init(RGWRados *store) {
3724     store->register_chained_cache(this);
3725   }
3726
3727   bool find(const string& key, T *entry) {
3728     RWLock::RLocker rl(lock);
3729     typename map<string, T>::iterator iter = entries.find(key);
3730     if (iter == entries.end()) {
3731       return false;
3732     }
3733
3734     *entry = iter->second;
3735     return true;
3736   }
3737
3738   bool put(RGWRados *store, const string& key, T *entry, list<rgw_cache_entry_info *>& cache_info_entries) {
3739     Entry chain_entry(this, key, entry);
3740
3741     /* we need the store cache to call us under its lock to maintain lock ordering */
3742     return store->chain_cache_entry(cache_info_entries, &chain_entry);
3743   }
3744
3745   void chain_cb(const string& key, void *data) override {
3746     T *entry = static_cast<T *>(data);
3747     RWLock::WLocker wl(lock);
3748     entries[key] = *entry;
3749   }
3750
3751   void invalidate(const string& key) override {
3752     RWLock::WLocker wl(lock);
3753     entries.erase(key);
3754   }
3755
3756   void invalidate_all() override {
3757     RWLock::WLocker wl(lock);
3758     entries.clear();
3759   }
3760 }; /* RGWChainedCacheImpl */
3761
3762 /**
3763  * Base of PUT operation.
3764  * Allow to create chained data transformers like compresors and encryptors.
3765  */
3766 class RGWPutObjDataProcessor
3767 {
3768 public:
3769   RGWPutObjDataProcessor(){}
3770   virtual ~RGWPutObjDataProcessor(){}
3771   virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) = 0;
3772   virtual int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) = 0;
3773 }; /* RGWPutObjDataProcessor */
3774
3775
3776 class RGWPutObjProcessor : public RGWPutObjDataProcessor
3777 {
3778 protected:
3779   RGWRados *store;
3780   RGWObjectCtx& obj_ctx;
3781   bool is_complete;
3782   RGWBucketInfo bucket_info;
3783   bool canceled;
3784
3785   virtual int do_complete(size_t accounted_size, const string& etag,
3786                           ceph::real_time *mtime, ceph::real_time set_mtime,
3787                           map<string, bufferlist>& attrs, ceph::real_time delete_at,
3788                           const char *if_match, const char *if_nomatch, const string *user_data,
3789                           rgw_zone_set* zones_trace = nullptr) = 0;
3790
3791 public:
3792   RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), 
3793                                                                    obj_ctx(_obj_ctx), 
3794                                                                    is_complete(false), 
3795                                                                    bucket_info(_bi), 
3796                                                                    canceled(false) {}
3797   ~RGWPutObjProcessor() override {}
3798   virtual int prepare(RGWRados *_store, string *oid_rand) {
3799     store = _store;
3800     return 0;
3801   }
3802
3803   int complete(size_t accounted_size, const string& etag, 
3804                ceph::real_time *mtime, ceph::real_time set_mtime,
3805                map<string, bufferlist>& attrs, ceph::real_time delete_at,
3806                const char *if_match = NULL, const char *if_nomatch = NULL, const string *user_data = nullptr,
3807                rgw_zone_set *zones_trace = nullptr);
3808
3809   CephContext *ctx();
3810
3811   bool is_canceled() { return canceled; }
3812 }; /* RGWPutObjProcessor */
3813
3814 struct put_obj_aio_info {
3815   void *handle;
3816   rgw_raw_obj obj;
3817   uint64_t size;
3818 };
3819
3820 #define RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT (16 * 1024 * 1024)
3821
3822 class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
3823 {
3824   list<struct put_obj_aio_info> pending;
3825   uint64_t window_size{RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT};
3826   uint64_t pending_size{0};
3827
3828   struct put_obj_aio_info pop_pending();
3829   int wait_pending_front();
3830   bool pending_has_completed();
3831
3832   rgw_raw_obj last_written_obj;
3833
3834 protected:
3835   uint64_t obj_len{0};
3836
3837   set<rgw_raw_obj> written_objs;
3838   rgw_obj head_obj;
3839
3840   void add_written_obj(const rgw_raw_obj& obj) {
3841     written_objs.insert(obj);
3842   }
3843
3844   int drain_pending();
3845   int handle_obj_data(rgw_raw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
3846
3847 public:
3848   int prepare(RGWRados *store, string *oid_rand) override;
3849   int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) override;
3850
3851   RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info) {}
3852   ~RGWPutObjProcessor_Aio() override;
3853 }; /* RGWPutObjProcessor_Aio */
3854
3855 class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
3856 {
3857   bufferlist first_chunk;
3858   uint64_t part_size;
3859   off_t cur_part_ofs;
3860   off_t next_part_ofs;
3861   int cur_part_id;
3862   off_t data_ofs;
3863
3864   bufferlist pending_data_bl;
3865   uint64_t max_chunk_size;
3866
3867   bool versioned_object;
3868   uint64_t olh_epoch;
3869   string version_id;
3870
3871 protected:
3872   rgw_bucket bucket;
3873   string obj_str;
3874
3875   string unique_tag;
3876
3877   rgw_raw_obj cur_obj;
3878   RGWObjManifest manifest;
3879   RGWObjManifest::generator manifest_gen;
3880
3881   int write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool exclusive);
3882   int do_complete(size_t accounted_size, const string& etag,
3883                   ceph::real_time *mtime, ceph::real_time set_mtime,
3884                   map<string, bufferlist>& attrs, ceph::real_time delete_at,
3885                   const char *if_match, const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace) override;
3886
3887   int prepare_next_part(off_t ofs);
3888   int complete_parts();
3889   int complete_writing_data();
3890
3891   int prepare_init(RGWRados *store, string *oid_rand);
3892
3893 public:
3894   ~RGWPutObjProcessor_Atomic() override {}
3895   RGWPutObjProcessor_Atomic(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info,
3896                             rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t, bool versioned) :
3897                                 RGWPutObjProcessor_Aio(obj_ctx, bucket_info),
3898                                 part_size(_p),
3899                                 cur_part_ofs(0),
3900                                 next_part_ofs(_p),
3901                                 cur_part_id(0),
3902                                 data_ofs(0),
3903                                 max_chunk_size(0),
3904                                 versioned_object(versioned),
3905                                 olh_epoch(0),
3906                                 bucket(_b),
3907                                 obj_str(_o),
3908                                 unique_tag(_t) {}
3909   int prepare(RGWRados *store, string *oid_rand) override;
3910   virtual bool immutable_head() { return false; }
3911   int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override;
3912
3913   void set_olh_epoch(uint64_t epoch) {
3914     olh_epoch = epoch;
3915   }
3916
3917   void set_version_id(const string& vid) {
3918     version_id = vid;
3919   }
3920 }; /* RGWPutObjProcessor_Atomic */
3921
3922 #define MP_META_SUFFIX ".meta"
3923
3924 class RGWMPObj {
3925   string oid;
3926   string prefix;
3927   string meta;
3928   string upload_id;
3929 public:
3930   RGWMPObj() {}
3931   RGWMPObj(const string& _oid, const string& _upload_id) {
3932     init(_oid, _upload_id, _upload_id);
3933   }
3934   void init(const string& _oid, const string& _upload_id) {
3935     init(_oid, _upload_id, _upload_id);
3936   }
3937   void init(const string& _oid, const string& _upload_id, const string& part_unique_str) {
3938     if (_oid.empty()) {
3939       clear();
3940       return;
3941     }
3942     oid = _oid;
3943     upload_id = _upload_id;
3944     prefix = oid + ".";
3945     meta = prefix + upload_id + MP_META_SUFFIX;
3946     prefix.append(part_unique_str);
3947   }
3948   string& get_meta() { return meta; }
3949   string get_part(int num) {
3950     char buf[16];
3951     snprintf(buf, 16, ".%d", num);
3952     string s = prefix;
3953     s.append(buf);
3954     return s;
3955   }
3956   string get_part(string& part) {
3957     string s = prefix;
3958     s.append(".");
3959     s.append(part);
3960     return s;
3961   }
3962   string& get_upload_id() {
3963     return upload_id;
3964   }
3965   string& get_key() {
3966     return oid;
3967   }
3968   bool from_meta(string& meta) {
3969     int end_pos = meta.rfind('.'); // search for ".meta"
3970     if (end_pos < 0)
3971       return false;
3972     int mid_pos = meta.rfind('.', end_pos - 1); // <key>.<upload_id>
3973     if (mid_pos < 0)
3974       return false;
3975     oid = meta.substr(0, mid_pos);
3976     upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1);
3977     init(oid, upload_id, upload_id);
3978     return true;
3979   }
3980   void clear() {
3981     oid = "";
3982     prefix = "";
3983     meta = "";
3984     upload_id = "";
3985   }
3986 };
3987
3988 class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic
3989 {
3990   string part_num;
3991   RGWMPObj mp;
3992   req_state *s;
3993   string upload_id;
3994
3995 protected:
3996   int prepare(RGWRados *store, string *oid_rand);
3997   int do_complete(size_t accounted_size, const string& etag,
3998                   ceph::real_time *mtime, ceph::real_time set_mtime,
3999                   map<string, bufferlist>& attrs, ceph::real_time delete_at,
4000                   const char *if_match, const char *if_nomatch, const string *user_data,
4001                   rgw_zone_set *zones_trace) override;
4002 public:
4003   bool immutable_head() { return true; }
4004   RGWPutObjProcessor_Multipart(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, uint64_t _p, req_state *_s) :
4005                    RGWPutObjProcessor_Atomic(obj_ctx, bucket_info, _s->bucket, _s->object.name, _p, _s->req_id, false), s(_s) {}
4006   void get_mp(RGWMPObj** _mp);
4007 }; /* RGWPutObjProcessor_Multipart */
4008 #endif