Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_cr_rados.h
1 #ifndef CEPH_RGW_CR_RADOS_H
2 #define CEPH_RGW_CR_RADOS_H
3
4 #include <boost/intrusive_ptr.hpp>
5 #include "include/assert.h"
6 #include "rgw_coroutine.h"
7 #include "rgw_rados.h"
8 #include "common/WorkQueue.h"
9 #include "common/Throttle.h"
10
11 #include <atomic>
12
13 class RGWAsyncRadosRequest : public RefCountedObject {
14   RGWCoroutine *caller;
15   RGWAioCompletionNotifier *notifier;
16
17   int retcode;
18
19   Mutex lock;
20
21 protected:
22   virtual int _send_request() = 0;
23 public:
24   RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0),
25                                                                                lock("RGWAsyncRadosRequest::lock") {
26   }
27   ~RGWAsyncRadosRequest() override {
28     if (notifier) {
29       notifier->put();
30     }
31   }
32
33   void send_request() {
34     get();
35     retcode = _send_request();
36     {
37       Mutex::Locker l(lock);
38       if (notifier) {
39         notifier->cb(); // drops its own ref
40         notifier = nullptr;
41       }
42     }
43     put();
44   }
45
46   int get_ret_status() { return retcode; }
47
48   void finish() {
49     {
50       Mutex::Locker l(lock);
51       if (notifier) {
52         // we won't call notifier->cb() to drop its ref, so drop it here
53         notifier->put();
54         notifier = nullptr;
55       }
56     }
57     put();
58   }
59 };
60
61
62 class RGWAsyncRadosProcessor {
63   deque<RGWAsyncRadosRequest *> m_req_queue;
64   std::atomic<bool> going_down = { false };
65 protected:
66   RGWRados *store;
67   ThreadPool m_tp;
68   Throttle req_throttle;
69
70   struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
71     RGWAsyncRadosProcessor *processor;
72     RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
73       : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
74
75     bool _enqueue(RGWAsyncRadosRequest *req) override;
76     void _dequeue(RGWAsyncRadosRequest *req) override {
77       ceph_abort();
78     }
79     bool _empty() override;
80     RGWAsyncRadosRequest *_dequeue() override;
81     using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
82     void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
83     void _dump_queue();
84     void _clear() override {
85       assert(processor->m_req_queue.empty());
86     }
87   } req_wq;
88
89 public:
90   RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
91   ~RGWAsyncRadosProcessor() {}
92   void start();
93   void stop();
94   void handle_request(RGWAsyncRadosRequest *req);
95   void queue(RGWAsyncRadosRequest *req);
96
97   bool is_going_down() {
98     return going_down;
99   }
100 };
101
102
103 class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
104   RGWRados *store;
105   RGWObjectCtx *obj_ctx;
106   RGWRados::SystemObject::Read::GetObjState read_state;
107   RGWObjVersionTracker *objv_tracker;
108   rgw_raw_obj obj;
109   bufferlist *pbl;
110   map<string, bufferlist> *pattrs;
111   off_t ofs;
112   off_t end;
113 protected:
114   int _send_request() override;
115 public:
116   RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
117                        RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
118                        bufferlist *_pbl, off_t _ofs, off_t _end);
119   void set_read_attrs(map<string, bufferlist> *_pattrs) { pattrs = _pattrs; }
120 };
121
122 class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
123   RGWRados *store;
124   RGWObjVersionTracker *objv_tracker;
125   rgw_raw_obj obj;
126   bool exclusive;
127   bufferlist bl;
128
129 protected:
130   int _send_request() override;
131 public:
132   RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
133                        RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
134                        bool _exclusive, bufferlist& _bl);
135 };
136
137 class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
138   RGWRados *store;
139   RGWObjVersionTracker *objv_tracker;
140   rgw_raw_obj obj;
141   map<string, bufferlist> *attrs;
142
143 protected:
144   int _send_request() override;
145 public:
146   RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
147                        RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
148                        map<string, bufferlist> *_attrs);
149 };
150
151 class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
152   RGWRados *store;
153   rgw_raw_obj obj;
154   string lock_name;
155   string cookie;
156   uint32_t duration_secs;
157
158 protected:
159   int _send_request() override;
160 public:
161   RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
162                         RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
163                         const string& _name, const string& _cookie, uint32_t _duration_secs);
164 };
165
166 class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
167   RGWRados *store;
168   rgw_raw_obj obj;
169   string lock_name;
170   string cookie;
171
172 protected:
173   int _send_request() override;
174 public:
175   RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
176                         RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
177                         const string& _name, const string& _cookie);
178 };
179
180 template <class T>
181 class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
182   RGWAsyncRadosProcessor *async_rados;
183   RGWRados *store;
184   RGWObjectCtx obj_ctx;
185   bufferlist bl;
186
187   rgw_raw_obj obj;
188
189   map<string, bufferlist> *pattrs{nullptr};
190
191   T *result;
192   /// on ENOENT, call handle_data() with an empty object instead of failing
193   const bool empty_on_enoent;
194   RGWObjVersionTracker *objv_tracker;
195
196   RGWAsyncGetSystemObj *req{nullptr};
197
198 public:
199   RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
200                       const rgw_raw_obj& _obj,
201                       T *_result, bool empty_on_enoent = true,
202                       RGWObjVersionTracker *objv_tracker = nullptr)
203     : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
204       obj_ctx(store), obj(_obj), result(_result),
205       empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
206   ~RGWSimpleRadosReadCR() override {
207     request_cleanup();
208   }
209
210   void request_cleanup() override {
211     if (req) {
212       req->finish();
213       req = NULL;
214     }
215   }
216
217   int send_request() override;
218   int request_complete() override;
219
220   virtual int handle_data(T& data) {
221     return 0;
222   }
223 };
224
225 template <class T>
226 int RGWSimpleRadosReadCR<T>::send_request()
227 {
228   req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
229                                  store, &obj_ctx, objv_tracker,
230                                  obj,
231                                  &bl, 0, -1);
232   if (pattrs) {
233     req->set_read_attrs(pattrs);
234   }
235   async_rados->queue(req);
236   return 0;
237 }
238
239 template <class T>
240 int RGWSimpleRadosReadCR<T>::request_complete()
241 {
242   int ret = req->get_ret_status();
243   retcode = ret;
244   if (ret == -ENOENT && empty_on_enoent) {
245     *result = T();
246   } else {
247     if (ret < 0) {
248       return ret;
249     }
250     try {
251       bufferlist::iterator iter = bl.begin();
252       if (iter.end()) {
253         // allow successful reads with empty buffers. ReadSyncStatus coroutines
254         // depend on this to be able to read without locking, because the
255         // cls lock from InitSyncStatus will create an empty object if it didnt
256         // exist
257         *result = T();
258       } else {
259         ::decode(*result, iter);
260       }
261     } catch (buffer::error& err) {
262       return -EIO;
263     }
264   }
265
266   return handle_data(*result);
267 }
268
269 class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
270   RGWAsyncRadosProcessor *async_rados;
271   RGWRados *store;
272   RGWObjectCtx obj_ctx;
273   bufferlist bl;
274
275   rgw_raw_obj obj;
276
277   map<string, bufferlist> *pattrs;
278
279   RGWAsyncGetSystemObj *req;
280
281 public:
282   RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
283                       const rgw_raw_obj& _obj,
284                       map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
285                                                 async_rados(_async_rados), store(_store),
286                                                 obj_ctx(store),
287                                                 obj(_obj),
288                                                 pattrs(_pattrs),
289                                                 req(NULL) { }
290   ~RGWSimpleRadosReadAttrsCR() override {
291     request_cleanup();
292   }
293                                                          
294   void request_cleanup() override {
295     if (req) {
296       req->finish();
297       req = NULL;
298     }
299   }
300
301   int send_request() override;
302   int request_complete() override;
303 };
304
305 template <class T>
306 class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
307   RGWAsyncRadosProcessor *async_rados;
308   RGWRados *store;
309   bufferlist bl;
310
311   rgw_raw_obj obj;
312   RGWObjVersionTracker *objv_tracker;
313
314   RGWAsyncPutSystemObj *req{nullptr};
315
316 public:
317   RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
318                       const rgw_raw_obj& _obj,
319                       const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
320     : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
321       store(_store), obj(_obj), objv_tracker(objv_tracker) {
322     ::encode(_data, bl);
323   }
324
325   ~RGWSimpleRadosWriteCR() override {
326     request_cleanup();
327   }
328
329   void request_cleanup() override {
330     if (req) {
331       req->finish();
332       req = NULL;
333     }
334   }
335
336   int send_request() override {
337     req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
338                                    store, objv_tracker, obj, false, bl);
339     async_rados->queue(req);
340     return 0;
341   }
342
343   int request_complete() override {
344     return req->get_ret_status();
345   }
346 };
347
348 class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
349   RGWAsyncRadosProcessor *async_rados;
350   RGWRados *store;
351
352   rgw_raw_obj obj;
353
354   map<string, bufferlist> attrs;
355
356   RGWAsyncPutSystemObjAttrs *req;
357
358 public:
359   RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
360                       const rgw_raw_obj& _obj,
361                       map<string, bufferlist>& _attrs) : RGWSimpleCoroutine(_store->ctx()),
362                                                 async_rados(_async_rados),
363                                                 store(_store),
364                                                 obj(_obj),
365                                                 attrs(_attrs), req(NULL) {
366   }
367   ~RGWSimpleRadosWriteAttrsCR() override {
368     request_cleanup();
369   }
370
371   void request_cleanup() override {
372     if (req) {
373       req->finish();
374       req = NULL;
375     }
376   }
377
378   int send_request() override {
379     req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
380                                    store, NULL, obj, &attrs);
381     async_rados->queue(req);
382     return 0;
383   }
384
385   int request_complete() override {
386     return req->get_ret_status();
387   }
388 };
389
390 class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
391   RGWRados *store;
392   map<string, bufferlist> entries;
393
394   rgw_rados_ref ref;
395
396   rgw_raw_obj obj;
397
398   boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
399
400 public:
401   RGWRadosSetOmapKeysCR(RGWRados *_store,
402                       const rgw_raw_obj& _obj,
403                       map<string, bufferlist>& _entries);
404
405   int send_request() override;
406   int request_complete() override;
407 };
408
409 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
410   RGWRados *store;
411
412   string marker;
413   map<string, bufferlist> *entries;
414   int max_entries;
415
416   int rval;
417   rgw_rados_ref ref;
418
419   rgw_raw_obj obj;
420
421   boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
422
423 public:
424   RGWRadosGetOmapKeysCR(RGWRados *_store,
425                       const rgw_raw_obj& _obj,
426                       const string& _marker,
427                       map<string, bufferlist> *_entries, int _max_entries);
428
429   int send_request() override;
430
431   int request_complete() override {
432     return rval;
433   }
434 };
435
436 class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
437   RGWRados *store;
438
439   rgw_rados_ref ref;
440
441   set<string> keys;
442
443   rgw_raw_obj obj;
444
445   boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
446
447 public:
448   RGWRadosRemoveOmapKeysCR(RGWRados *_store,
449                       const rgw_raw_obj& _obj,
450                       const set<string>& _keys);
451
452   int send_request() override;
453
454   int request_complete() override;
455 };
456
457 class RGWRadosRemoveCR : public RGWSimpleCoroutine {
458   RGWRados *store;
459   librados::IoCtx ioctx;
460   const rgw_raw_obj obj;
461   boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
462
463 public:
464   RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
465
466   int send_request();
467   int request_complete();
468 };
469
470 class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
471   RGWAsyncRadosProcessor *async_rados;
472   RGWRados *store;
473   string lock_name;
474   string cookie;
475   uint32_t duration;
476
477   rgw_raw_obj obj;
478
479   RGWAsyncLockSystemObj *req;
480
481 public:
482   RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
483                       const rgw_raw_obj& _obj,
484                       const string& _lock_name,
485                       const string& _cookie,
486                       uint32_t _duration);
487   ~RGWSimpleRadosLockCR() override {
488     request_cleanup();
489   }
490   void request_cleanup() override;
491
492   int send_request() override;
493   int request_complete() override;
494
495   static std::string gen_random_cookie(CephContext* cct) {
496 #define COOKIE_LEN 16
497     char buf[COOKIE_LEN + 1];
498     gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
499     return buf;
500   }
501 };
502
503 class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
504   RGWAsyncRadosProcessor *async_rados;
505   RGWRados *store;
506   string lock_name;
507   string cookie;
508
509   rgw_raw_obj obj;
510
511   RGWAsyncUnlockSystemObj *req;
512
513 public:
514   RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
515                       const rgw_raw_obj& _obj, 
516                       const string& _lock_name,
517                       const string& _cookie);
518   ~RGWSimpleRadosUnlockCR() override {
519     request_cleanup();
520   }
521   void request_cleanup() override;
522
523   int send_request() override;
524   int request_complete() override;
525 };
526
527 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
528
529 class RGWOmapAppend : public RGWConsumerCR<string> {
530   RGWAsyncRadosProcessor *async_rados;
531   RGWRados *store;
532
533   rgw_raw_obj obj;
534
535   bool going_down;
536
537   int num_pending_entries;
538   list<string> pending_entries;
539
540   map<string, bufferlist> entries;
541
542   uint64_t window_size;
543   uint64_t total_entries;
544 public:
545   RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
546                 const rgw_raw_obj& _obj,
547                 uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
548   int operate() override;
549   void flush_pending();
550   bool append(const string& s);
551   bool finish();
552
553   uint64_t get_total_entries() {
554     return total_entries;
555   }
556
557   const rgw_raw_obj& get_obj() {
558     return obj;
559   }
560 };
561
562 class RGWAsyncWait : public RGWAsyncRadosRequest {
563   CephContext *cct;
564   Mutex *lock;
565   Cond *cond;
566   utime_t interval;
567 protected:
568   int _send_request() override {
569     Mutex::Locker l(*lock);
570     return cond->WaitInterval(*lock, interval);
571   }
572 public:
573   RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
574                Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn),
575                                        cct(_cct),
576                                        lock(_lock), cond(_cond), interval(_secs, 0) {}
577
578   void wakeup() {
579     Mutex::Locker l(*lock);
580     cond->Signal();
581   }
582 };
583
584 class RGWWaitCR : public RGWSimpleCoroutine {
585   CephContext *cct;
586   RGWAsyncRadosProcessor *async_rados;
587   Mutex *lock;
588   Cond *cond;
589   int secs;
590
591   RGWAsyncWait *req;
592
593 public:
594   RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
595             Mutex *_lock, Cond *_cond,
596             int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
597                          async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
598   }
599   ~RGWWaitCR() override {
600     request_cleanup();
601   }
602
603   void request_cleanup() override {
604     if (req) {
605       wakeup();
606       req->finish();
607       req = NULL;
608     }
609   }
610
611   int send_request() override {
612     req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct,  lock, cond, secs);
613     async_rados->queue(req);
614     return 0;
615   }
616
617   int request_complete() override {
618     return req->get_ret_status();
619   }
620
621   void wakeup() {
622     req->wakeup();
623   }
624 };
625
626 class RGWShardedOmapCRManager {
627   RGWAsyncRadosProcessor *async_rados;
628   RGWRados *store;
629   RGWCoroutine *op;
630
631   int num_shards;
632
633   vector<RGWOmapAppend *> shards;
634 public:
635   RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix)
636                       : async_rados(_async_rados),
637                         store(_store), op(_op), num_shards(_num_shards) {
638     shards.reserve(num_shards);
639     for (int i = 0; i < num_shards; ++i) {
640       char buf[oid_prefix.size() + 16];
641       snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
642       RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf));
643       shard->get();
644       shards.push_back(shard);
645       op->spawn(shard, false);
646     }
647   }
648
649   ~RGWShardedOmapCRManager() {
650     for (auto shard : shards) {
651       shard->put();
652     }
653   }
654
655   bool append(const string& entry, int shard_id) {
656     return shards[shard_id]->append(entry);
657   }
658   bool finish() {
659     bool success = true;
660     for (vector<RGWOmapAppend *>::iterator iter = shards.begin(); iter != shards.end(); ++iter) {
661       success &= ((*iter)->finish() && (!(*iter)->is_error()));
662     }
663     return success;
664   }
665
666   uint64_t get_total_entries(int shard_id) {
667     return shards[shard_id]->get_total_entries();
668   }
669 };
670
671 class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
672   RGWRados *store;
673   rgw_bucket bucket;
674   RGWBucketInfo *bucket_info;
675
676 protected:
677   int _send_request() override;
678 public:
679   RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
680                                 RGWRados *_store, const rgw_bucket& bucket,
681                                 RGWBucketInfo *_bucket_info)
682     : RGWAsyncRadosRequest(caller, cn), store(_store),
683       bucket(bucket), bucket_info(_bucket_info) {}
684 };
685
686 class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
687   RGWAsyncRadosProcessor *async_rados;
688   RGWRados *store;
689   rgw_bucket bucket;
690   RGWBucketInfo *bucket_info;
691
692   RGWAsyncGetBucketInstanceInfo *req;
693   
694 public:
695   RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
696                              const rgw_bucket& bucket, RGWBucketInfo *_bucket_info)
697     : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
698       bucket(bucket), bucket_info(_bucket_info), req(NULL) {}
699   ~RGWGetBucketInstanceInfoCR() override {
700     request_cleanup();
701   }
702   void request_cleanup() override {
703     if (req) {
704       req->finish();
705       req = NULL;
706     }
707   }
708
709   int send_request() override {
710     req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket, bucket_info);
711     async_rados->queue(req);
712     return 0;
713   }
714   int request_complete() override {
715     return req->get_ret_status();
716   }
717 };
718
719 class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
720   RGWRados *store;
721   string source_zone;
722
723   RGWBucketInfo bucket_info;
724
725   rgw_obj_key key;
726   uint64_t versioned_epoch;
727
728   real_time src_mtime;
729
730   bool copy_if_newer;
731   rgw_zone_set *zones_trace;
732
733 protected:
734   int _send_request() override;
735 public:
736   RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
737                          const string& _source_zone,
738                          RGWBucketInfo& _bucket_info,
739                          const rgw_obj_key& _key,
740                          uint64_t _versioned_epoch,
741                          bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
742                                                       source_zone(_source_zone),
743                                                       bucket_info(_bucket_info),
744                                                       key(_key),
745                                                       versioned_epoch(_versioned_epoch),
746                                                       copy_if_newer(_if_newer), zones_trace(_zones_trace) {}
747 };
748
749 class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
750   CephContext *cct;
751   RGWAsyncRadosProcessor *async_rados;
752   RGWRados *store;
753   string source_zone;
754
755   RGWBucketInfo bucket_info;
756
757   rgw_obj_key key;
758   uint64_t versioned_epoch;
759
760   real_time src_mtime;
761
762   bool copy_if_newer;
763
764   RGWAsyncFetchRemoteObj *req;
765   rgw_zone_set *zones_trace;
766
767 public:
768   RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
769                       const string& _source_zone,
770                       RGWBucketInfo& _bucket_info,
771                       const rgw_obj_key& _key,
772                       uint64_t _versioned_epoch,
773                       bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
774                                        async_rados(_async_rados), store(_store),
775                                        source_zone(_source_zone),
776                                        bucket_info(_bucket_info),
777                                        key(_key),
778                                        versioned_epoch(_versioned_epoch),
779                                        copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
780
781
782   ~RGWFetchRemoteObjCR() override {
783     request_cleanup();
784   }
785
786   void request_cleanup() override {
787     if (req) {
788       req->finish();
789       req = NULL;
790     }
791   }
792
793   int send_request() override {
794     req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
795                                      key, versioned_epoch, copy_if_newer, zones_trace);
796     async_rados->queue(req);
797     return 0;
798   }
799
800   int request_complete() override {
801     return req->get_ret_status();
802   }
803 };
804
805 class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
806   RGWRados *store;
807   string source_zone;
808
809   RGWBucketInfo bucket_info;
810
811   rgw_obj_key key;
812
813   ceph::real_time *pmtime;
814   uint64_t *psize;
815   map<string, bufferlist> *pattrs;
816
817 protected:
818   int _send_request() override;
819 public:
820   RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
821                          const string& _source_zone,
822                          RGWBucketInfo& _bucket_info,
823                          const rgw_obj_key& _key,
824                          ceph::real_time *_pmtime,
825                          uint64_t *_psize,
826                          map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
827                                                       source_zone(_source_zone),
828                                                       bucket_info(_bucket_info),
829                                                       key(_key),
830                                                       pmtime(_pmtime),
831                                                       psize(_psize),
832                                                       pattrs(_pattrs) {}
833 };
834
835 class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
836   CephContext *cct;
837   RGWAsyncRadosProcessor *async_rados;
838   RGWRados *store;
839   string source_zone;
840
841   RGWBucketInfo bucket_info;
842
843   rgw_obj_key key;
844
845   ceph::real_time *pmtime;
846   uint64_t *psize;
847   map<string, bufferlist> *pattrs;
848
849   RGWAsyncStatRemoteObj *req;
850
851 public:
852   RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
853                       const string& _source_zone,
854                       RGWBucketInfo& _bucket_info,
855                       const rgw_obj_key& _key,
856                       ceph::real_time *_pmtime,
857                       uint64_t *_psize,
858                       map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
859                                        async_rados(_async_rados), store(_store),
860                                        source_zone(_source_zone),
861                                        bucket_info(_bucket_info),
862                                        key(_key),
863                                        pmtime(_pmtime),
864                                        psize(_psize),
865                                        pattrs(_pattrs),
866                                        req(NULL) {}
867
868
869   ~RGWStatRemoteObjCR() override {
870     request_cleanup();
871   }
872
873   void request_cleanup() override {
874     if (req) {
875       req->finish();
876       req = NULL;
877     }
878   }
879
880   int send_request() override {
881     req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
882                                     bucket_info, key, pmtime, psize, pattrs);
883     async_rados->queue(req);
884     return 0;
885   }
886
887   int request_complete() override {
888     return req->get_ret_status();
889   }
890 };
891
892 class RGWAsyncRemoveObj : public RGWAsyncRadosRequest {
893   RGWRados *store;
894   string source_zone;
895
896   RGWBucketInfo bucket_info;
897
898   rgw_obj_key key;
899   string owner;
900   string owner_display_name;
901   bool versioned;
902   uint64_t versioned_epoch;
903   string marker_version_id;
904
905   bool del_if_older;
906   ceph::real_time timestamp;
907   rgw_zone_set *zones_trace;
908
909 protected:
910   int _send_request() override;
911 public:
912   RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
913                          const string& _source_zone,
914                          RGWBucketInfo& _bucket_info,
915                          const rgw_obj_key& _key,
916                          const string& _owner,
917                          const string& _owner_display_name,
918                          bool _versioned,
919                          uint64_t _versioned_epoch,
920                          bool _delete_marker,
921                          bool _if_older,
922                          real_time& _timestamp,
923                          rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
924                                                       source_zone(_source_zone),
925                                                       bucket_info(_bucket_info),
926                                                       key(_key),
927                                                       owner(_owner),
928                                                       owner_display_name(_owner_display_name),
929                                                       versioned(_versioned),
930                                                       versioned_epoch(_versioned_epoch),
931                                                       del_if_older(_if_older),
932                                                       timestamp(_timestamp), zones_trace(_zones_trace) {
933     if (_delete_marker) {
934       marker_version_id = key.instance;
935     }
936   }
937 };
938
939 class RGWRemoveObjCR : public RGWSimpleCoroutine {
940   CephContext *cct;
941   RGWAsyncRadosProcessor *async_rados;
942   RGWRados *store;
943   string source_zone;
944
945   RGWBucketInfo bucket_info;
946
947   rgw_obj_key key;
948   bool versioned;
949   uint64_t versioned_epoch;
950   bool delete_marker;
951   string owner;
952   string owner_display_name;
953
954   bool del_if_older;
955   real_time timestamp;
956
957   RGWAsyncRemoveObj *req;
958   
959   rgw_zone_set *zones_trace;
960
961 public:
962   RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
963                       const string& _source_zone,
964                       RGWBucketInfo& _bucket_info,
965                       const rgw_obj_key& _key,
966                       bool _versioned,
967                       uint64_t _versioned_epoch,
968                       string *_owner,
969                       string *_owner_display_name,
970                       bool _delete_marker,
971                       real_time *_timestamp,
972                       rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
973                                        async_rados(_async_rados), store(_store),
974                                        source_zone(_source_zone),
975                                        bucket_info(_bucket_info),
976                                        key(_key),
977                                        versioned(_versioned),
978                                        versioned_epoch(_versioned_epoch),
979                                        delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
980     del_if_older = (_timestamp != NULL);
981     if (_timestamp) {
982       timestamp = *_timestamp;
983     }
984
985     if (_owner) {
986       owner = *_owner;
987     }
988
989     if (_owner_display_name) {
990       owner_display_name = *_owner_display_name;
991     }
992   }
993   ~RGWRemoveObjCR() override {
994     request_cleanup();
995   }
996
997   void request_cleanup() override {
998     if (req) {
999       req->finish();
1000       req = NULL;
1001     }
1002   }
1003
1004   int send_request() override {
1005     req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
1006                                 key, owner, owner_display_name, versioned, versioned_epoch,
1007                                 delete_marker, del_if_older, timestamp, zones_trace);
1008     async_rados->queue(req);
1009     return 0;
1010   }
1011
1012   int request_complete() override {
1013     return req->get_ret_status();
1014   }
1015 };
1016
1017 class RGWContinuousLeaseCR : public RGWCoroutine {
1018   RGWAsyncRadosProcessor *async_rados;
1019   RGWRados *store;
1020
1021   const rgw_raw_obj obj;
1022
1023   const string lock_name;
1024   const string cookie;
1025
1026   int interval;
1027
1028   Mutex lock;
1029   std::atomic<bool> going_down = { false };
1030   bool locked{false};
1031
1032   RGWCoroutine *caller;
1033
1034   bool aborted{false};
1035
1036 public:
1037   RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
1038                        const rgw_raw_obj& _obj,
1039                        const string& _lock_name, int _interval, RGWCoroutine *_caller)
1040     : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
1041     obj(_obj), lock_name(_lock_name),
1042     cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
1043     interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
1044   {}
1045
1046   int operate() override;
1047
1048   bool is_locked() {
1049     Mutex::Locker l(lock);
1050     return locked;
1051   }
1052
1053   void set_locked(bool status) {
1054     Mutex::Locker l(lock);
1055     locked = status;
1056   }
1057
1058   void go_down() {
1059     going_down = true;
1060     wakeup();
1061   }
1062
1063   void abort() {
1064     aborted = true;
1065   }
1066 };
1067
1068 class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
1069   RGWRados *store;
1070   list<cls_log_entry> entries;
1071
1072   string oid;
1073
1074   boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1075
1076 public:
1077   RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid,
1078                         const cls_log_entry& entry);
1079
1080   int send_request() override;
1081   int request_complete() override;
1082 };
1083
1084 class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
1085   RGWRados *store;
1086   boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
1087  protected:
1088   std::string oid;
1089   real_time start_time;
1090   real_time end_time;
1091   std::string from_marker;
1092   std::string to_marker;
1093
1094  public:
1095   RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
1096                         const real_time& start_time, const real_time& end_time,
1097                         const std::string& from_marker,
1098                         const std::string& to_marker);
1099
1100   int send_request() override;
1101   int request_complete() override;
1102 };
1103
1104 // wrapper to update last_trim_marker on success
1105 class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
1106   CephContext *cct;
1107   std::string *last_trim_marker;
1108  public:
1109   RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
1110                    const std::string& to_marker, std::string *last_trim_marker);
1111   int request_complete() override;
1112 };
1113
1114 class RGWAsyncStatObj : public RGWAsyncRadosRequest {
1115   RGWRados *store;
1116   RGWBucketInfo bucket_info;
1117   rgw_obj obj;
1118   uint64_t *psize;
1119   real_time *pmtime;
1120   uint64_t *pepoch;
1121   RGWObjVersionTracker *objv_tracker;
1122 protected:
1123   int _send_request() override;
1124 public:
1125   RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store,
1126                   const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1127                   real_time *pmtime = nullptr, uint64_t *pepoch = nullptr,
1128                   RGWObjVersionTracker *objv_tracker = nullptr)
1129           : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize),
1130           pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
1131 };
1132
1133 class RGWStatObjCR : public RGWSimpleCoroutine {
1134   RGWRados *store;
1135   RGWAsyncRadosProcessor *async_rados;
1136   RGWBucketInfo bucket_info;
1137   rgw_obj obj;
1138   uint64_t *psize;
1139   real_time *pmtime;
1140   uint64_t *pepoch;
1141   RGWObjVersionTracker *objv_tracker;
1142   RGWAsyncStatObj *req = nullptr;
1143  public:
1144   RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
1145           const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr,
1146           real_time* pmtime = nullptr, uint64_t *pepoch = nullptr,
1147           RGWObjVersionTracker *objv_tracker = nullptr);
1148   ~RGWStatObjCR() override {
1149     request_cleanup();
1150   }
1151   void request_cleanup() override;
1152
1153   int send_request() override;
1154   int request_complete() override;
1155 };
1156
1157 #endif