X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_sync.h;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_sync.h;h=c651f7a9ad1b98191a44207db7beadf543079417;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git

diff --git a/src/ceph/src/rgw/rgw_sync.h b/src/ceph/src/rgw/rgw_sync.h
new file mode 100644
index 0000000..c651f7a
--- /dev/null
+++ b/src/ceph/src/rgw/rgw_sync.h
@@ -0,0 +1,464 @@
#ifndef CEPH_RGW_SYNC_H
#define CEPH_RGW_SYNC_H

#include "rgw_coroutine.h"
#include "rgw_http_client.h"
#include "rgw_meta_sync_status.h"

#include "include/stringify.h"
#include "common/RWLock.h"

#include <atomic>

#define ERROR_LOGGER_SHARDS 32
#define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"

struct rgw_mdlog_info {
  uint32_t num_shards;
  std::string period; //< period id of the master's oldest metadata log
  epoch_t realm_epoch; //< realm epoch of oldest metadata log

  rgw_mdlog_info() : num_shards(0), realm_epoch(0) {}

  void decode_json(JSONObj *obj);
};


struct rgw_mdlog_entry {
  string id;
  string section;
  string name;
  ceph::real_time timestamp;
  RGWMetadataLogData log_data;

  void decode_json(JSONObj *obj);

  bool convert_from(cls_log_entry& le) {
    id = le.id;
    section = le.section;
    name = le.name;
    timestamp = le.timestamp.to_real_time();
    try {
      bufferlist::iterator iter = le.data.begin();
      ::decode(log_data, iter);
    } catch (buffer::error& err) {
      return false;
    }
    return true;
  }
};

struct rgw_mdlog_shard_data {
  string marker;
  bool truncated;
  vector<rgw_mdlog_entry> entries;

  void decode_json(JSONObj *obj);
};

class RGWAsyncRadosProcessor;
class RGWMetaSyncStatusManager;
class RGWMetaSyncCR;
class RGWRESTConn;

class RGWSyncErrorLogger {
  RGWRados *store;

  vector<string> oids;
  int num_shards;

  std::atomic<int64_t> counter = { 0 };
public:
  RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards);
  RGWCoroutine *log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message);

  static string get_shard_oid(const string& oid_prefix, int shard_id);
};

struct rgw_sync_error_info {
  string source_zone;
  uint32_t error_code;
  string message;

  rgw_sync_error_info() : error_code(0) {}
  rgw_sync_error_info(const string& _source_zone, uint32_t _error_code, const string& _message) : source_zone(_source_zone), error_code(_error_code), message(_message) {}

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    ::encode(source_zone, bl);
    ::encode(error_code, bl);
    ::encode(message, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::iterator& bl) {
    DECODE_START(1, bl);
    ::decode(source_zone, bl);
    ::decode(error_code, bl);
    ::decode(message, bl);
    DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_sync_error_info)

#define DEFAULT_BACKOFF_MAX 30

class RGWSyncBackoff {
  int cur_wait;
  int max_secs;

  void update_wait_time();
public:
  RGWSyncBackoff(int _max_secs = DEFAULT_BACKOFF_MAX) : cur_wait(0), max_secs(_max_secs) {}

  void backoff_sleep();
  void reset() {
    cur_wait = 0;
  }

  void backoff(RGWCoroutine *op);
};

class RGWBackoffControlCR : public RGWCoroutine
{
  RGWCoroutine *cr;
  Mutex lock;

  RGWSyncBackoff backoff;
  bool reset_backoff;

  bool exit_on_error;

protected:
  bool *backoff_ptr() {
    return &reset_backoff;
  }

  Mutex& cr_lock() {
    return lock;
  }

  RGWCoroutine *get_cr() {
    return cr;
  }

public:
  RGWBackoffControlCR(CephContext *_cct, bool _exit_on_error)
    : RGWCoroutine(_cct), cr(NULL),
      lock("RGWBackoffControlCR::lock:" + stringify(this)),
      reset_backoff(false), exit_on_error(_exit_on_error) {
  }

  ~RGWBackoffControlCR() override {
    if (cr) {
      cr->put();
    }
  }

  virtual RGWCoroutine *alloc_cr() = 0;
  virtual RGWCoroutine *alloc_finisher_cr() { return NULL; }

  int operate() override;
};
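/*
 * Illustrative sketch, not part of this header: a minimal
 * RGWBackoffControlCR subclass. operate() (defined in rgw_sync.cc)
 * repeatedly runs the coroutine returned by alloc_cr(), sleeping
 * between failed attempts according to RGWSyncBackoff; with
 * exit_on_error == false it keeps retrying. RGWExampleBackoffCR and
 * RGWExampleWorkCR are hypothetical names standing in for real work.
 *
 *   class RGWExampleBackoffCR : public RGWBackoffControlCR {
 *     RGWMetaSyncEnv *sync_env;
 *   public:
 *     explicit RGWExampleBackoffCR(RGWMetaSyncEnv *_sync_env)
 *       : RGWBackoffControlCR(_sync_env->cct, false), // don't exit_on_error
 *         sync_env(_sync_env) {}
 *     RGWCoroutine *alloc_cr() override {
 *       // coroutine to run under backoff control; reallocated on retry
 *       return new RGWExampleWorkCR(sync_env);
 *     }
 *   };
 */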
struct RGWMetaSyncEnv {
  CephContext *cct;
  RGWRados *store;
  RGWRESTConn *conn;
  RGWAsyncRadosProcessor *async_rados;
  RGWHTTPManager *http_manager;
  RGWSyncErrorLogger *error_logger;

  RGWMetaSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL) {}

  void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
            RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
            RGWSyncErrorLogger *_error_logger);

  string shard_obj_name(int shard_id);
  string status_oid();
};

class RGWRemoteMetaLog : public RGWCoroutinesManager {
  RGWRados *store;
  RGWRESTConn *conn;
  RGWAsyncRadosProcessor *async_rados;

  RGWHTTPManager http_manager;
  RGWMetaSyncStatusManager *status_manager;
  RGWSyncErrorLogger *error_logger;

  RGWMetaSyncCR *meta_sync_cr;

  RGWSyncBackoff backoff;

  RGWMetaSyncEnv sync_env;

  void init_sync_env(RGWMetaSyncEnv *env);
  int store_sync_info(const rgw_meta_sync_info& sync_info);

  std::atomic<bool> going_down = { false };

public:
  RGWRemoteMetaLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
                   RGWMetaSyncStatusManager *_sm)
    : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
      store(_store), conn(NULL), async_rados(async_rados),
      http_manager(store->ctx(), completion_mgr),
      status_manager(_sm), error_logger(NULL), meta_sync_cr(NULL) {}

  ~RGWRemoteMetaLog() override;

  int init();
  void finish();

  int read_log_info(rgw_mdlog_info *log_info);
  int read_master_log_shards_info(const string& master_period, map<int, RGWMetadataLogInfo> *shards_info);
  int read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result);
  int read_sync_status(rgw_meta_sync_status *sync_status);
  int init_sync_status();
  int run_sync();

  void wakeup(int shard_id);

  RGWMetaSyncEnv& get_sync_env() {
    return sync_env;
  }
};
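/*
 * Illustrative sketch, not part of this header: how a caller might
 * drive RGWRemoteMetaLog. In radosgw it is owned and driven by
 * RGWMetaSyncStatusManager (declared below); error handling is
 * trimmed, and `store`, `async_rados` and `status_manager` are
 * assumed to be set up already.
 *
 *   RGWRemoteMetaLog master_log(store, async_rados, status_manager);
 *   if (master_log.init() < 0) {     // wires up conn/http_manager/sync_env
 *     ...
 *   }
 *
 *   rgw_meta_sync_status sync_status;
 *   master_log.read_sync_status(&sync_status);
 *
 *   master_log.run_sync();           // blocks, running the sync coroutines
 *                                    // until finish() flags going_down
 */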
class RGWMetaSyncStatusManager {
  RGWRados *store;
  librados::IoCtx ioctx;

  RGWRemoteMetaLog master_log;

  map<int, rgw_raw_obj> shard_objs;

  struct utime_shard {
    real_time ts;
    int shard_id;

    utime_shard() : shard_id(-1) {}

    bool operator<(const utime_shard& rhs) const {
      if (ts == rhs.ts) {
        return shard_id < rhs.shard_id;
      }
      return ts < rhs.ts;
    }
  };

  RWLock ts_to_shard_lock;
  map<utime_shard, int> ts_to_shard;
  vector<string> clone_markers;

public:
  RGWMetaSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
    : store(_store), master_log(store, async_rados, this),
      ts_to_shard_lock("ts_to_shard_lock") {}
  int init();

  int read_sync_status(rgw_meta_sync_status *sync_status) {
    return master_log.read_sync_status(sync_status);
  }
  int init_sync_status() { return master_log.init_sync_status(); }
  int read_log_info(rgw_mdlog_info *log_info) {
    return master_log.read_log_info(log_info);
  }
  int read_master_log_shards_info(const string& master_period, map<int, RGWMetadataLogInfo> *shards_info) {
    return master_log.read_master_log_shards_info(master_period, shards_info);
  }
  int read_master_log_shards_next(const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result) {
    return master_log.read_master_log_shards_next(period, shard_markers, result);
  }

  int run() { return master_log.run_sync(); }

  void wakeup(int shard_id) { return master_log.wakeup(shard_id); }
  void stop() {
    master_log.finish();
  }
};

template <class T, class K>
class RGWSyncShardMarkerTrack {
  struct marker_entry {
    uint64_t pos;
    real_time timestamp;

    marker_entry() : pos(0) {}
    marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {}
  };
  typename std::map<T, marker_entry> pending;

  map<T, marker_entry> finish_markers;

  int window_size;
  int updates_since_flush;

protected:
  typename std::set<K> need_retry_set;

  virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
  virtual void handle_finish(const T& marker) { }

public:
  RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
  virtual ~RGWSyncShardMarkerTrack() {}

  bool start(const T& pos, int index_pos, const real_time& timestamp) {
    if (pending.find(pos) != pending.end()) {
      return false;
    }
    pending[pos] = marker_entry(index_pos, timestamp);
    return true;
  }

  void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
    finish_markers[pos] = marker_entry(index_pos, timestamp);
  }

  RGWCoroutine *finish(const T& pos) {
    if (pending.empty()) {
      /* can happen, due to a bug that ended up with multiple objects with
       * the same name and version -- which can happen when versioning is
       * enabled and the version is 'null'.
       */
      return NULL;
    }

    typename std::map<T, marker_entry>::iterator iter = pending.begin();

    bool is_first = (pos == iter->first);

    typename std::map<T, marker_entry>::iterator pos_iter = pending.find(pos);
    if (pos_iter == pending.end()) {
      /* see pending.empty() comment */
      return NULL;
    }

    finish_markers[pos] = pos_iter->second;

    pending.erase(pos);

    handle_finish(pos);

    updates_since_flush++;

    if (is_first && (updates_since_flush >= window_size || pending.empty())) {
      return flush();
    }
    return NULL;
  }

  RGWCoroutine *flush() {
    if (finish_markers.empty()) {
      return NULL;
    }

    typename std::map<T, marker_entry>::iterator i;

    if (pending.empty()) {
      i = finish_markers.end();
    } else {
      i = finish_markers.lower_bound(pending.begin()->first);
    }
    if (i == finish_markers.begin()) {
      return NULL;
    }
    updates_since_flush = 0;

    auto last = i;
    --i;
    const T& high_marker = i->first;
    marker_entry& high_entry = i->second;
    RGWCoroutine *cr = store_marker(high_marker, high_entry.pos, high_entry.timestamp);
    finish_markers.erase(finish_markers.begin(), last);
    return cr;
  }

  /*
   * A key needs retry if it was being processed when another marker that
   * points to the same bucket shard arrived. Instead of processing the new
   * entry, we mark the key as need_retry so that when we finish processing
   * the original entry, we retry processing on the same bucket shard, in
   * case more entries arrived in the meantime. This closes a race between
   * the two entries.
   */
  bool need_retry(const K& key) {
    return (need_retry_set.find(key) != need_retry_set.end());
  }

  void set_need_retry(const K& key) {
    need_retry_set.insert(key);
  }

  void reset_need_retry(const K& key) {
    need_retry_set.erase(key);
  }
};
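/*
 * Illustrative sketch, not part of this header: a minimal concrete
 * marker tracker (the real ones live in rgw_sync.cc/rgw_data_sync.cc).
 * start() registers an in-flight log entry, finish() records its
 * completion, and once window_size completions accumulate, flush()
 * hands back the coroutine from store_marker(), which persists the
 * highest marker whose predecessors have all completed. All names
 * below are hypothetical.
 *
 *   class RGWExampleMarkerTrack
 *     : public RGWSyncShardMarkerTrack<std::string, std::string> {
 *     RGWMetaSyncEnv *sync_env;
 *     rgw_raw_obj status_obj;
 *   public:
 *     RGWExampleMarkerTrack(RGWMetaSyncEnv *_sync_env, const rgw_raw_obj& _obj)
 *       : RGWSyncShardMarkerTrack(128), // window_size
 *         sync_env(_sync_env), status_obj(_obj) {}
 *
 *     RGWCoroutine *store_marker(const std::string& new_marker,
 *                                uint64_t index_pos,
 *                                const real_time& timestamp) override {
 *       // return a coroutine that writes new_marker to status_obj,
 *       // e.g. a RADOS attr/omap write wrapped as a coroutine
 *       return new RGWExampleStoreMarkerCR(sync_env, status_obj, new_marker);
 *     }
 *   };
 */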
class RGWMetaSyncShardMarkerTrack;

class RGWMetaSyncSingleEntryCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  string raw_key;
  string entry_marker;
  RGWMDLogStatus op_status;

  ssize_t pos;
  string section;
  string key;

  int sync_status;

  bufferlist md_bl;

  RGWMetaSyncShardMarkerTrack *marker_tracker;

  int tries;

  bool error_injection;

public:
  RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker,
                           const RGWMDLogStatus& _op_status,
                           RGWMetaSyncShardMarkerTrack *_marker_tracker)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      raw_key(_raw_key), entry_marker(_entry_marker),
      op_status(_op_status),
      pos(0), sync_status(0),
      marker_tracker(_marker_tracker), tries(0) {
    error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
  }

  int operate() override;
};

class RGWShardCollectCR : public RGWCoroutine {
  int cur_shard;
  int current_running;
  int max_concurrent;
  int status;

public:
  RGWShardCollectCR(CephContext *_cct, int _max_concurrent)
    : RGWCoroutine(_cct),
      current_running(0),
      max_concurrent(_max_concurrent),
      status(0) {}

  virtual bool spawn_next() = 0;
  int operate() override;
};

// MetaLogTrimCR factory function
RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http,
                                      int num_shards, utime_t interval);

// factory function for mdlog trim via radosgw-admin
RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store,
                                            RGWHTTPManager *http,
                                            int num_shards);

#endif
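/*
 * Illustrative sketch, not part of this header: RGWShardCollectCR's
 * operate() (in rgw_sync.cc) calls spawn_next() until it returns
 * false, keeping at most max_concurrent children in flight and
 * collecting their return codes. A subclass only supplies
 * spawn_next(); the names below are hypothetical.
 *
 *   class RGWExampleCollectCR : public RGWShardCollectCR {
 *     RGWMetaSyncEnv *sync_env;
 *     int shard = 0;
 *     int num_shards;
 *     static constexpr int MAX_CONCURRENT = 16;
 *   public:
 *     RGWExampleCollectCR(RGWMetaSyncEnv *_sync_env, int _num_shards)
 *       : RGWShardCollectCR(_sync_env->cct, MAX_CONCURRENT),
 *         sync_env(_sync_env), num_shards(_num_shards) {}
 *     bool spawn_next() override {
 *       if (shard >= num_shards) {
 *         return false;              // nothing left to spawn
 *       }
 *       // spawn one child per shard; don't wait for it here
 *       spawn(new RGWExampleShardCR(sync_env, shard++), false);
 *       return true;
 *     }
 *   };
 */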