X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FInstanceReplayer.cc;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FInstanceReplayer.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=8b7fca17ac16e923387fc07a3efda64a2ffedba9;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_mirror/InstanceReplayer.cc b/src/ceph/src/tools/rbd_mirror/InstanceReplayer.cc deleted file mode 100644 index 8b7fca1..0000000 --- a/src/ceph/src/tools/rbd_mirror/InstanceReplayer.cc +++ /dev/null @@ -1,497 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "include/stringify.h" -#include "common/Timer.h" -#include "common/debug.h" -#include "common/errno.h" -#include "librbd/Utils.h" -#include "ImageReplayer.h" -#include "InstanceReplayer.h" -#include "ServiceDaemon.h" -#include "Threads.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rbd_mirror -#undef dout_prefix -#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \ - << this << " " << __func__ << ": " - -namespace rbd { -namespace mirror { - -namespace { - -const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count"); -const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count"); -const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count"); - -} // anonymous namespace - -using librbd::util::create_async_context_callback; -using librbd::util::create_context_callback; - -template -InstanceReplayer::InstanceReplayer( - Threads *threads, ServiceDaemon* service_daemon, - ImageDeleter* image_deleter, RadosRef local_rados, - const std::string &local_mirror_uuid, int64_t local_pool_id) - : m_threads(threads), m_service_daemon(service_daemon), - m_image_deleter(image_deleter), m_local_rados(local_rados), - m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id), - m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) { -} - -template -InstanceReplayer::~InstanceReplayer() { - assert(m_image_state_check_task == nullptr); - assert(m_async_op_tracker.empty()); - assert(m_image_replayers.empty()); -} - -template -int InstanceReplayer::init() { - C_SaferCond init_ctx; - init(&init_ctx); - return init_ctx.wait(); -} - -template -void InstanceReplayer::init(Context *on_finish) { - dout(20) << dendl; - - Context *ctx = new FunctionContext( - [this, on_finish] (int r) { - { - Mutex::Locker timer_locker(m_threads->timer_lock); - schedule_image_state_check_task(); - } - on_finish->complete(0); - }); - - m_threads->work_queue->queue(ctx, 0); -} - -template -void InstanceReplayer::shut_down() { - C_SaferCond shut_down_ctx; - shut_down(&shut_down_ctx); - int r = shut_down_ctx.wait(); - assert(r == 0); -} - -template -void InstanceReplayer::shut_down(Context *on_finish) { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - assert(m_on_shut_down == nullptr); - m_on_shut_down = on_finish; - - Context *ctx = new FunctionContext( - [this] (int r) { - cancel_image_state_check_task(); - wait_for_ops(); - }); - - m_threads->work_queue->queue(ctx, 0); -} - -template -void InstanceReplayer::add_peer(std::string peer_uuid, - librados::IoCtx io_ctx) { - dout(20) << peer_uuid << dendl; - - Mutex::Locker locker(m_lock); - auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second; - assert(result); -} - -template -void InstanceReplayer::release_all(Context *on_finish) { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish); - for (auto it = m_image_replayers.begin(); it != m_image_replayers.end(); - it = m_image_replayers.erase(it)) { - auto image_replayer = it->second; - auto ctx = gather_ctx->new_sub(); - ctx = new FunctionContext( - [image_replayer, ctx] (int r) { - image_replayer->destroy(); - ctx->complete(0); - }); - stop_image_replayer(image_replayer, ctx); - } - gather_ctx->activate(); -} - -template -void InstanceReplayer::acquire_image(InstanceWatcher *instance_watcher, - const std::string &global_image_id, - Context *on_finish) { - dout(20) << "global_image_id=" << global_image_id << dendl; - - Mutex::Locker locker(m_lock); - - assert(m_on_shut_down == nullptr); - - auto it = m_image_replayers.find(global_image_id); - if (it == m_image_replayers.end()) { - auto image_replayer = ImageReplayer::create( - m_threads, m_image_deleter, instance_watcher, m_local_rados, - m_local_mirror_uuid, m_local_pool_id, global_image_id); - - dout(20) << global_image_id << ": creating replayer " << image_replayer - << dendl; - - it = m_image_replayers.insert(std::make_pair(global_image_id, - image_replayer)).first; - - // TODO only a single peer is currently supported - assert(m_peers.size() == 1); - auto peer = *m_peers.begin(); - image_replayer->add_peer(peer.peer_uuid, peer.io_ctx); - } - - auto& image_replayer = it->second; - // TODO temporary until policy integrated - image_replayer->set_finished(false); - - start_image_replayer(image_replayer); - m_threads->work_queue->queue(on_finish, 0); -} - -template -void InstanceReplayer::release_image(const std::string &global_image_id, - Context *on_finish) { - dout(20) << "global_image_id=" << global_image_id << dendl; - - Mutex::Locker locker(m_lock); - assert(m_on_shut_down == nullptr); - - auto it = m_image_replayers.find(global_image_id); - if (it == m_image_replayers.end()) { - dout(20) << global_image_id << ": not found" << dendl; - m_threads->work_queue->queue(on_finish, 0); - return; - } - - auto image_replayer = it->second; - m_image_replayers.erase(it); - - on_finish = new FunctionContext( - [image_replayer, on_finish] (int r) { - image_replayer->destroy(); - on_finish->complete(0); - }); - stop_image_replayer(image_replayer, on_finish); -} - -template -void InstanceReplayer::remove_peer_image(const std::string &global_image_id, - const std::string &peer_mirror_uuid, - Context *on_finish) { - dout(20) << "global_image_id=" << global_image_id << ", " - << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; - - Mutex::Locker locker(m_lock); - assert(m_on_shut_down == nullptr); - - auto it = m_image_replayers.find(global_image_id); - if (it != m_image_replayers.end()) { - // TODO only a single peer is currently supported, therefore - // we can just interrupt the current image replayer and - // it will eventually detect that the peer image is missing and - // determine if a delete propagation is required. - auto image_replayer = it->second; - image_replayer->restart(); - } - m_threads->work_queue->queue(on_finish, 0); -} - -template -void InstanceReplayer::print_status(Formatter *f, stringstream *ss) { - dout(20) << dendl; - - if (!f) { - return; - } - - Mutex::Locker locker(m_lock); - - f->open_array_section("image_replayers"); - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; - image_replayer->print_status(f, ss); - } - f->close_section(); -} - -template -void InstanceReplayer::start() -{ - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - m_manual_stop = false; - - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; - image_replayer->start(nullptr, true); - } -} - -template -void InstanceReplayer::stop() -{ - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - m_manual_stop = true; - - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; - image_replayer->stop(nullptr, true); - } -} - -template -void InstanceReplayer::restart() -{ - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - m_manual_stop = false; - - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; - image_replayer->restart(); - } -} - -template -void InstanceReplayer::flush() -{ - dout(20) << "enter" << dendl; - - Mutex::Locker locker(m_lock); - - for (auto &kv : m_image_replayers) { - auto &image_replayer = kv.second; - image_replayer->flush(); - } -} - -template -void InstanceReplayer::start_image_replayer( - ImageReplayer *image_replayer) { - assert(m_lock.is_locked()); - - std::string global_image_id = image_replayer->get_global_image_id(); - dout(20) << "global_image_id=" << global_image_id << dendl; - - if (!image_replayer->is_stopped()) { - return; - } else if (image_replayer->is_blacklisted()) { - derr << "blacklisted detected during image replay" << dendl; - return; - } else if (image_replayer->is_finished()) { - // TODO temporary until policy integrated - dout(5) << "removing image replayer for global_image_id=" - << global_image_id << dendl; - m_image_replayers.erase(image_replayer->get_global_image_id()); - image_replayer->destroy(); - return; - } - - image_replayer->start(nullptr, false); -} - -template -void InstanceReplayer::queue_start_image_replayers() { - dout(20) << dendl; - - Context *ctx = create_context_callback< - InstanceReplayer, &InstanceReplayer::start_image_replayers>(this); - m_async_op_tracker.start_op(); - m_threads->work_queue->queue(ctx, 0); -} - -template -void InstanceReplayer::start_image_replayers(int r) { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - if (m_on_shut_down != nullptr) { - return; - } - - uint64_t image_count = 0; - uint64_t warning_count = 0; - uint64_t error_count = 0; - for (auto it = m_image_replayers.begin(); - it != m_image_replayers.end();) { - auto current_it(it); - ++it; - - ++image_count; - auto health_state = current_it->second->get_health_state(); - if (health_state == image_replayer::HEALTH_STATE_WARNING) { - ++warning_count; - } else if (health_state == image_replayer::HEALTH_STATE_ERROR) { - ++error_count; - } - - start_image_replayer(current_it->second); - } - - m_service_daemon->add_or_update_attribute( - m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count); - m_service_daemon->add_or_update_attribute( - m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count); - m_service_daemon->add_or_update_attribute( - m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count); - - m_async_op_tracker.finish_op(); -} - -template -void InstanceReplayer::stop_image_replayer(ImageReplayer *image_replayer, - Context *on_finish) { - dout(20) << image_replayer << " global_image_id=" - << image_replayer->get_global_image_id() << ", on_finish=" - << on_finish << dendl; - - if (image_replayer->is_stopped()) { - m_threads->work_queue->queue(on_finish, 0); - return; - } - - m_async_op_tracker.start_op(); - Context *ctx = create_async_context_callback( - m_threads->work_queue, new FunctionContext( - [this, image_replayer, on_finish] (int r) { - stop_image_replayer(image_replayer, on_finish); - m_async_op_tracker.finish_op(); - })); - - if (image_replayer->is_running()) { - image_replayer->stop(ctx, false); - } else { - int after = 1; - dout(20) << "scheduling image replayer " << image_replayer << " stop after " - << after << " sec (task " << ctx << ")" << dendl; - ctx = new FunctionContext( - [this, after, ctx] (int r) { - Mutex::Locker timer_locker(m_threads->timer_lock); - m_threads->timer->add_event_after(after, ctx); - }); - m_threads->work_queue->queue(ctx, 0); - } -} - -template -void InstanceReplayer::wait_for_ops() { - dout(20) << dendl; - - Context *ctx = create_context_callback< - InstanceReplayer, &InstanceReplayer::handle_wait_for_ops>(this); - - m_async_op_tracker.wait_for_ops(ctx); -} - -template -void InstanceReplayer::handle_wait_for_ops(int r) { - dout(20) << "r=" << r << dendl; - - assert(r == 0); - - Mutex::Locker locker(m_lock); - stop_image_replayers(); -} - -template -void InstanceReplayer::stop_image_replayers() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - - Context *ctx = create_async_context_callback( - m_threads->work_queue, create_context_callback, - &InstanceReplayer::handle_stop_image_replayers>(this)); - - C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); - for (auto &it : m_image_replayers) { - stop_image_replayer(it.second, gather_ctx->new_sub()); - } - gather_ctx->activate(); -} - -template -void InstanceReplayer::handle_stop_image_replayers(int r) { - dout(20) << "r=" << r << dendl; - - assert(r == 0); - - Context *on_finish = nullptr; - { - Mutex::Locker locker(m_lock); - - for (auto &it : m_image_replayers) { - assert(it.second->is_stopped()); - it.second->destroy(); - } - m_image_replayers.clear(); - - assert(m_on_shut_down != nullptr); - std::swap(on_finish, m_on_shut_down); - } - on_finish->complete(r); -} - -template -void InstanceReplayer::cancel_image_state_check_task() { - Mutex::Locker timer_locker(m_threads->timer_lock); - - if (m_image_state_check_task == nullptr) { - return; - } - - dout(20) << m_image_state_check_task << dendl; - bool canceled = m_threads->timer->cancel_event(m_image_state_check_task); - assert(canceled); - m_image_state_check_task = nullptr; -} - -template -void InstanceReplayer::schedule_image_state_check_task() { - assert(m_threads->timer_lock.is_locked()); - assert(m_image_state_check_task == nullptr); - - m_image_state_check_task = new FunctionContext( - [this](int r) { - assert(m_threads->timer_lock.is_locked()); - m_image_state_check_task = nullptr; - schedule_image_state_check_task(); - queue_start_image_replayers(); - }); - - int after = g_ceph_context->_conf->get_val( - "rbd_mirror_image_state_check_interval"); - - dout(20) << "scheduling image state check after " << after << " sec (task " - << m_image_state_check_task << ")" << dendl; - m_threads->timer->add_event_after(after, m_image_state_check_task); -} - -} // namespace mirror -} // namespace rbd - -template class rbd::mirror::InstanceReplayer;