Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_sync.h
1 #ifndef CEPH_RGW_SYNC_H
2 #define CEPH_RGW_SYNC_H
3
4 #include "rgw_coroutine.h"
5 #include "rgw_http_client.h"
6 #include "rgw_meta_sync_status.h"
7
8 #include "include/stringify.h"
9 #include "common/RWLock.h"
10
11 #include <atomic>
12
13 #define ERROR_LOGGER_SHARDS 32
14 #define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"
15
16 struct rgw_mdlog_info {
17   uint32_t num_shards;
18   std::string period; //< period id of the master's oldest metadata log
19   epoch_t realm_epoch; //< realm epoch of oldest metadata log
20
21   rgw_mdlog_info() : num_shards(0), realm_epoch(0) {}
22
23   void decode_json(JSONObj *obj);
24 };
25
26
27 struct rgw_mdlog_entry {
28   string id;
29   string section;
30   string name;
31   ceph::real_time timestamp;
32   RGWMetadataLogData log_data;
33
34   void decode_json(JSONObj *obj);
35
36   bool convert_from(cls_log_entry& le) {
37     id = le.id;
38     section = le.section;
39     name = le.name;
40     timestamp = le.timestamp.to_real_time();
41     try {
42       bufferlist::iterator iter = le.data.begin();
43       ::decode(log_data, iter);
44     } catch (buffer::error& err) {
45       return false;
46     }
47     return true;
48   }
49 };
50
51 struct rgw_mdlog_shard_data {
52   string marker;
53   bool truncated;
54   vector<rgw_mdlog_entry> entries;
55
56   void decode_json(JSONObj *obj);
57 };
58
59 class RGWAsyncRadosProcessor;
60 class RGWMetaSyncStatusManager;
61 class RGWMetaSyncCR;
62 class RGWRESTConn;
63
64 class RGWSyncErrorLogger {
65   RGWRados *store;
66
67   vector<string> oids;
68   int num_shards;
69
70   std::atomic<int64_t> counter = { 0 };
71 public:
72   RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards);
73   RGWCoroutine *log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message);
74
75   static string get_shard_oid(const string& oid_prefix, int shard_id);
76 };
77
78 struct rgw_sync_error_info {
79   string source_zone;
80   uint32_t error_code;
81   string message;
82
83   rgw_sync_error_info() : error_code(0) {}
84   rgw_sync_error_info(const string& _source_zone, uint32_t _error_code, const string& _message) : source_zone(_source_zone), error_code(_error_code), message(_message) {}
85
86   void encode(bufferlist& bl) const {
87     ENCODE_START(1, 1, bl);
88     ::encode(source_zone, bl);
89     ::encode(error_code, bl);
90     ::encode(message, bl);
91     ENCODE_FINISH(bl);
92   }
93
94   void decode(bufferlist::iterator& bl) {
95     DECODE_START(1, bl);
96     ::decode(source_zone, bl);
97     ::decode(error_code, bl);
98     ::decode(message, bl);
99     DECODE_FINISH(bl);
100   }
101
102   void dump(Formatter *f) const;
103 };
104 WRITE_CLASS_ENCODER(rgw_sync_error_info)
105
106 #define DEFAULT_BACKOFF_MAX 30
107
108 class RGWSyncBackoff {
109   int cur_wait;
110   int max_secs;
111
112   void update_wait_time();
113 public:
114   RGWSyncBackoff(int _max_secs = DEFAULT_BACKOFF_MAX) : cur_wait(0), max_secs(_max_secs) {}
115
116   void backoff_sleep();
117   void reset() {
118     cur_wait = 0;
119   }
120
121   void backoff(RGWCoroutine *op);
122 };
123
124 class RGWBackoffControlCR : public RGWCoroutine
125 {
126   RGWCoroutine *cr;
127   Mutex lock;
128
129   RGWSyncBackoff backoff;
130   bool reset_backoff;
131
132   bool exit_on_error;
133
134 protected:
135   bool *backoff_ptr() {
136     return &reset_backoff;
137   }
138
139   Mutex& cr_lock() {
140     return lock;
141   }
142
143   RGWCoroutine *get_cr() {
144     return cr;
145   }
146
147 public:
148   RGWBackoffControlCR(CephContext *_cct, bool _exit_on_error) : RGWCoroutine(_cct), cr(NULL), lock("RGWBackoffControlCR::lock:" + stringify(this)),
149                                                                 reset_backoff(false), exit_on_error(_exit_on_error) {
150   }
151
152   ~RGWBackoffControlCR() override {
153     if (cr) {
154       cr->put();
155     }
156   }
157
158   virtual RGWCoroutine *alloc_cr() = 0;
159   virtual RGWCoroutine *alloc_finisher_cr() { return NULL; }
160
161   int operate() override;
162 };
163
164 struct RGWMetaSyncEnv {
165   CephContext *cct;
166   RGWRados *store;
167   RGWRESTConn *conn;
168   RGWAsyncRadosProcessor *async_rados;
169   RGWHTTPManager *http_manager;
170   RGWSyncErrorLogger *error_logger;
171
172   RGWMetaSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL) {}
173
174   void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
175             RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
176             RGWSyncErrorLogger *_error_logger);
177
178   string shard_obj_name(int shard_id);
179   string status_oid();
180 };
181
182 class RGWRemoteMetaLog : public RGWCoroutinesManager {
183   RGWRados *store;
184   RGWRESTConn *conn;
185   RGWAsyncRadosProcessor *async_rados;
186
187   RGWHTTPManager http_manager;
188   RGWMetaSyncStatusManager *status_manager;
189   RGWSyncErrorLogger *error_logger;
190
191   RGWMetaSyncCR *meta_sync_cr;
192
193   RGWSyncBackoff backoff;
194
195   RGWMetaSyncEnv sync_env;
196
197   void init_sync_env(RGWMetaSyncEnv *env);
198   int store_sync_info(const rgw_meta_sync_info& sync_info);
199
200   std::atomic<bool> going_down = { false };
201
202 public:
203   RGWRemoteMetaLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
204                    RGWMetaSyncStatusManager *_sm)
205     : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
206       store(_store), conn(NULL), async_rados(async_rados),
207       http_manager(store->ctx(), completion_mgr),
208       status_manager(_sm), error_logger(NULL), meta_sync_cr(NULL) {}
209
210   ~RGWRemoteMetaLog() override;
211
212   int init();
213   void finish();
214
215   int read_log_info(rgw_mdlog_info *log_info);
216   int read_master_log_shards_info(const string& master_period, map<int, RGWMetadataLogInfo> *shards_info);
217   int read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result);
218   int read_sync_status(rgw_meta_sync_status *sync_status);
219   int init_sync_status();
220   int run_sync();
221
222   void wakeup(int shard_id);
223
224   RGWMetaSyncEnv& get_sync_env() {
225     return sync_env;
226   }
227 };
228
229 class RGWMetaSyncStatusManager {
230   RGWRados *store;
231   librados::IoCtx ioctx;
232
233   RGWRemoteMetaLog master_log;
234
235   map<int, rgw_raw_obj> shard_objs;
236
237   struct utime_shard {
238     real_time ts;
239     int shard_id;
240
241     utime_shard() : shard_id(-1) {}
242
243     bool operator<(const utime_shard& rhs) const {
244       if (ts == rhs.ts) {
245         return shard_id < rhs.shard_id;
246       }
247       return ts < rhs.ts;
248     }
249   };
250
251   RWLock ts_to_shard_lock;
252   map<utime_shard, int> ts_to_shard;
253   vector<string> clone_markers;
254
255 public:
256   RGWMetaSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
257     : store(_store), master_log(store, async_rados, this),
258       ts_to_shard_lock("ts_to_shard_lock") {}
259   int init();
260
261   int read_sync_status(rgw_meta_sync_status *sync_status) {
262     return master_log.read_sync_status(sync_status);
263   }
264   int init_sync_status() { return master_log.init_sync_status(); }
265   int read_log_info(rgw_mdlog_info *log_info) {
266     return master_log.read_log_info(log_info);
267   }
268   int read_master_log_shards_info(const string& master_period, map<int, RGWMetadataLogInfo> *shards_info) {
269     return master_log.read_master_log_shards_info(master_period, shards_info);
270   }
271   int read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result) {
272     return master_log.read_master_log_shards_next(period, shard_markers, result);
273   }
274
275   int run() { return master_log.run_sync(); }
276
277   void wakeup(int shard_id) { return master_log.wakeup(shard_id); }
278   void stop() {
279     master_log.finish();
280   }
281 };
282
283 template <class T, class K>
284 class RGWSyncShardMarkerTrack {
285   struct marker_entry {
286     uint64_t pos;
287     real_time timestamp;
288
289     marker_entry() : pos(0) {}
290     marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {}
291   };
292   typename std::map<T, marker_entry> pending;
293
294   map<T, marker_entry> finish_markers;
295
296   int window_size;
297   int updates_since_flush;
298
299
300 protected:
301   typename std::set<K> need_retry_set;
302
303   virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
304   virtual void handle_finish(const T& marker) { }
305
306 public:
307   RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
308   virtual ~RGWSyncShardMarkerTrack() {}
309
310   bool start(const T& pos, int index_pos, const real_time& timestamp) {
311     if (pending.find(pos) != pending.end()) {
312       return false;
313     }
314     pending[pos] = marker_entry(index_pos, timestamp);
315     return true;
316   }
317
318   void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
319     finish_markers[pos] = marker_entry(index_pos, timestamp);
320   }
321
322   RGWCoroutine *finish(const T& pos) {
323     if (pending.empty()) {
324       /* can happen, due to a bug that ended up with multiple objects with the same name and version
325        * -- which can happen when versioning is enabled an the version is 'null'.
326        */
327       return NULL;
328     }
329
330     typename std::map<T, marker_entry>::iterator iter = pending.begin();
331
332     bool is_first = (pos == iter->first);
333
334     typename std::map<T, marker_entry>::iterator pos_iter = pending.find(pos);
335     if (pos_iter == pending.end()) {
336       /* see pending.empty() comment */
337       return NULL;
338     }
339
340     finish_markers[pos] = pos_iter->second;
341
342     pending.erase(pos);
343
344     handle_finish(pos);
345
346     updates_since_flush++;
347
348     if (is_first && (updates_since_flush >= window_size || pending.empty())) {
349       return flush();
350     }
351     return NULL;
352   }
353
354   RGWCoroutine *flush() {
355     if (finish_markers.empty()) {
356       return NULL;
357     }
358
359     typename std::map<T, marker_entry>::iterator i;
360
361     if (pending.empty()) {
362       i = finish_markers.end();
363     } else {
364       i = finish_markers.lower_bound(pending.begin()->first);
365     }
366     if (i == finish_markers.begin()) {
367       return NULL;
368     }
369     updates_since_flush = 0;
370
371     auto last = i;
372     --i;
373     const T& high_marker = i->first;
374     marker_entry& high_entry = i->second;
375     RGWCoroutine *cr = store_marker(high_marker, high_entry.pos, high_entry.timestamp);
376     finish_markers.erase(finish_markers.begin(), last);
377     return cr;
378   }
379
380   /*
381    * a key needs retry if it was processing when another marker that points
382    * to the same bucket shards arrives. Instead of processing it, we mark
383    * it as need_retry so that when we finish processing the original, we
384    * retry the processing on the same bucket shard, in case there are more
385    * entries to process. This closes a race that can happen.
386    */
387   bool need_retry(const K& key) {
388     return (need_retry_set.find(key) != need_retry_set.end());
389   }
390
391   void set_need_retry(const K& key) {
392     need_retry_set.insert(key);
393   }
394
395   void reset_need_retry(const K& key) {
396     need_retry_set.erase(key);
397   }
398 };
399
400 class RGWMetaSyncShardMarkerTrack;
401
402 class RGWMetaSyncSingleEntryCR : public RGWCoroutine {
403   RGWMetaSyncEnv *sync_env;
404
405   string raw_key;
406   string entry_marker;
407   RGWMDLogStatus op_status;
408
409   ssize_t pos;
410   string section;
411   string key;
412
413   int sync_status;
414
415   bufferlist md_bl;
416
417   RGWMetaSyncShardMarkerTrack *marker_tracker;
418
419   int tries;
420
421   bool error_injection;
422
423 public:
424   RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
425                            const string& _raw_key, const string& _entry_marker,
426                            const RGWMDLogStatus& _op_status,
427                            RGWMetaSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct),
428                                                       sync_env(_sync_env),
429                                                       raw_key(_raw_key), entry_marker(_entry_marker),
430                                                       op_status(_op_status),
431                                                       pos(0), sync_status(0),
432                                                       marker_tracker(_marker_tracker), tries(0) {
433     error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
434   }
435
436   int operate() override;
437 };
438
439 class RGWShardCollectCR : public RGWCoroutine {
440   int cur_shard;
441   int current_running;
442   int max_concurrent;
443   int status;
444
445 public:
446   RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
447                                                              current_running(0),
448                                                              max_concurrent(_max_concurrent),
449                                                              status(0) {}
450
451   virtual bool spawn_next() = 0;
452   int operate() override;
453 };
454
455 // MetaLogTrimCR factory function
456 RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http,
457                                       int num_shards, utime_t interval);
458
459 // factory function for mdlog trim via radosgw-admin
460 RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store,
461                                             RGWHTTPManager *http,
462                                             int num_shards);
463
464 #endif