Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_reshard.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef RGW_RESHARD_H
5 #define RGW_RESHARD_H
6
7 #include <vector>
8 #include "include/rados/librados.hpp"
9 #include "cls/rgw/cls_rgw_types.h"
10 #include "cls/lock/cls_lock_client.h"
11 #include "rgw_bucket.h"
12
13 class CephContext;
14 class RGWRados;
15
16
17 class RGWBucketReshard {
18   friend class RGWReshard;
19
20   RGWRados *store;
21   RGWBucketInfo bucket_info;
22   std::map<string, bufferlist> bucket_attrs;
23
24   string reshard_oid;
25   rados::cls::lock::Lock reshard_lock;
26
27   int lock_bucket();
28   void unlock_bucket();
29   int set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status);
30   int clear_resharding();
31
32   int create_new_bucket_instance(int new_num_shards, RGWBucketInfo& new_bucket_info);
33   int do_reshard(int num_shards,
34                  const RGWBucketInfo& new_bucket_info,
35                  int max_entries,
36                  bool verbose,
37                  ostream *os,
38                  Formatter *formatter);
39 public:
40   RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
41                    const std::map<string, bufferlist>& _bucket_attrs);
42
43   int execute(int num_shards, int max_op_entries,
44               bool verbose = false, ostream *out = nullptr,
45               Formatter *formatter = nullptr,
46               RGWReshard *reshard_log = nullptr);
47   int abort();
48   int get_status(std::list<cls_rgw_bucket_instance_entry> *status);
49 };
50
51 class RGWReshard {
52     RGWRados *store;
53     string lock_name;
54     rados::cls::lock::Lock instance_lock;
55     int num_logshards;
56
57     bool verbose;
58     ostream *out;
59     Formatter *formatter;
60
61     void get_logshard_oid(int shard_num, string *shard);
62 protected:
63   class ReshardWorker : public Thread {
64     CephContext *cct;
65     RGWReshard *reshard;
66     Mutex lock;
67     Cond cond;
68
69   public:
70     ReshardWorker(CephContext * const _cct,
71                               RGWReshard * const _reshard)
72       : cct(_cct),
73         reshard(_reshard),
74         lock("ReshardWorker") {
75     }
76
77     void *entry() override;
78     void stop();
79   };
80
81   ReshardWorker *worker = nullptr;
82   std::atomic<bool> down_flag = { false };
83
84   string get_logshard_key(const string& tenant, const string& bucket_name);
85   void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);
86
87 public:
88   RGWReshard(RGWRados* _store, bool _verbose = false, ostream *_out = nullptr, Formatter *_formatter = nullptr);
89   int add(cls_rgw_reshard_entry& entry);
90   int update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
91   int get(cls_rgw_reshard_entry& entry);
92   int remove(cls_rgw_reshard_entry& entry);
93   int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
94   int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
95
96   /* reshard thread */
97   int process_single_logshard(int logshard_num);
98   int process_all_logshards();
99   bool going_down();
100   void start_processor();
101   void stop_processor();
102 };
103
104
105 class RGWReshardWait {
106   RGWRados *store;
107   Mutex lock{"RGWReshardWait::lock"};
108   Cond cond;
109
110   bool going_down{false};
111
112   int do_wait();
113 public:
114   RGWReshardWait(RGWRados *_store) : store(_store) {}
115   ~RGWReshardWait() {
116     assert(going_down);
117   }
118   int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
119
120   void stop() {
121     Mutex::Locker l(lock);
122     going_down = true;
123     cond.SignalAll();
124   }
125 };
126
127 #endif