X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FImageReplayer.h;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FImageReplayer.h;h=a66b02a24abc553c82ceaf346548656983367945;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_mirror/ImageReplayer.h b/src/ceph/src/tools/rbd_mirror/ImageReplayer.h new file mode 100644 index 0000000..a66b02a --- /dev/null +++ b/src/ceph/src/tools/rbd_mirror/ImageReplayer.h @@ -0,0 +1,437 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_H +#define CEPH_RBD_MIRROR_IMAGE_REPLAYER_H + +#include "common/AsyncOpTracker.h" +#include "common/Mutex.h" +#include "common/WorkQueue.h" +#include "include/rados/librados.hpp" +#include "cls/journal/cls_journal_types.h" +#include "cls/rbd/cls_rbd_types.h" +#include "journal/JournalMetadataListener.h" +#include "journal/ReplayEntry.h" +#include "librbd/ImageCtx.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" +#include "ProgressContext.h" +#include "types.h" +#include "tools/rbd_mirror/image_replayer/Types.h" + +#include +#include + +#include +#include +#include +#include +#include + +class AdminSocketHook; + +namespace journal { + +class Journaler; +class ReplayHandler; + +} + +namespace librbd { + +class ImageCtx; +namespace journal { template class Replay; } + +} + +namespace rbd { +namespace mirror { + +template struct ImageDeleter; +template struct InstanceWatcher; +template struct Threads; + +namespace image_replayer { template class BootstrapRequest; } +namespace image_replayer { template class EventPreprocessor; } +namespace image_replayer { template class ReplayStatusFormatter; } + +/** + * Replays changes from a remote cluster for a single image. + */ +template +class ImageReplayer { +public: + static ImageReplayer *create( + Threads *threads, ImageDeleter* image_deleter, + InstanceWatcher *instance_watcher, + RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id, + const std::string &global_image_id) { + return new ImageReplayer(threads, image_deleter, instance_watcher, + local, local_mirror_uuid, local_pool_id, + global_image_id); + } + void destroy() { + delete this; + } + + ImageReplayer(Threads *threads, + ImageDeleter* image_deleter, + InstanceWatcher *instance_watcher, + RadosRef local, const std::string &local_mirror_uuid, + int64_t local_pool_id, const std::string &global_image_id); + virtual ~ImageReplayer(); + ImageReplayer(const ImageReplayer&) = delete; + ImageReplayer& operator=(const ImageReplayer&) = delete; + + bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); } + bool is_running() { Mutex::Locker l(m_lock); return is_running_(); } + bool is_replaying() { Mutex::Locker l(m_lock); return is_replaying_(); } + + std::string get_name() { Mutex::Locker l(m_lock); return m_name; }; + void set_state_description(int r, const std::string &desc); + + // TODO temporary until policy handles release of image replayers + inline bool is_finished() const { + Mutex::Locker locker(m_lock); + return m_finished; + } + inline void set_finished(bool finished) { + Mutex::Locker locker(m_lock); + m_finished = finished; + } + + inline bool is_blacklisted() const { + Mutex::Locker locker(m_lock); + return (m_last_r == -EBLACKLISTED); + } + + image_replayer::HealthState get_health_state() const; + + void add_peer(const std::string &peer_uuid, librados::IoCtx &remote_io_ctx); + + inline int64_t get_local_pool_id() const { + return m_local_pool_id; + } + inline const std::string& get_global_image_id() const { + return m_global_image_id; + } + + void start(Context *on_finish = nullptr, bool manual = false); + void stop(Context *on_finish = nullptr, bool manual = false, + int r = 0, const std::string& desc = ""); + void restart(Context *on_finish = nullptr); + void flush(Context *on_finish = nullptr); + + void resync_image(Context *on_finish=nullptr); + + void print_status(Formatter *f, stringstream *ss); + + virtual void handle_replay_ready(); + virtual void handle_replay_complete(int r, const std::string &error_desc); + +protected: + /** + * @verbatim + * (error) + * <------------------------------------ FAIL + * | ^ + * v * + * * + * | * + * v * + * WAIT_FOR_DELETION * + * | * + * v (error) * + * PREPARE_LOCAL_IMAGE * * * * * * * * * * * * * * * * * * + * | * + * v (error) * + * PREPARE_REMOTE_IMAGE * * * * * * * * * * * * * * * * * * + * | * + * v (error) * + * BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * * + * | * + * v (error) * + * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * + * | * + * v (error) * + * START_REPLAY * * * * * * * * * * * * * * * * * * * * * * + * | + * | /--------------------------------------------\ + * | | | + * v v (asok flush) | + * REPLAYING -------------> LOCAL_REPLAY_FLUSH | + * | \ | | + * | | v | + * | | FLUSH_COMMIT_POSITION | + * | | | | + * | | \--------------------/| + * | | | + * | | (entries available) | + * | \-----------> REPLAY_READY | + * | | | + * | | (skip if not | + * | v needed) (error) + * | REPLAY_FLUSH * * * * * * * * * + * | | | * + * | | (skip if not | * + * | v needed) (error) * + * | GET_REMOTE_TAG * * * * * * * * + * | | | * + * | | (skip if not | * + * | v needed) (error) * + * | ALLOCATE_LOCAL_TAG * * * * * * + * | | | * + * | v (error) * + * | PREPROCESS_ENTRY * * * * * * * + * | | | * + * | v (error) * + * | PROCESS_ENTRY * * * * * * * * * + * | | | * + * | \---------------------/ * + * v * + * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * + * | + * v + * JOURNAL_REPLAY_SHUT_DOWN + * | + * v + * LOCAL_IMAGE_CLOSE + * | + * v + * + * + * @endverbatim + */ + + virtual void on_start_fail(int r, const std::string &desc = ""); + virtual bool on_start_interrupted(); + + virtual void on_stop_journal_replay(int r = 0, const std::string &desc = ""); + + virtual void on_flush_local_replay_flush_start(Context *on_flush); + virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r); + virtual void on_flush_flush_commit_position_start(Context *on_flush); + virtual void on_flush_flush_commit_position_finish(Context *on_flush, int r); + + bool on_replay_interrupted(); + +private: + typedef typename librbd::journal::TypeTraits::ReplayEntry ReplayEntry; + + enum State { + STATE_UNKNOWN, + STATE_STARTING, + STATE_REPLAYING, + STATE_REPLAY_FLUSHING, + STATE_STOPPING, + STATE_STOPPED, + }; + + struct RemoteImage { + std::string mirror_uuid; + std::string image_id; + librados::IoCtx io_ctx; + + RemoteImage() { + } + RemoteImage(const Peer& peer) : io_ctx(peer.io_ctx) { + } + }; + + typedef typename librbd::journal::TypeTraits::Journaler Journaler; + typedef boost::optional OptionalState; + typedef boost::optional + OptionalMirrorImageStatusState; + + struct JournalListener : public librbd::journal::Listener { + ImageReplayer *img_replayer; + + JournalListener(ImageReplayer *img_replayer) + : img_replayer(img_replayer) { + } + + void handle_close() override { + img_replayer->on_stop_journal_replay(); + } + + void handle_promoted() override { + img_replayer->on_stop_journal_replay(0, "force promoted"); + } + + void handle_resync() override { + img_replayer->resync_image(); + } + }; + + class BootstrapProgressContext : public ProgressContext { + public: + BootstrapProgressContext(ImageReplayer *replayer) : + replayer(replayer) { + } + + void update_progress(const std::string &description, + bool flush = true) override; + private: + ImageReplayer *replayer; + }; + + Threads *m_threads; + ImageDeleter* m_image_deleter; + InstanceWatcher *m_instance_watcher; + + Peers m_peers; + RemoteImage m_remote_image; + + RadosRef m_local; + std::string m_local_mirror_uuid; + int64_t m_local_pool_id; + std::string m_local_image_id; + std::string m_global_image_id; + std::string m_name; + + mutable Mutex m_lock; + State m_state = STATE_STOPPED; + std::string m_state_desc; + + OptionalMirrorImageStatusState m_mirror_image_status_state = boost::none; + int m_last_r = 0; + + BootstrapProgressContext m_progress_cxt; + + bool m_finished = false; + bool m_delete_requested = false; + bool m_resync_requested = false; + + image_replayer::EventPreprocessor *m_event_preprocessor = nullptr; + image_replayer::ReplayStatusFormatter *m_replay_status_formatter = + nullptr; + librados::IoCtx m_local_ioctx; + ImageCtxT *m_local_image_ctx = nullptr; + std::string m_local_image_tag_owner; + + decltype(ImageCtxT::journal) m_local_journal = nullptr; + librbd::journal::Replay *m_local_replay = nullptr; + Journaler* m_remote_journaler = nullptr; + ::journal::ReplayHandler *m_replay_handler = nullptr; + librbd::journal::Listener *m_journal_listener; + + Context *m_on_start_finish = nullptr; + Context *m_on_stop_finish = nullptr; + Context *m_update_status_task = nullptr; + int m_update_status_interval = 0; + librados::AioCompletion *m_update_status_comp = nullptr; + bool m_stop_requested = false; + bool m_manual_stop = false; + + AdminSocketHook *m_asok_hook = nullptr; + + image_replayer::BootstrapRequest *m_bootstrap_request = nullptr; + + uint32_t m_in_flight_status_updates = 0; + bool m_update_status_requested = false; + Context *m_on_update_status_finish = nullptr; + + librbd::journal::MirrorPeerClientMeta m_client_meta; + + ReplayEntry m_replay_entry; + bool m_replay_tag_valid = false; + uint64_t m_replay_tag_tid = 0; + cls::journal::Tag m_replay_tag; + librbd::journal::TagData m_replay_tag_data; + librbd::journal::EventEntry m_event_entry; + AsyncOpTracker m_event_replay_tracker; + Context *m_delayed_preprocess_task = nullptr; + + struct RemoteJournalerListener : public ::journal::JournalMetadataListener { + ImageReplayer *replayer; + + RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { } + + void handle_update(::journal::JournalMetadata *) override; + } m_remote_listener; + + struct C_ReplayCommitted : public Context { + ImageReplayer *replayer; + ReplayEntry replay_entry; + + C_ReplayCommitted(ImageReplayer *replayer, + ReplayEntry &&replay_entry) + : replayer(replayer), replay_entry(std::move(replay_entry)) { + } + void finish(int r) override { + replayer->handle_process_entry_safe(replay_entry, r); + } + }; + + static std::string to_string(const State state); + + bool is_stopped_() const { + return m_state == STATE_STOPPED; + } + bool is_running_() const { + return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested; + } + bool is_replaying_() const { + return (m_state == STATE_REPLAYING || + m_state == STATE_REPLAY_FLUSHING); + } + + bool update_mirror_image_status(bool force, const OptionalState &state); + bool start_mirror_image_status_update(bool force, bool restarting); + void finish_mirror_image_status_update(); + void queue_mirror_image_status_update(const OptionalState &state); + void send_mirror_status_update(const OptionalState &state); + void handle_mirror_status_update(int r); + void reschedule_update_status_task(int new_interval = 0); + + void shut_down(int r); + void handle_shut_down(int r); + void handle_remote_journal_metadata_updated(); + + void wait_for_deletion(); + void handle_wait_for_deletion(int r); + + void prepare_local_image(); + void handle_prepare_local_image(int r); + + void prepare_remote_image(); + void handle_prepare_remote_image(int r); + + void bootstrap(); + void handle_bootstrap(int r); + + void init_remote_journaler(); + void handle_init_remote_journaler(int r); + + void start_replay(); + void handle_start_replay(int r); + + void replay_flush(); + void handle_replay_flush(int r); + + void get_remote_tag(); + void handle_get_remote_tag(int r); + + void allocate_local_tag(); + void handle_allocate_local_tag(int r); + + void preprocess_entry(); + void handle_preprocess_entry_ready(int r); + void handle_preprocess_entry_safe(int r); + + void process_entry(); + void handle_process_entry_ready(int r); + void handle_process_entry_safe(const ReplayEntry& replay_entry, int r); + + void register_admin_socket_hook(); + void unregister_admin_socket_hook(); + + void on_name_changed(); +}; + +} // namespace mirror +} // namespace rbd + +extern template class rbd::mirror::ImageReplayer; + +#endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H