X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FInstanceReplayer.cc;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FInstanceReplayer.cc;h=8b7fca17ac16e923387fc07a3efda64a2ffedba9;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_mirror/InstanceReplayer.cc b/src/ceph/src/tools/rbd_mirror/InstanceReplayer.cc new file mode 100644 index 0000000..8b7fca1 --- /dev/null +++ b/src/ceph/src/tools/rbd_mirror/InstanceReplayer.cc @@ -0,0 +1,497 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/stringify.h" +#include "common/Timer.h" +#include "common/debug.h" +#include "common/errno.h" +#include "librbd/Utils.h" +#include "ImageReplayer.h" +#include "InstanceReplayer.h" +#include "ServiceDaemon.h" +#include "Threads.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \ + << this << " " << __func__ << ": " + +namespace rbd { +namespace mirror { + +namespace { + +const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count"); +const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count"); +const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count"); + +} // anonymous namespace + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; + +template +InstanceReplayer::InstanceReplayer( + Threads *threads, ServiceDaemon* service_daemon, + ImageDeleter* image_deleter, RadosRef local_rados, + const std::string &local_mirror_uuid, int64_t local_pool_id) + : m_threads(threads), m_service_daemon(service_daemon), + m_image_deleter(image_deleter), m_local_rados(local_rados), + m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id), + m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) { +} + +template +InstanceReplayer::~InstanceReplayer() { + assert(m_image_state_check_task == nullptr); + assert(m_async_op_tracker.empty()); + assert(m_image_replayers.empty()); +} + +template +int InstanceReplayer::init() { + C_SaferCond init_ctx; + init(&init_ctx); + return init_ctx.wait(); +} + +template +void InstanceReplayer::init(Context *on_finish) { + dout(20) << dendl; + + Context *ctx = new FunctionContext( + [this, on_finish] (int r) { + { + Mutex::Locker timer_locker(m_threads->timer_lock); + schedule_image_state_check_task(); + } + on_finish->complete(0); + }); + + m_threads->work_queue->queue(ctx, 0); +} + +template +void InstanceReplayer::shut_down() { + C_SaferCond shut_down_ctx; + shut_down(&shut_down_ctx); + int r = shut_down_ctx.wait(); + assert(r == 0); +} + +template +void InstanceReplayer::shut_down(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_on_shut_down == nullptr); + m_on_shut_down = on_finish; + + Context *ctx = new FunctionContext( + [this] (int r) { + cancel_image_state_check_task(); + wait_for_ops(); + }); + + m_threads->work_queue->queue(ctx, 0); +} + +template +void InstanceReplayer::add_peer(std::string peer_uuid, + librados::IoCtx io_ctx) { + dout(20) << peer_uuid << dendl; + + Mutex::Locker locker(m_lock); + auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second; + assert(result); +} + +template +void InstanceReplayer::release_all(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish); + for (auto it = m_image_replayers.begin(); it != m_image_replayers.end(); + it = m_image_replayers.erase(it)) { + auto image_replayer = it->second; + auto ctx = gather_ctx->new_sub(); + ctx = new FunctionContext( + [image_replayer, ctx] (int r) { + image_replayer->destroy(); + ctx->complete(0); + }); + stop_image_replayer(image_replayer, ctx); + } + gather_ctx->activate(); +} + +template +void InstanceReplayer::acquire_image(InstanceWatcher *instance_watcher, + const std::string &global_image_id, + Context *on_finish) { + dout(20) << "global_image_id=" << global_image_id << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_on_shut_down == nullptr); + + auto it = m_image_replayers.find(global_image_id); + if (it == m_image_replayers.end()) { + auto image_replayer = ImageReplayer::create( + m_threads, m_image_deleter, instance_watcher, m_local_rados, + m_local_mirror_uuid, m_local_pool_id, global_image_id); + + dout(20) << global_image_id << ": creating replayer " << image_replayer + << dendl; + + it = m_image_replayers.insert(std::make_pair(global_image_id, + image_replayer)).first; + + // TODO only a single peer is currently supported + assert(m_peers.size() == 1); + auto peer = *m_peers.begin(); + image_replayer->add_peer(peer.peer_uuid, peer.io_ctx); + } + + auto& image_replayer = it->second; + // TODO temporary until policy integrated + image_replayer->set_finished(false); + + start_image_replayer(image_replayer); + m_threads->work_queue->queue(on_finish, 0); +} + +template +void InstanceReplayer::release_image(const std::string &global_image_id, + Context *on_finish) { + dout(20) << "global_image_id=" << global_image_id << dendl; + + Mutex::Locker locker(m_lock); + assert(m_on_shut_down == nullptr); + + auto it = m_image_replayers.find(global_image_id); + if (it == m_image_replayers.end()) { + dout(20) << global_image_id << ": not found" << dendl; + m_threads->work_queue->queue(on_finish, 0); + return; + } + + auto image_replayer = it->second; + m_image_replayers.erase(it); + + on_finish = new FunctionContext( + [image_replayer, on_finish] (int r) { + image_replayer->destroy(); + on_finish->complete(0); + }); + stop_image_replayer(image_replayer, on_finish); +} + +template +void InstanceReplayer::remove_peer_image(const std::string &global_image_id, + const std::string &peer_mirror_uuid, + Context *on_finish) { + dout(20) << "global_image_id=" << global_image_id << ", " + << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; + + Mutex::Locker locker(m_lock); + assert(m_on_shut_down == nullptr); + + auto it = m_image_replayers.find(global_image_id); + if (it != m_image_replayers.end()) { + // TODO only a single peer is currently supported, therefore + // we can just interrupt the current image replayer and + // it will eventually detect that the peer image is missing and + // determine if a delete propagation is required. + auto image_replayer = it->second; + image_replayer->restart(); + } + m_threads->work_queue->queue(on_finish, 0); +} + +template +void InstanceReplayer::print_status(Formatter *f, stringstream *ss) { + dout(20) << dendl; + + if (!f) { + return; + } + + Mutex::Locker locker(m_lock); + + f->open_array_section("image_replayers"); + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->print_status(f, ss); + } + f->close_section(); +} + +template +void InstanceReplayer::start() +{ + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + m_manual_stop = false; + + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->start(nullptr, true); + } +} + +template +void InstanceReplayer::stop() +{ + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + m_manual_stop = true; + + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->stop(nullptr, true); + } +} + +template +void InstanceReplayer::restart() +{ + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + m_manual_stop = false; + + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->restart(); + } +} + +template +void InstanceReplayer::flush() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker locker(m_lock); + + for (auto &kv : m_image_replayers) { + auto &image_replayer = kv.second; + image_replayer->flush(); + } +} + +template +void InstanceReplayer::start_image_replayer( + ImageReplayer *image_replayer) { + assert(m_lock.is_locked()); + + std::string global_image_id = image_replayer->get_global_image_id(); + dout(20) << "global_image_id=" << global_image_id << dendl; + + if (!image_replayer->is_stopped()) { + return; + } else if (image_replayer->is_blacklisted()) { + derr << "blacklisted detected during image replay" << dendl; + return; + } else if (image_replayer->is_finished()) { + // TODO temporary until policy integrated + dout(5) << "removing image replayer for global_image_id=" + << global_image_id << dendl; + m_image_replayers.erase(image_replayer->get_global_image_id()); + image_replayer->destroy(); + return; + } + + image_replayer->start(nullptr, false); +} + +template +void InstanceReplayer::queue_start_image_replayers() { + dout(20) << dendl; + + Context *ctx = create_context_callback< + InstanceReplayer, &InstanceReplayer::start_image_replayers>(this); + m_async_op_tracker.start_op(); + m_threads->work_queue->queue(ctx, 0); +} + +template +void InstanceReplayer::start_image_replayers(int r) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + if (m_on_shut_down != nullptr) { + return; + } + + uint64_t image_count = 0; + uint64_t warning_count = 0; + uint64_t error_count = 0; + for (auto it = m_image_replayers.begin(); + it != m_image_replayers.end();) { + auto current_it(it); + ++it; + + ++image_count; + auto health_state = current_it->second->get_health_state(); + if (health_state == image_replayer::HEALTH_STATE_WARNING) { + ++warning_count; + } else if (health_state == image_replayer::HEALTH_STATE_ERROR) { + ++error_count; + } + + start_image_replayer(current_it->second); + } + + m_service_daemon->add_or_update_attribute( + m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count); + m_service_daemon->add_or_update_attribute( + m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count); + m_service_daemon->add_or_update_attribute( + m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count); + + m_async_op_tracker.finish_op(); +} + +template +void InstanceReplayer::stop_image_replayer(ImageReplayer *image_replayer, + Context *on_finish) { + dout(20) << image_replayer << " global_image_id=" + << image_replayer->get_global_image_id() << ", on_finish=" + << on_finish << dendl; + + if (image_replayer->is_stopped()) { + m_threads->work_queue->queue(on_finish, 0); + return; + } + + m_async_op_tracker.start_op(); + Context *ctx = create_async_context_callback( + m_threads->work_queue, new FunctionContext( + [this, image_replayer, on_finish] (int r) { + stop_image_replayer(image_replayer, on_finish); + m_async_op_tracker.finish_op(); + })); + + if (image_replayer->is_running()) { + image_replayer->stop(ctx, false); + } else { + int after = 1; + dout(20) << "scheduling image replayer " << image_replayer << " stop after " + << after << " sec (task " << ctx << ")" << dendl; + ctx = new FunctionContext( + [this, after, ctx] (int r) { + Mutex::Locker timer_locker(m_threads->timer_lock); + m_threads->timer->add_event_after(after, ctx); + }); + m_threads->work_queue->queue(ctx, 0); + } +} + +template +void InstanceReplayer::wait_for_ops() { + dout(20) << dendl; + + Context *ctx = create_context_callback< + InstanceReplayer, &InstanceReplayer::handle_wait_for_ops>(this); + + m_async_op_tracker.wait_for_ops(ctx); +} + +template +void InstanceReplayer::handle_wait_for_ops(int r) { + dout(20) << "r=" << r << dendl; + + assert(r == 0); + + Mutex::Locker locker(m_lock); + stop_image_replayers(); +} + +template +void InstanceReplayer::stop_image_replayers() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_threads->work_queue, create_context_callback, + &InstanceReplayer::handle_stop_image_replayers>(this)); + + C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); + for (auto &it : m_image_replayers) { + stop_image_replayer(it.second, gather_ctx->new_sub()); + } + gather_ctx->activate(); +} + +template +void InstanceReplayer::handle_stop_image_replayers(int r) { + dout(20) << "r=" << r << dendl; + + assert(r == 0); + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + for (auto &it : m_image_replayers) { + assert(it.second->is_stopped()); + it.second->destroy(); + } + m_image_replayers.clear(); + + assert(m_on_shut_down != nullptr); + std::swap(on_finish, m_on_shut_down); + } + on_finish->complete(r); +} + +template +void InstanceReplayer::cancel_image_state_check_task() { + Mutex::Locker timer_locker(m_threads->timer_lock); + + if (m_image_state_check_task == nullptr) { + return; + } + + dout(20) << m_image_state_check_task << dendl; + bool canceled = m_threads->timer->cancel_event(m_image_state_check_task); + assert(canceled); + m_image_state_check_task = nullptr; +} + +template +void InstanceReplayer::schedule_image_state_check_task() { + assert(m_threads->timer_lock.is_locked()); + assert(m_image_state_check_task == nullptr); + + m_image_state_check_task = new FunctionContext( + [this](int r) { + assert(m_threads->timer_lock.is_locked()); + m_image_state_check_task = nullptr; + schedule_image_state_check_task(); + queue_start_image_replayers(); + }); + + int after = g_ceph_context->_conf->get_val( + "rbd_mirror_image_state_check_interval"); + + dout(20) << "scheduling image state check after " << after << " sec (task " + << m_image_state_check_task << ")" << dendl; + m_threads->timer->add_event_after(after, m_image_state_check_task); +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::InstanceReplayer;