// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "include/stringify.h" #include "common/Timer.h" #include "common/debug.h" #include "common/errno.h" #include "librbd/Utils.h" #include "ImageReplayer.h" #include "InstanceReplayer.h" #include "ServiceDaemon.h" #include "Threads.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rbd_mirror #undef dout_prefix #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \ << this << " " << __func__ << ": " namespace rbd { namespace mirror { namespace { const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count"); const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count"); const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count"); } // anonymous namespace using librbd::util::create_async_context_callback; using librbd::util::create_context_callback; template InstanceReplayer::InstanceReplayer( Threads *threads, ServiceDaemon* service_daemon, ImageDeleter* image_deleter, RadosRef local_rados, const std::string &local_mirror_uuid, int64_t local_pool_id) : m_threads(threads), m_service_daemon(service_daemon), m_image_deleter(image_deleter), m_local_rados(local_rados), m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id), m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) { } template InstanceReplayer::~InstanceReplayer() { assert(m_image_state_check_task == nullptr); assert(m_async_op_tracker.empty()); assert(m_image_replayers.empty()); } template int InstanceReplayer::init() { C_SaferCond init_ctx; init(&init_ctx); return init_ctx.wait(); } template void InstanceReplayer::init(Context *on_finish) { dout(20) << dendl; Context *ctx = new FunctionContext( [this, on_finish] (int r) { { Mutex::Locker timer_locker(m_threads->timer_lock); schedule_image_state_check_task(); } on_finish->complete(0); }); m_threads->work_queue->queue(ctx, 0); } template void InstanceReplayer::shut_down() { C_SaferCond shut_down_ctx; shut_down(&shut_down_ctx); int r = shut_down_ctx.wait(); assert(r == 0); } template void InstanceReplayer::shut_down(Context *on_finish) { dout(20) << dendl; Mutex::Locker locker(m_lock); assert(m_on_shut_down == nullptr); m_on_shut_down = on_finish; Context *ctx = new FunctionContext( [this] (int r) { cancel_image_state_check_task(); wait_for_ops(); }); m_threads->work_queue->queue(ctx, 0); } template void InstanceReplayer::add_peer(std::string peer_uuid, librados::IoCtx io_ctx) { dout(20) << peer_uuid << dendl; Mutex::Locker locker(m_lock); auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second; assert(result); } template void InstanceReplayer::release_all(Context *on_finish) { dout(20) << dendl; Mutex::Locker locker(m_lock); C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish); for (auto it = m_image_replayers.begin(); it != m_image_replayers.end(); it = m_image_replayers.erase(it)) { auto image_replayer = it->second; auto ctx = gather_ctx->new_sub(); ctx = new FunctionContext( [image_replayer, ctx] (int r) { image_replayer->destroy(); ctx->complete(0); }); stop_image_replayer(image_replayer, ctx); } gather_ctx->activate(); } template void InstanceReplayer::acquire_image(InstanceWatcher *instance_watcher, const std::string &global_image_id, Context *on_finish) { dout(20) << "global_image_id=" << global_image_id << dendl; Mutex::Locker locker(m_lock); assert(m_on_shut_down == nullptr); auto it = m_image_replayers.find(global_image_id); if (it == m_image_replayers.end()) { auto image_replayer = ImageReplayer::create( m_threads, m_image_deleter, instance_watcher, m_local_rados, m_local_mirror_uuid, m_local_pool_id, global_image_id); dout(20) << global_image_id << ": creating replayer " << image_replayer << dendl; it = m_image_replayers.insert(std::make_pair(global_image_id, image_replayer)).first; // TODO only a single peer is currently supported assert(m_peers.size() == 1); auto peer = *m_peers.begin(); image_replayer->add_peer(peer.peer_uuid, peer.io_ctx); } auto& image_replayer = it->second; // TODO temporary until policy integrated image_replayer->set_finished(false); start_image_replayer(image_replayer); m_threads->work_queue->queue(on_finish, 0); } template void InstanceReplayer::release_image(const std::string &global_image_id, Context *on_finish) { dout(20) << "global_image_id=" << global_image_id << dendl; Mutex::Locker locker(m_lock); assert(m_on_shut_down == nullptr); auto it = m_image_replayers.find(global_image_id); if (it == m_image_replayers.end()) { dout(20) << global_image_id << ": not found" << dendl; m_threads->work_queue->queue(on_finish, 0); return; } auto image_replayer = it->second; m_image_replayers.erase(it); on_finish = new FunctionContext( [image_replayer, on_finish] (int r) { image_replayer->destroy(); on_finish->complete(0); }); stop_image_replayer(image_replayer, on_finish); } template void InstanceReplayer::remove_peer_image(const std::string &global_image_id, const std::string &peer_mirror_uuid, Context *on_finish) { dout(20) << "global_image_id=" << global_image_id << ", " << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; Mutex::Locker locker(m_lock); assert(m_on_shut_down == nullptr); auto it = m_image_replayers.find(global_image_id); if (it != m_image_replayers.end()) { // TODO only a single peer is currently supported, therefore // we can just interrupt the current image replayer and // it will eventually detect that the peer image is missing and // determine if a delete propagation is required. auto image_replayer = it->second; image_replayer->restart(); } m_threads->work_queue->queue(on_finish, 0); } template void InstanceReplayer::print_status(Formatter *f, stringstream *ss) { dout(20) << dendl; if (!f) { return; } Mutex::Locker locker(m_lock); f->open_array_section("image_replayers"); for (auto &kv : m_image_replayers) { auto &image_replayer = kv.second; image_replayer->print_status(f, ss); } f->close_section(); } template void InstanceReplayer::start() { dout(20) << dendl; Mutex::Locker locker(m_lock); m_manual_stop = false; for (auto &kv : m_image_replayers) { auto &image_replayer = kv.second; image_replayer->start(nullptr, true); } } template void InstanceReplayer::stop() { dout(20) << dendl; Mutex::Locker locker(m_lock); m_manual_stop = true; for (auto &kv : m_image_replayers) { auto &image_replayer = kv.second; image_replayer->stop(nullptr, true); } } template void InstanceReplayer::restart() { dout(20) << dendl; Mutex::Locker locker(m_lock); m_manual_stop = false; for (auto &kv : m_image_replayers) { auto &image_replayer = kv.second; image_replayer->restart(); } } template void InstanceReplayer::flush() { dout(20) << "enter" << dendl; Mutex::Locker locker(m_lock); for (auto &kv : m_image_replayers) { auto &image_replayer = kv.second; image_replayer->flush(); } } template void InstanceReplayer::start_image_replayer( ImageReplayer *image_replayer) { assert(m_lock.is_locked()); std::string global_image_id = image_replayer->get_global_image_id(); dout(20) << "global_image_id=" << global_image_id << dendl; if (!image_replayer->is_stopped()) { return; } else if (image_replayer->is_blacklisted()) { derr << "blacklisted detected during image replay" << dendl; return; } else if (image_replayer->is_finished()) { // TODO temporary until policy integrated dout(5) << "removing image replayer for global_image_id=" << global_image_id << dendl; m_image_replayers.erase(image_replayer->get_global_image_id()); image_replayer->destroy(); return; } image_replayer->start(nullptr, false); } template void InstanceReplayer::queue_start_image_replayers() { dout(20) << dendl; Context *ctx = create_context_callback< InstanceReplayer, &InstanceReplayer::start_image_replayers>(this); m_async_op_tracker.start_op(); m_threads->work_queue->queue(ctx, 0); } template void InstanceReplayer::start_image_replayers(int r) { dout(20) << dendl; Mutex::Locker locker(m_lock); if (m_on_shut_down != nullptr) { return; } uint64_t image_count = 0; uint64_t warning_count = 0; uint64_t error_count = 0; for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();) { auto current_it(it); ++it; ++image_count; auto health_state = current_it->second->get_health_state(); if (health_state == image_replayer::HEALTH_STATE_WARNING) { ++warning_count; } else if (health_state == image_replayer::HEALTH_STATE_ERROR) { ++error_count; } start_image_replayer(current_it->second); } m_service_daemon->add_or_update_attribute( m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count); m_service_daemon->add_or_update_attribute( m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count); m_service_daemon->add_or_update_attribute( m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count); m_async_op_tracker.finish_op(); } template void InstanceReplayer::stop_image_replayer(ImageReplayer *image_replayer, Context *on_finish) { dout(20) << image_replayer << " global_image_id=" << image_replayer->get_global_image_id() << ", on_finish=" << on_finish << dendl; if (image_replayer->is_stopped()) { m_threads->work_queue->queue(on_finish, 0); return; } m_async_op_tracker.start_op(); Context *ctx = create_async_context_callback( m_threads->work_queue, new FunctionContext( [this, image_replayer, on_finish] (int r) { stop_image_replayer(image_replayer, on_finish); m_async_op_tracker.finish_op(); })); if (image_replayer->is_running()) { image_replayer->stop(ctx, false); } else { int after = 1; dout(20) << "scheduling image replayer " << image_replayer << " stop after " << after << " sec (task " << ctx << ")" << dendl; ctx = new FunctionContext( [this, after, ctx] (int r) { Mutex::Locker timer_locker(m_threads->timer_lock); m_threads->timer->add_event_after(after, ctx); }); m_threads->work_queue->queue(ctx, 0); } } template void InstanceReplayer::wait_for_ops() { dout(20) << dendl; Context *ctx = create_context_callback< InstanceReplayer, &InstanceReplayer::handle_wait_for_ops>(this); m_async_op_tracker.wait_for_ops(ctx); } template void InstanceReplayer::handle_wait_for_ops(int r) { dout(20) << "r=" << r << dendl; assert(r == 0); Mutex::Locker locker(m_lock); stop_image_replayers(); } template void InstanceReplayer::stop_image_replayers() { dout(20) << dendl; assert(m_lock.is_locked()); Context *ctx = create_async_context_callback( m_threads->work_queue, create_context_callback, &InstanceReplayer::handle_stop_image_replayers>(this)); C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); for (auto &it : m_image_replayers) { stop_image_replayer(it.second, gather_ctx->new_sub()); } gather_ctx->activate(); } template void InstanceReplayer::handle_stop_image_replayers(int r) { dout(20) << "r=" << r << dendl; assert(r == 0); Context *on_finish = nullptr; { Mutex::Locker locker(m_lock); for (auto &it : m_image_replayers) { assert(it.second->is_stopped()); it.second->destroy(); } m_image_replayers.clear(); assert(m_on_shut_down != nullptr); std::swap(on_finish, m_on_shut_down); } on_finish->complete(r); } template void InstanceReplayer::cancel_image_state_check_task() { Mutex::Locker timer_locker(m_threads->timer_lock); if (m_image_state_check_task == nullptr) { return; } dout(20) << m_image_state_check_task << dendl; bool canceled = m_threads->timer->cancel_event(m_image_state_check_task); assert(canceled); m_image_state_check_task = nullptr; } template void InstanceReplayer::schedule_image_state_check_task() { assert(m_threads->timer_lock.is_locked()); assert(m_image_state_check_task == nullptr); m_image_state_check_task = new FunctionContext( [this](int r) { assert(m_threads->timer_lock.is_locked()); m_image_state_check_task = nullptr; schedule_image_state_check_task(); queue_start_image_replayers(); }); int after = g_ceph_context->_conf->get_val( "rbd_mirror_image_state_check_interval"); dout(20) << "scheduling image state check after " << after << " sec (task " << m_image_state_check_task << ")" << dendl; m_threads->timer->add_event_after(after, m_image_state_check_task); } } // namespace mirror } // namespace rbd template class rbd::mirror::InstanceReplayer;