X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FLeaderWatcher.cc;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FLeaderWatcher.cc;h=46f555252f0e48e0c7e6dcdfc61479058fc74066;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_mirror/LeaderWatcher.cc b/src/ceph/src/tools/rbd_mirror/LeaderWatcher.cc new file mode 100644 index 0000000..46f5552 --- /dev/null +++ b/src/ceph/src/tools/rbd_mirror/LeaderWatcher.cc @@ -0,0 +1,1127 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "LeaderWatcher.h" +#include "common/Timer.h" +#include "common/debug.h" +#include "common/errno.h" +#include "cls/rbd/cls_rbd_client.h" +#include "include/stringify.h" +#include "librbd/Utils.h" +#include "librbd/watcher/Types.h" +#include "Threads.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \ + << this << " " << __func__ << ": " +namespace rbd { +namespace mirror { + +using namespace leader_watcher; + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; +using librbd::util::create_rados_callback; + +template +LeaderWatcher::LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, + Listener *listener) + : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER), + m_threads(threads), m_listener(listener), + m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()), + m_notifier_id(librados::Rados(io_ctx).get_instance_id()), + m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true, + m_cct->_conf->get_val( + "rbd_blacklist_expire_seconds"))) { +} + +template +LeaderWatcher::~LeaderWatcher() { + assert(m_status_watcher == nullptr); + assert(m_instances == nullptr); + assert(m_timer_task == nullptr); + + delete m_leader_lock; +} + +template +std::string LeaderWatcher::get_instance_id() { + return stringify(m_notifier_id); +} + +template +int LeaderWatcher::init() { + C_SaferCond init_ctx; + init(&init_ctx); + return init_ctx.wait(); +} + +template +void LeaderWatcher::init(Context *on_finish) { + dout(20) << "notifier_id=" << m_notifier_id << dendl; + + Mutex::Locker locker(m_lock); + + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + + create_leader_object(); +} + +template +void LeaderWatcher::create_leader_object() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + librados::ObjectWriteOperation op; + op.create(false); + + librados::AioCompletion *aio_comp = create_rados_callback< + LeaderWatcher, &LeaderWatcher::handle_create_leader_object>(this); + int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); + assert(r == 0); + aio_comp->release(); +} + +template +void LeaderWatcher::handle_create_leader_object(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + if (r == 0) { + register_watch(); + return; + } + + derr << "error creating " << m_oid << " object: " << cpp_strerror(r) + << dendl; + + std::swap(on_finish, m_on_finish); + } + on_finish->complete(r); +} + +template +void LeaderWatcher::register_watch() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_register_watch>(this)); + + librbd::Watcher::register_watch(ctx); +} + +template +void LeaderWatcher::handle_register_watch(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error registering leader watcher for " << m_oid << " object: " + << cpp_strerror(r) << dendl; + } else { + schedule_acquire_leader_lock(0); + } + + std::swap(on_finish, m_on_finish); + } + on_finish->complete(r); +} + +template +void LeaderWatcher::shut_down() { + C_SaferCond shut_down_ctx; + shut_down(&shut_down_ctx); + int r = shut_down_ctx.wait(); + assert(r == 0); +} + +template +void LeaderWatcher::shut_down(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + + assert(m_on_shut_down_finish == nullptr); + m_on_shut_down_finish = on_finish; + cancel_timer_task(); + shut_down_leader_lock(); +} + +template +void LeaderWatcher::shut_down_leader_lock() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_shut_down_leader_lock>(this)); + + m_leader_lock->shut_down(ctx); +} + +template +void LeaderWatcher::handle_shut_down_leader_lock(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl; + } + + unregister_watch(); +} + +template +void LeaderWatcher::unregister_watch() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_unregister_watch>(this)); + + librbd::Watcher::unregister_watch(ctx); +} + +template +void LeaderWatcher::handle_unregister_watch(int r) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + derr << "error unregistering leader watcher for " << m_oid << " object: " + << cpp_strerror(r) << dendl; + } + wait_for_tasks(); +} + +template +void LeaderWatcher::wait_for_tasks() { + dout(20) << dendl; + + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + schedule_timer_task("wait for tasks", 0, false, + &LeaderWatcher::handle_wait_for_tasks, true); +} + +template +void LeaderWatcher::handle_wait_for_tasks() { + dout(20) << dendl; + + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + assert(m_on_shut_down_finish != nullptr); + + assert(!m_timer_op_tracker.empty()); + m_timer_op_tracker.finish_op(); + + auto ctx = new FunctionContext([this](int r) { + Context *on_finish; + { + // ensure lock isn't held when completing shut down + Mutex::Locker locker(m_lock); + assert(m_on_shut_down_finish != nullptr); + on_finish = m_on_shut_down_finish; + } + on_finish->complete(0); + }); + m_work_queue->queue(ctx, 0); +} + +template +bool LeaderWatcher::is_leader() const { + Mutex::Locker locker(m_lock); + + return is_leader(m_lock); +} + +template +bool LeaderWatcher::is_leader(Mutex &lock) const { + assert(m_lock.is_locked()); + + bool leader = m_leader_lock->is_leader(); + dout(20) << leader << dendl; + return leader; +} + +template +bool LeaderWatcher::is_releasing_leader() const { + Mutex::Locker locker(m_lock); + + return is_releasing_leader(m_lock); +} + +template +bool LeaderWatcher::is_releasing_leader(Mutex &lock) const { + assert(m_lock.is_locked()); + + bool releasing = m_leader_lock->is_releasing_leader(); + dout(20) << releasing << dendl; + return releasing; +} + +template +bool LeaderWatcher::get_leader_instance_id(std::string *instance_id) const { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + if (is_leader(m_lock) || is_releasing_leader(m_lock)) { + *instance_id = stringify(m_notifier_id); + return true; + } + + if (!m_locker.cookie.empty()) { + *instance_id = stringify(m_locker.entity.num()); + return true; + } + + return false; +} + +template +void LeaderWatcher::release_leader() { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + if (!is_leader(m_lock)) { + return; + } + + release_leader_lock(); +} + +template +void LeaderWatcher::list_instances(std::vector *instance_ids) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + instance_ids->clear(); + if (m_instances != nullptr) { + m_instances->list(instance_ids); + } +} + +template +void LeaderWatcher::cancel_timer_task() { + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + + if (m_timer_task == nullptr) { + return; + } + + dout(20) << m_timer_task << dendl; + bool canceled = m_threads->timer->cancel_event(m_timer_task); + assert(canceled); + m_timer_task = nullptr; +} + +template +void LeaderWatcher::schedule_timer_task(const std::string &name, + int delay_factor, bool leader, + TimerCallback timer_callback, + bool shutting_down) { + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + + if (!shutting_down && m_on_shut_down_finish != nullptr) { + return; + } + + cancel_timer_task(); + + m_timer_task = new FunctionContext( + [this, leader, timer_callback](int r) { + assert(m_threads->timer_lock.is_locked()); + m_timer_task = nullptr; + + if (m_timer_op_tracker.empty()) { + Mutex::Locker locker(m_lock); + execute_timer_task(leader, timer_callback); + return; + } + + // old timer task is still running -- do not start next + // task until the previous task completes + if (m_timer_gate == nullptr) { + m_timer_gate = new C_TimerGate(this); + m_timer_op_tracker.wait_for_ops(m_timer_gate); + } + m_timer_gate->leader = leader; + m_timer_gate->timer_callback = timer_callback; + }); + + int after = delay_factor * m_cct->_conf->get_val( + "rbd_mirror_leader_heartbeat_interval"); + + dout(20) << "scheduling " << name << " after " << after << " sec (task " + << m_timer_task << ")" << dendl; + m_threads->timer->add_event_after(after, m_timer_task); +} + +template +void LeaderWatcher::execute_timer_task(bool leader, + TimerCallback timer_callback) { + dout(20) << dendl; + + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + assert(m_timer_op_tracker.empty()); + + if (is_leader(m_lock) != leader) { + return; + } + + m_timer_op_tracker.start_op(); + (this->*timer_callback)(); +} + +template +void LeaderWatcher::handle_post_acquire_leader_lock(int r, + Context *on_finish) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + if (r == -EAGAIN) { + dout(20) << "already locked" << dendl; + } else { + derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl; + } + on_finish->complete(r); + return; + } + + Mutex::Locker locker(m_lock); + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + m_ret_val = 0; + + init_status_watcher(); +} + +template +void LeaderWatcher::handle_pre_release_leader_lock(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + m_ret_val = 0; + + notify_listener(); +} + +template +void LeaderWatcher::handle_post_release_leader_lock(int r, + Context *on_finish) { + dout(20) << "r=" << r << dendl; + + if (r < 0) { + on_finish->complete(r); + return; + } + + Mutex::Locker locker(m_lock); + assert(m_on_finish == nullptr); + m_on_finish = on_finish; + + notify_lock_released(); +} + +template +void LeaderWatcher::break_leader_lock() { + dout(20) << dendl; + + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + assert(!m_timer_op_tracker.empty()); + + if (m_locker.cookie.empty()) { + get_locker(); + return; + } + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_break_leader_lock>(this)); + + m_leader_lock->break_lock(m_locker, true, ctx); +} + +template +void LeaderWatcher::handle_break_leader_lock(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + assert(!m_timer_op_tracker.empty()); + + if (m_leader_lock->is_shutdown()) { + dout(20) << "canceling due to shutdown" << dendl; + m_timer_op_tracker.finish_op(); + return; + } + + if (r < 0 && r != -ENOENT) { + derr << "error beaking leader lock: " << cpp_strerror(r) << dendl; + schedule_acquire_leader_lock(1); + m_timer_op_tracker.finish_op(); + return; + } + + m_locker = {}; + m_acquire_attempts = 0; + acquire_leader_lock(); +} + +template +void LeaderWatcher::schedule_get_locker(bool reset_leader, + uint32_t delay_factor) { + dout(20) << dendl; + + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + + if (reset_leader) { + m_locker = {}; + m_acquire_attempts = 0; + } + + schedule_timer_task("get locker", delay_factor, false, + &LeaderWatcher::get_locker, false); +} + +template +void LeaderWatcher::get_locker() { + dout(20) << dendl; + + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + assert(!m_timer_op_tracker.empty()); + + C_GetLocker *get_locker_ctx = new C_GetLocker(this); + Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx); + + m_leader_lock->get_locker(&get_locker_ctx->locker, ctx); +} + +template +void LeaderWatcher::handle_get_locker(int r, + librbd::managed_lock::Locker& locker) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker mutex_locker(m_lock); + assert(!m_timer_op_tracker.empty()); + + if (m_leader_lock->is_shutdown()) { + dout(20) << "canceling due to shutdown" << dendl; + m_timer_op_tracker.finish_op(); + return; + } + + if (is_leader(m_lock)) { + m_locker = {}; + m_timer_op_tracker.finish_op(); + return; + } + + if (r == -ENOENT) { + m_locker = {}; + m_acquire_attempts = 0; + acquire_leader_lock(); + return; + } else if (r < 0) { + derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl; + schedule_get_locker(true, 1); + m_timer_op_tracker.finish_op(); + return; + } + + bool notify_listener = false; + if (m_locker != locker) { + m_locker = locker; + notify_listener = true; + if (m_acquire_attempts > 1) { + dout(10) << "new lock owner detected -- resetting heartbeat counter" + << dendl; + m_acquire_attempts = 0; + } + } + + if (m_acquire_attempts >= m_cct->_conf->get_val( + "rbd_mirror_leader_max_acquire_attempts_before_break")) { + dout(0) << "breaking leader lock after " << m_acquire_attempts << " " + << "failed attempts to acquire" << dendl; + break_leader_lock(); + return; + } + + schedule_acquire_leader_lock(1); + + if (!notify_listener) { + m_timer_op_tracker.finish_op(); + return; + } + + auto ctx = new FunctionContext( + [this](int r) { + std::string instance_id; + if (get_leader_instance_id(&instance_id)) { + m_listener->update_leader_handler(instance_id); + } + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + m_timer_op_tracker.finish_op(); + }); + m_work_queue->queue(ctx, 0); +} + +template +void LeaderWatcher::schedule_acquire_leader_lock(uint32_t delay_factor) { + dout(20) << dendl; + + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + + schedule_timer_task("acquire leader lock", + delay_factor * + m_cct->_conf->get_val("rbd_mirror_leader_max_missed_heartbeats"), + false, &LeaderWatcher::acquire_leader_lock, false); +} + +template +void LeaderWatcher::acquire_leader_lock() { + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + assert(!m_timer_op_tracker.empty()); + + ++m_acquire_attempts; + dout(20) << "acquire_attempts=" << m_acquire_attempts << dendl; + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_acquire_leader_lock>(this)); + m_leader_lock->try_acquire_lock(ctx); +} + +template +void LeaderWatcher::handle_acquire_leader_lock(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + assert(!m_timer_op_tracker.empty()); + + if (m_leader_lock->is_shutdown()) { + dout(20) << "canceling due to shutdown" << dendl; + m_timer_op_tracker.finish_op(); + return; + } + + if (r < 0) { + if (r == -EAGAIN) { + dout(20) << "already locked" << dendl; + } else { + derr << "error acquiring lock: " << cpp_strerror(r) << dendl; + } + + get_locker(); + return; + } + + m_locker = {}; + m_acquire_attempts = 0; + + if (m_ret_val) { + dout(5) << "releasing due to error on notify" << dendl; + release_leader_lock(); + m_timer_op_tracker.finish_op(); + return; + } + + notify_heartbeat(); +} + +template +void LeaderWatcher::release_leader_lock() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_release_leader_lock>(this)); + + m_leader_lock->release_lock(ctx); +} + +template +void LeaderWatcher::handle_release_leader_lock(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error releasing lock: " << cpp_strerror(r) << dendl; + return; + } + + schedule_acquire_leader_lock(1); +} + +template +void LeaderWatcher::init_status_watcher() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + assert(m_status_watcher == nullptr); + + m_status_watcher = MirrorStatusWatcher::create(m_ioctx, m_work_queue); + + Context *ctx = create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_init_status_watcher>(this); + + m_status_watcher->init(ctx); +} + +template +void LeaderWatcher::handle_init_status_watcher(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + if (r == 0) { + init_instances(); + return; + } + + derr << "error initializing mirror status watcher: " << cpp_strerror(r) + << dendl; + m_status_watcher->destroy(); + m_status_watcher = nullptr; + assert(m_on_finish != nullptr); + std::swap(m_on_finish, on_finish); + } + on_finish->complete(r); +} + +template +void LeaderWatcher::shut_down_status_watcher() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + assert(m_status_watcher != nullptr); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback, + &LeaderWatcher::handle_shut_down_status_watcher>(this)); + + m_status_watcher->shut_down(ctx); +} + +template +void LeaderWatcher::handle_shut_down_status_watcher(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + + m_status_watcher->destroy(); + m_status_watcher = nullptr; + + if (r < 0) { + derr << "error shutting mirror status watcher down: " << cpp_strerror(r) + << dendl; + } + + if (m_ret_val != 0) { + r = m_ret_val; + } + + if (!is_leader(m_lock)) { + // ignore on releasing + r = 0; + } + + assert(m_on_finish != nullptr); + std::swap(m_on_finish, on_finish); + } + on_finish->complete(r); +} + +template +void LeaderWatcher::init_instances() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + assert(m_instances == nullptr); + + m_instances = Instances::create(m_threads, m_ioctx); + + Context *ctx = create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_init_instances>(this); + + m_instances->init(ctx); +} + +template +void LeaderWatcher::handle_init_instances(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error initializing instances: " << cpp_strerror(r) << dendl; + m_ret_val = r; + m_instances->destroy(); + m_instances = nullptr; + shut_down_status_watcher(); + return; + } + + notify_listener(); +} + +template +void LeaderWatcher::shut_down_instances() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + assert(m_instances != nullptr); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback, + &LeaderWatcher::handle_shut_down_instances>(this)); + + m_instances->shut_down(ctx); +} + +template +void LeaderWatcher::handle_shut_down_instances(int r) { + dout(20) << "r=" << r << dendl; + assert(r == 0); + + Mutex::Locker locker(m_lock); + + m_instances->destroy(); + m_instances = nullptr; + + shut_down_status_watcher(); +} + +template +void LeaderWatcher::notify_listener() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_async_context_callback( + m_work_queue, create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_notify_listener>(this)); + + if (is_leader(m_lock)) { + ctx = new FunctionContext( + [this, ctx](int r) { + m_listener->post_acquire_handler(ctx); + }); + } else { + ctx = new FunctionContext( + [this, ctx](int r) { + m_listener->pre_release_handler(ctx); + }); + } + m_work_queue->queue(ctx, 0); +} + +template +void LeaderWatcher::handle_notify_listener(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + + if (r < 0) { + derr << "error notifying listener: " << cpp_strerror(r) << dendl; + m_ret_val = r; + } + + if (is_leader(m_lock)) { + notify_lock_acquired(); + } else { + shut_down_instances(); + } +} + +template +void LeaderWatcher::notify_lock_acquired() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_notify_lock_acquired>(this); + + bufferlist bl; + ::encode(NotifyMessage{LockAcquiredPayload{}}, bl); + + send_notify(bl, nullptr, ctx); +} + +template +void LeaderWatcher::handle_notify_lock_acquired(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + if (r < 0 && r != -ETIMEDOUT) { + derr << "error notifying leader lock acquired: " << cpp_strerror(r) + << dendl; + m_ret_val = r; + } + + assert(m_on_finish != nullptr); + std::swap(m_on_finish, on_finish); + } + on_finish->complete(0); +} + +template +void LeaderWatcher::notify_lock_released() { + dout(20) << dendl; + + assert(m_lock.is_locked()); + + Context *ctx = create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_notify_lock_released>(this); + + bufferlist bl; + ::encode(NotifyMessage{LockReleasedPayload{}}, bl); + + send_notify(bl, nullptr, ctx); +} + +template +void LeaderWatcher::handle_notify_lock_released(int r) { + dout(20) << "r=" << r << dendl; + + Context *on_finish = nullptr; + { + Mutex::Locker locker(m_lock); + if (r < 0 && r != -ETIMEDOUT) { + derr << "error notifying leader lock released: " << cpp_strerror(r) + << dendl; + } + + assert(m_on_finish != nullptr); + std::swap(m_on_finish, on_finish); + } + on_finish->complete(r); +} + +template +void LeaderWatcher::notify_heartbeat() { + dout(20) << dendl; + + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + assert(!m_timer_op_tracker.empty()); + + if (!is_leader(m_lock)) { + dout(5) << "not leader, canceling" << dendl; + m_timer_op_tracker.finish_op(); + return; + } + + Context *ctx = create_context_callback< + LeaderWatcher, &LeaderWatcher::handle_notify_heartbeat>(this); + + bufferlist bl; + ::encode(NotifyMessage{HeartbeatPayload{}}, bl); + + m_heartbeat_response.acks.clear(); + send_notify(bl, &m_heartbeat_response, ctx); +} + +template +void LeaderWatcher::handle_notify_heartbeat(int r) { + dout(20) << "r=" << r << dendl; + + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + assert(!m_timer_op_tracker.empty()); + + m_timer_op_tracker.finish_op(); + if (m_leader_lock->is_shutdown()) { + dout(20) << "canceling due to shutdown" << dendl; + return; + } else if (!is_leader(m_lock)) { + return; + } + + if (r < 0 && r != -ETIMEDOUT) { + derr << "error notifying hearbeat: " << cpp_strerror(r) + << ", releasing leader" << dendl; + release_leader_lock(); + return; + } + + dout(20) << m_heartbeat_response.acks.size() << " acks received, " + << m_heartbeat_response.timeouts.size() << " timed out" << dendl; + + for (auto &it: m_heartbeat_response.acks) { + uint64_t notifier_id = it.first.gid; + if (notifier_id == m_notifier_id) { + continue; + } + + std::string instance_id = stringify(notifier_id); + m_instances->notify(instance_id); + } + + schedule_timer_task("heartbeat", 1, true, + &LeaderWatcher::notify_heartbeat, false); +} + +template +void LeaderWatcher::handle_heartbeat(Context *on_notify_ack) { + dout(20) << dendl; + + { + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + if (is_leader(m_lock)) { + dout(5) << "got another leader heartbeat, ignoring" << dendl; + } else { + cancel_timer_task(); + m_acquire_attempts = 0; + schedule_acquire_leader_lock(1); + } + } + + on_notify_ack->complete(0); +} + +template +void LeaderWatcher::handle_lock_acquired(Context *on_notify_ack) { + dout(20) << dendl; + + { + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + if (is_leader(m_lock)) { + dout(5) << "got another leader lock_acquired, ignoring" << dendl; + } else { + cancel_timer_task(); + schedule_get_locker(true, 0); + } + } + + on_notify_ack->complete(0); +} + +template +void LeaderWatcher::handle_lock_released(Context *on_notify_ack) { + dout(20) << dendl; + + { + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + if (is_leader(m_lock)) { + dout(5) << "got another leader lock_released, ignoring" << dendl; + } else { + cancel_timer_task(); + schedule_get_locker(true, 0); + } + } + + on_notify_ack->complete(0); +} + +template +void LeaderWatcher::handle_notify(uint64_t notify_id, uint64_t handle, + uint64_t notifier_id, bufferlist &bl) { + dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", " + << "notifier_id=" << notifier_id << dendl; + + Context *ctx = new C_NotifyAck(this, notify_id, handle); + + if (notifier_id == m_notifier_id) { + dout(20) << "our own notification, ignoring" << dendl; + ctx->complete(0); + return; + } + + NotifyMessage notify_message; + try { + bufferlist::iterator iter = bl.begin(); + ::decode(notify_message, iter); + } catch (const buffer::error &err) { + derr << ": error decoding image notification: " << err.what() << dendl; + ctx->complete(0); + return; + } + + apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload); +} + +template +void LeaderWatcher::handle_payload(const HeartbeatPayload &payload, + Context *on_notify_ack) { + dout(20) << "heartbeat" << dendl; + + handle_heartbeat(on_notify_ack); +} + +template +void LeaderWatcher::handle_payload(const LockAcquiredPayload &payload, + Context *on_notify_ack) { + dout(20) << "lock_acquired" << dendl; + + handle_lock_acquired(on_notify_ack); +} + +template +void LeaderWatcher::handle_payload(const LockReleasedPayload &payload, + Context *on_notify_ack) { + dout(20) << "lock_released" << dendl; + + handle_lock_released(on_notify_ack); +} + +template +void LeaderWatcher::handle_payload(const UnknownPayload &payload, + Context *on_notify_ack) { + dout(20) << "unknown" << dendl; + + on_notify_ack->complete(0); +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::LeaderWatcher;