X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Frgw%2Frgw_sync.h;fp=src%2Fceph%2Fsrc%2Frgw%2Frgw_sync.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=c651f7a9ad1b98191a44207db7beadf543079417;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/rgw/rgw_sync.h b/src/ceph/src/rgw/rgw_sync.h deleted file mode 100644 index c651f7a..0000000 --- a/src/ceph/src/rgw/rgw_sync.h +++ /dev/null @@ -1,464 +0,0 @@ -#ifndef CEPH_RGW_SYNC_H -#define CEPH_RGW_SYNC_H - -#include "rgw_coroutine.h" -#include "rgw_http_client.h" -#include "rgw_meta_sync_status.h" - -#include "include/stringify.h" -#include "common/RWLock.h" - -#include - -#define ERROR_LOGGER_SHARDS 32 -#define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log" - -struct rgw_mdlog_info { - uint32_t num_shards; - std::string period; //< period id of the master's oldest metadata log - epoch_t realm_epoch; //< realm epoch of oldest metadata log - - rgw_mdlog_info() : num_shards(0), realm_epoch(0) {} - - void decode_json(JSONObj *obj); -}; - - -struct rgw_mdlog_entry { - string id; - string section; - string name; - ceph::real_time timestamp; - RGWMetadataLogData log_data; - - void decode_json(JSONObj *obj); - - bool convert_from(cls_log_entry& le) { - id = le.id; - section = le.section; - name = le.name; - timestamp = le.timestamp.to_real_time(); - try { - bufferlist::iterator iter = le.data.begin(); - ::decode(log_data, iter); - } catch (buffer::error& err) { - return false; - } - return true; - } -}; - -struct rgw_mdlog_shard_data { - string marker; - bool truncated; - vector entries; - - void decode_json(JSONObj *obj); -}; - -class RGWAsyncRadosProcessor; -class RGWMetaSyncStatusManager; -class RGWMetaSyncCR; -class RGWRESTConn; - -class RGWSyncErrorLogger { - RGWRados *store; - - vector oids; - int num_shards; - - std::atomic counter = { 0 }; -public: - RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards); - RGWCoroutine *log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message); - - static string get_shard_oid(const string& oid_prefix, int shard_id); -}; - -struct rgw_sync_error_info { - string source_zone; - uint32_t error_code; - string message; - - rgw_sync_error_info() : error_code(0) {} - rgw_sync_error_info(const string& _source_zone, uint32_t _error_code, const string& _message) : source_zone(_source_zone), error_code(_error_code), message(_message) {} - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - ::encode(source_zone, bl); - ::encode(error_code, bl); - ::encode(message, bl); - ENCODE_FINISH(bl); - } - - void decode(bufferlist::iterator& bl) { - DECODE_START(1, bl); - ::decode(source_zone, bl); - ::decode(error_code, bl); - ::decode(message, bl); - DECODE_FINISH(bl); - } - - void dump(Formatter *f) const; -}; -WRITE_CLASS_ENCODER(rgw_sync_error_info) - -#define DEFAULT_BACKOFF_MAX 30 - -class RGWSyncBackoff { - int cur_wait; - int max_secs; - - void update_wait_time(); -public: - RGWSyncBackoff(int _max_secs = DEFAULT_BACKOFF_MAX) : cur_wait(0), max_secs(_max_secs) {} - - void backoff_sleep(); - void reset() { - cur_wait = 0; - } - - void backoff(RGWCoroutine *op); -}; - -class RGWBackoffControlCR : public RGWCoroutine -{ - RGWCoroutine *cr; - Mutex lock; - - RGWSyncBackoff backoff; - bool reset_backoff; - - bool exit_on_error; - -protected: - bool *backoff_ptr() { - return &reset_backoff; - } - - Mutex& cr_lock() { - return lock; - } - - RGWCoroutine *get_cr() { - return cr; - } - -public: - RGWBackoffControlCR(CephContext *_cct, bool _exit_on_error) : RGWCoroutine(_cct), cr(NULL), lock("RGWBackoffControlCR::lock:" + stringify(this)), - reset_backoff(false), exit_on_error(_exit_on_error) { - } - - ~RGWBackoffControlCR() override { - if (cr) { - cr->put(); - } - } - - virtual RGWCoroutine *alloc_cr() = 0; - virtual RGWCoroutine *alloc_finisher_cr() { return NULL; } - - int operate() override; -}; - -struct RGWMetaSyncEnv { - CephContext *cct; - RGWRados *store; - RGWRESTConn *conn; - RGWAsyncRadosProcessor *async_rados; - RGWHTTPManager *http_manager; - RGWSyncErrorLogger *error_logger; - - RGWMetaSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL) {} - - void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn, - RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager, - RGWSyncErrorLogger *_error_logger); - - string shard_obj_name(int shard_id); - string status_oid(); -}; - -class RGWRemoteMetaLog : public RGWCoroutinesManager { - RGWRados *store; - RGWRESTConn *conn; - RGWAsyncRadosProcessor *async_rados; - - RGWHTTPManager http_manager; - RGWMetaSyncStatusManager *status_manager; - RGWSyncErrorLogger *error_logger; - - RGWMetaSyncCR *meta_sync_cr; - - RGWSyncBackoff backoff; - - RGWMetaSyncEnv sync_env; - - void init_sync_env(RGWMetaSyncEnv *env); - int store_sync_info(const rgw_meta_sync_info& sync_info); - - std::atomic going_down = { false }; - -public: - RGWRemoteMetaLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, - RGWMetaSyncStatusManager *_sm) - : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), - store(_store), conn(NULL), async_rados(async_rados), - http_manager(store->ctx(), completion_mgr), - status_manager(_sm), error_logger(NULL), meta_sync_cr(NULL) {} - - ~RGWRemoteMetaLog() override; - - int init(); - void finish(); - - int read_log_info(rgw_mdlog_info *log_info); - int read_master_log_shards_info(const string& master_period, map *shards_info); - int read_master_log_shards_next(const string& period, map shard_markers, map *result); - int read_sync_status(rgw_meta_sync_status *sync_status); - int init_sync_status(); - int run_sync(); - - void wakeup(int shard_id); - - RGWMetaSyncEnv& get_sync_env() { - return sync_env; - } -}; - -class RGWMetaSyncStatusManager { - RGWRados *store; - librados::IoCtx ioctx; - - RGWRemoteMetaLog master_log; - - map shard_objs; - - struct utime_shard { - real_time ts; - int shard_id; - - utime_shard() : shard_id(-1) {} - - bool operator<(const utime_shard& rhs) const { - if (ts == rhs.ts) { - return shard_id < rhs.shard_id; - } - return ts < rhs.ts; - } - }; - - RWLock ts_to_shard_lock; - map ts_to_shard; - vector clone_markers; - -public: - RGWMetaSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados) - : store(_store), master_log(store, async_rados, this), - ts_to_shard_lock("ts_to_shard_lock") {} - int init(); - - int read_sync_status(rgw_meta_sync_status *sync_status) { - return master_log.read_sync_status(sync_status); - } - int init_sync_status() { return master_log.init_sync_status(); } - int read_log_info(rgw_mdlog_info *log_info) { - return master_log.read_log_info(log_info); - } - int read_master_log_shards_info(const string& master_period, map *shards_info) { - return master_log.read_master_log_shards_info(master_period, shards_info); - } - int read_master_log_shards_next(const string& period, map shard_markers, map *result) { - return master_log.read_master_log_shards_next(period, shard_markers, result); - } - - int run() { return master_log.run_sync(); } - - void wakeup(int shard_id) { return master_log.wakeup(shard_id); } - void stop() { - master_log.finish(); - } -}; - -template -class RGWSyncShardMarkerTrack { - struct marker_entry { - uint64_t pos; - real_time timestamp; - - marker_entry() : pos(0) {} - marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {} - }; - typename std::map pending; - - map finish_markers; - - int window_size; - int updates_since_flush; - - -protected: - typename std::set need_retry_set; - - virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0; - virtual void handle_finish(const T& marker) { } - -public: - RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {} - virtual ~RGWSyncShardMarkerTrack() {} - - bool start(const T& pos, int index_pos, const real_time& timestamp) { - if (pending.find(pos) != pending.end()) { - return false; - } - pending[pos] = marker_entry(index_pos, timestamp); - return true; - } - - void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) { - finish_markers[pos] = marker_entry(index_pos, timestamp); - } - - RGWCoroutine *finish(const T& pos) { - if (pending.empty()) { - /* can happen, due to a bug that ended up with multiple objects with the same name and version - * -- which can happen when versioning is enabled an the version is 'null'. - */ - return NULL; - } - - typename std::map::iterator iter = pending.begin(); - - bool is_first = (pos == iter->first); - - typename std::map::iterator pos_iter = pending.find(pos); - if (pos_iter == pending.end()) { - /* see pending.empty() comment */ - return NULL; - } - - finish_markers[pos] = pos_iter->second; - - pending.erase(pos); - - handle_finish(pos); - - updates_since_flush++; - - if (is_first && (updates_since_flush >= window_size || pending.empty())) { - return flush(); - } - return NULL; - } - - RGWCoroutine *flush() { - if (finish_markers.empty()) { - return NULL; - } - - typename std::map::iterator i; - - if (pending.empty()) { - i = finish_markers.end(); - } else { - i = finish_markers.lower_bound(pending.begin()->first); - } - if (i == finish_markers.begin()) { - return NULL; - } - updates_since_flush = 0; - - auto last = i; - --i; - const T& high_marker = i->first; - marker_entry& high_entry = i->second; - RGWCoroutine *cr = store_marker(high_marker, high_entry.pos, high_entry.timestamp); - finish_markers.erase(finish_markers.begin(), last); - return cr; - } - - /* - * a key needs retry if it was processing when another marker that points - * to the same bucket shards arrives. Instead of processing it, we mark - * it as need_retry so that when we finish processing the original, we - * retry the processing on the same bucket shard, in case there are more - * entries to process. This closes a race that can happen. - */ - bool need_retry(const K& key) { - return (need_retry_set.find(key) != need_retry_set.end()); - } - - void set_need_retry(const K& key) { - need_retry_set.insert(key); - } - - void reset_need_retry(const K& key) { - need_retry_set.erase(key); - } -}; - -class RGWMetaSyncShardMarkerTrack; - -class RGWMetaSyncSingleEntryCR : public RGWCoroutine { - RGWMetaSyncEnv *sync_env; - - string raw_key; - string entry_marker; - RGWMDLogStatus op_status; - - ssize_t pos; - string section; - string key; - - int sync_status; - - bufferlist md_bl; - - RGWMetaSyncShardMarkerTrack *marker_tracker; - - int tries; - - bool error_injection; - -public: - RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env, - const string& _raw_key, const string& _entry_marker, - const RGWMDLogStatus& _op_status, - RGWMetaSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - raw_key(_raw_key), entry_marker(_entry_marker), - op_status(_op_status), - pos(0), sync_status(0), - marker_tracker(_marker_tracker), tries(0) { - error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0); - } - - int operate() override; -}; - -class RGWShardCollectCR : public RGWCoroutine { - int cur_shard; - int current_running; - int max_concurrent; - int status; - -public: - RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct), - current_running(0), - max_concurrent(_max_concurrent), - status(0) {} - - virtual bool spawn_next() = 0; - int operate() override; -}; - -// MetaLogTrimCR factory function -RGWCoroutine* create_meta_log_trim_cr(RGWRados *store, RGWHTTPManager *http, - int num_shards, utime_t interval); - -// factory function for mdlog trim via radosgw-admin -RGWCoroutine* create_admin_meta_log_trim_cr(RGWRados *store, - RGWHTTPManager *http, - int num_shards); - -#endif