X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FInstanceWatcher.cc;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FInstanceWatcher.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=7f94976ea1c60d8ce7cad511505ab95a316f25af;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_mirror/InstanceWatcher.cc b/src/ceph/src/tools/rbd_mirror/InstanceWatcher.cc deleted file mode 100644 index 7f94976..0000000 --- a/src/ceph/src/tools/rbd_mirror/InstanceWatcher.cc +++ /dev/null @@ -1,1308 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "InstanceWatcher.h" -#include "include/stringify.h" -#include "common/debug.h" -#include "common/errno.h" -#include "cls/rbd/cls_rbd_client.h" -#include "librbd/ManagedLock.h" -#include "librbd/Utils.h" -#include "InstanceReplayer.h" -#include "ImageSyncThrottler.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rbd_mirror -#undef dout_prefix -#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " - -namespace rbd { -namespace mirror { - -using namespace instance_watcher; - -using librbd::util::create_async_context_callback; -using librbd::util::create_context_callback; -using librbd::util::create_rados_callback; -using librbd::util::unique_lock_name; - -namespace { - -struct C_GetInstances : public Context { - std::vector *instance_ids; - Context *on_finish; - bufferlist out_bl; - - C_GetInstances(std::vector *instance_ids, Context *on_finish) - : instance_ids(instance_ids), on_finish(on_finish) { - } - - void finish(int r) override { - dout(20) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r - << dendl; - - if (r == 0) { - bufferlist::iterator it = out_bl.begin(); - r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids); - } else if (r == -ENOENT) { - r = 0; - } - on_finish->complete(r); - } -}; - -template -struct C_RemoveInstanceRequest : public Context { - InstanceWatcher instance_watcher; - Context *on_finish; - - C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue, - const std::string &instance_id, Context *on_finish) - : instance_watcher(io_ctx, work_queue, nullptr, instance_id), - on_finish(on_finish) { - } - - void send() { - dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl; - - instance_watcher.remove(this); - } - - void finish(int r) override { - dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r=" - << r << dendl; - assert(r == 0); - - on_finish->complete(r); - } -}; - -} // anonymous namespace - -template -struct InstanceWatcher::C_NotifyInstanceRequest : public Context { - InstanceWatcher *instance_watcher; - std::string instance_id; - uint64_t request_id; - bufferlist bl; - Context *on_finish; - bool send_to_leader; - std::unique_ptr notifier; - librbd::watcher::NotifyResponse response; - bool canceling = false; - - C_NotifyInstanceRequest(InstanceWatcher *instance_watcher, - const std::string &instance_id, uint64_t request_id, - bufferlist &&bl, Context *on_finish) - : instance_watcher(instance_watcher), instance_id(instance_id), - request_id(request_id), bl(bl), on_finish(on_finish), - send_to_leader(instance_id.empty()) { - dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ - << ": instance_watcher=" << instance_watcher << ", instance_id=" - << instance_id << ", request_id=" << request_id << dendl; - - assert(instance_watcher->m_lock.is_locked()); - - if (!send_to_leader) { - assert((!instance_id.empty())); - notifier.reset(new librbd::watcher::Notifier( - instance_watcher->m_work_queue, - instance_watcher->m_ioctx, - RBD_MIRROR_INSTANCE_PREFIX + instance_id)); - } - - instance_watcher->m_notify_op_tracker.start_op(); - auto result = instance_watcher->m_notify_ops.insert( - std::make_pair(instance_id, this)).second; - assert(result); - } - - void send() { - dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; - - assert(instance_watcher->m_lock.is_locked()); - - if (canceling) { - dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ - << ": canceling" << dendl; - instance_watcher->m_work_queue->queue(this, -ECANCELED); - return; - } - - if (send_to_leader) { - if (instance_watcher->m_leader_instance_id.empty()) { - dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ - << ": suspending" << dendl; - instance_watcher->suspend_notify_request(this); - return; - } - - if (instance_watcher->m_leader_instance_id != instance_id) { - auto count = instance_watcher->m_notify_ops.erase( - std::make_pair(instance_id, this)); - assert(count > 0); - - instance_id = instance_watcher->m_leader_instance_id; - - auto result = instance_watcher->m_notify_ops.insert( - std::make_pair(instance_id, this)).second; - assert(result); - - notifier.reset(new librbd::watcher::Notifier( - instance_watcher->m_work_queue, - instance_watcher->m_ioctx, - RBD_MIRROR_INSTANCE_PREFIX + instance_id)); - } - } - - dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ - << ": sending to " << instance_id << dendl; - notifier->notify(bl, &response, this); - } - - void cancel() { - dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; - - assert(instance_watcher->m_lock.is_locked()); - - canceling = true; - instance_watcher->unsuspend_notify_request(this); - } - - void finish(int r) override { - dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r=" - << r << dendl; - - if (r == 0 || r == -ETIMEDOUT) { - bool found = false; - for (auto &it : response.acks) { - auto &bl = it.second; - if (it.second.length() == 0) { - dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ - << ": no payload in ack, ignoring" << dendl; - continue; - } - try { - auto iter = bl.begin(); - NotifyAckPayload ack; - ::decode(ack, iter); - if (ack.instance_id != instance_watcher->get_instance_id()) { - derr << "C_NotifyInstanceRequest: " << this << " " << __func__ - << ": ack instance_id (" << ack.instance_id << ") " - << "does not match, ignoring" << dendl; - continue; - } - if (ack.request_id != request_id) { - derr << "C_NotifyInstanceRequest: " << this << " " << __func__ - << ": ack request_id (" << ack.request_id << ") " - << "does not match, ignoring" << dendl; - continue; - } - r = ack.ret_val; - found = true; - break; - } catch (const buffer::error &err) { - derr << "C_NotifyInstanceRequest: " << this << " " << __func__ - << ": failed to decode ack: " << err.what() << dendl; - continue; - } - } - - if (!found) { - if (r == -ETIMEDOUT) { - derr << "C_NotifyInstanceRequest: " << this << " " << __func__ - << ": resending after timeout" << dendl; - Mutex::Locker locker(instance_watcher->m_lock); - send(); - return; - } else { - r = -EINVAL; - } - } else { - if (r == -ESTALE && send_to_leader) { - derr << "C_NotifyInstanceRequest: " << this << " " << __func__ - << ": resending due to leader change" << dendl; - Mutex::Locker locker(instance_watcher->m_lock); - send(); - return; - } - } - } - - on_finish->complete(r); - - { - Mutex::Locker locker(instance_watcher->m_lock); - auto result = instance_watcher->m_notify_ops.erase( - std::make_pair(instance_id, this)); - assert(result > 0); - instance_watcher->m_notify_op_tracker.finish_op(); - } - - delete this; - } - - void complete(int r) override { - finish(r); - } -}; - -template -struct InstanceWatcher::C_SyncRequest : public Context { - InstanceWatcher *instance_watcher; - std::string sync_id; - Context *on_start; - Context *on_complete = nullptr; - C_NotifyInstanceRequest *req = nullptr; - - C_SyncRequest(InstanceWatcher *instance_watcher, - const std::string &sync_id, Context *on_start) - : instance_watcher(instance_watcher), sync_id(sync_id), - on_start(on_start) { - dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id=" - << sync_id << dendl; - } - - void finish(int r) override { - dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": r=" - << r << dendl; - - if (on_start != nullptr) { - instance_watcher->handle_notify_sync_request(this, r); - } else { - instance_watcher->handle_notify_sync_complete(this, r); - delete this; - } - } - - // called twice - void complete(int r) override { - finish(r); - } -}; - -#undef dout_prefix -#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \ - << this << " " << __func__ << ": " -template -void InstanceWatcher::get_instances(librados::IoCtx &io_ctx, - std::vector *instance_ids, - Context *on_finish) { - librados::ObjectReadOperation op; - librbd::cls_client::mirror_instances_list_start(&op); - C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish); - librados::AioCompletion *aio_comp = create_rados_callback(ctx); - - int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl); - assert(r == 0); - aio_comp->release(); -} - -template -void InstanceWatcher::remove_instance(librados::IoCtx &io_ctx, - ContextWQ *work_queue, - const std::string &instance_id, - Context *on_finish) { - auto req = new C_RemoveInstanceRequest(io_ctx, work_queue, instance_id, - on_finish); - req->send(); -} - -template -InstanceWatcher *InstanceWatcher::create( - librados::IoCtx &io_ctx, ContextWQ *work_queue, - InstanceReplayer *instance_replayer) { - return new InstanceWatcher(io_ctx, work_queue, instance_replayer, - stringify(io_ctx.get_instance_id())); -} - -template -InstanceWatcher::InstanceWatcher(librados::IoCtx &io_ctx, - ContextWQ *work_queue, - InstanceReplayer *instance_replayer, - const std::string &instance_id) - : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id), - m_instance_replayer(instance_replayer), m_instance_id(instance_id), - m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)), - m_instance_lock(librbd::ManagedLock::create( - m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true, - m_cct->_conf->get_val("rbd_blacklist_expire_seconds"))) { -} - -template -InstanceWatcher::~InstanceWatcher() { - assert(m_notify_ops.empty()); - assert(m_notify_op_tracker.empty()); - assert(m_suspended_ops.empty()); - assert(m_inflight_sync_reqs.empty()); - assert(m_image_sync_throttler == nullptr); - m_instance_lock->destroy(); -} - -template -int InstanceWatcher::init() { - C_SaferCond init_ctx; - init(&init_ctx); - return init_ctx.wait(); -} - -template -void InstanceWatcher::init(Context *on_finish) { - dout(20) << "instance_id=" << m_instance_id << dendl; - - Mutex::Locker locker(m_lock); - - assert(m_on_finish == nullptr); - m_on_finish = on_finish; - m_ret_val = 0; - - register_instance(); -} - -template -void InstanceWatcher::shut_down() { - C_SaferCond shut_down_ctx; - shut_down(&shut_down_ctx); - int r = shut_down_ctx.wait(); - assert(r == 0); -} - -template -void InstanceWatcher::shut_down(Context *on_finish) { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - assert(m_on_finish == nullptr); - m_on_finish = on_finish; - m_ret_val = 0; - - release_lock(); -} - -template -void InstanceWatcher::remove(Context *on_finish) { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - assert(m_on_finish == nullptr); - m_on_finish = on_finish; - m_ret_val = 0; - m_removing = true; - - get_instance_locker(); -} - -template -void InstanceWatcher::notify_image_acquire( - const std::string &instance_id, const std::string &global_image_id, - Context *on_notify_ack) { - dout(20) << "instance_id=" << instance_id << ", global_image_id=" - << global_image_id << dendl; - - Mutex::Locker locker(m_lock); - - assert(m_on_finish == nullptr); - - if (instance_id == m_instance_id) { - handle_image_acquire(global_image_id, on_notify_ack); - } else { - uint64_t request_id = ++m_request_seq; - bufferlist bl; - ::encode(NotifyMessage{ImageAcquirePayload{request_id, global_image_id}}, - bl); - auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, - std::move(bl), on_notify_ack); - req->send(); - } -} - -template -void InstanceWatcher::notify_image_release( - const std::string &instance_id, const std::string &global_image_id, - Context *on_notify_ack) { - dout(20) << "instance_id=" << instance_id << ", global_image_id=" - << global_image_id << dendl; - - Mutex::Locker locker(m_lock); - - assert(m_on_finish == nullptr); - - if (instance_id == m_instance_id) { - handle_image_release(global_image_id, on_notify_ack); - } else { - uint64_t request_id = ++m_request_seq; - bufferlist bl; - ::encode(NotifyMessage{ImageReleasePayload{request_id, global_image_id}}, - bl); - auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, - std::move(bl), on_notify_ack); - req->send(); - } -} - -template -void InstanceWatcher::notify_peer_image_removed( - const std::string &instance_id, const std::string &global_image_id, - const std::string &peer_mirror_uuid, Context *on_notify_ack) { - dout(20) << "instance_id=" << instance_id << ", " - << "global_image_id=" << global_image_id << ", " - << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; - - Mutex::Locker locker(m_lock); - assert(m_on_finish == nullptr); - - if (instance_id == m_instance_id) { - handle_peer_image_removed(global_image_id, peer_mirror_uuid, on_notify_ack); - } else { - uint64_t request_id = ++m_request_seq; - bufferlist bl; - ::encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id, - peer_mirror_uuid}}, bl); - auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, - std::move(bl), on_notify_ack); - req->send(); - } -} - -template -void InstanceWatcher::notify_sync_request(const std::string &sync_id, - Context *on_sync_start) { - dout(20) << "sync_id=" << sync_id << dendl; - - Mutex::Locker locker(m_lock); - - assert(m_inflight_sync_reqs.count(sync_id) == 0); - - uint64_t request_id = ++m_request_seq; - - bufferlist bl; - ::encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl); - - auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start); - sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id, - std::move(bl), sync_ctx); - - m_inflight_sync_reqs[sync_id] = sync_ctx; - sync_ctx->req->send(); -} - -template -bool InstanceWatcher::cancel_sync_request(const std::string &sync_id) { - dout(20) << "sync_id=" << sync_id << dendl; - - Mutex::Locker locker(m_lock); - - auto it = m_inflight_sync_reqs.find(sync_id); - if (it == m_inflight_sync_reqs.end()) { - return false; - } - - auto sync_ctx = it->second; - - if (sync_ctx->on_start == nullptr) { - return false; - } - - assert(sync_ctx->req != nullptr); - sync_ctx->req->cancel(); - return true; -} - -template -void InstanceWatcher::notify_sync_start(const std::string &instance_id, - const std::string &sync_id) { - dout(20) << "sync_id=" << sync_id << dendl; - - Mutex::Locker locker(m_lock); - - uint64_t request_id = ++m_request_seq; - - bufferlist bl; - ::encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl); - - auto ctx = new FunctionContext( - [this, sync_id] (int r) { - dout(20) << "finish: sync_id=" << sync_id << ", r=" << r << dendl; - Mutex::Locker locker(m_lock); - if (r != -ESTALE && m_image_sync_throttler != nullptr) { - m_image_sync_throttler->finish_op(sync_id); - } - }); - auto req = new C_NotifyInstanceRequest(this, instance_id, request_id, - std::move(bl), ctx); - req->send(); -} - -template -void InstanceWatcher::notify_sync_complete(const std::string &sync_id) { - dout(20) << "sync_id=" << sync_id << dendl; - - Mutex::Locker locker(m_lock); - - auto it = m_inflight_sync_reqs.find(sync_id); - assert(it != m_inflight_sync_reqs.end()); - - auto sync_ctx = it->second; - assert(sync_ctx->req == nullptr); - - m_inflight_sync_reqs.erase(it); - m_work_queue->queue(sync_ctx, 0); -} - -template -void InstanceWatcher::handle_notify_sync_request(C_SyncRequest *sync_ctx, - int r) { - dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl; - - Context *on_start = nullptr; - { - Mutex::Locker locker(m_lock); - - assert(sync_ctx->req != nullptr); - assert(sync_ctx->on_start != nullptr); - - if (sync_ctx->req->canceling) { - r = -ECANCELED; - } - - std::swap(sync_ctx->on_start, on_start); - sync_ctx->req = nullptr; - } - - on_start->complete(r == -ECANCELED ? r : 0); - - if (r == -ECANCELED) { - notify_sync_complete(sync_ctx->sync_id); - } -} - -template -void InstanceWatcher::handle_notify_sync_complete(C_SyncRequest *sync_ctx, - int r) { - dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl; - - if (sync_ctx->on_complete != nullptr) { - sync_ctx->on_complete->complete(r); - } -} - -template -void InstanceWatcher::print_sync_status(Formatter *f, stringstream *ss) { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - if (m_image_sync_throttler != nullptr) { - m_image_sync_throttler->print_status(f, ss); - } -} - -template -void InstanceWatcher::handle_acquire_leader() { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - assert(m_image_sync_throttler == nullptr); - m_image_sync_throttler = ImageSyncThrottler::create(); - - m_leader_instance_id = m_instance_id; - unsuspend_notify_requests(); -} - -template -void InstanceWatcher::handle_release_leader() { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - assert(m_image_sync_throttler != nullptr); - - m_leader_instance_id.clear(); - - m_image_sync_throttler->drain(-ESTALE); - m_image_sync_throttler->destroy(); - m_image_sync_throttler = nullptr; -} - -template -void InstanceWatcher::handle_update_leader( - const std::string &leader_instance_id) { - dout(20) << "leader_instance_id=" << leader_instance_id << dendl; - - Mutex::Locker locker(m_lock); - - m_leader_instance_id = leader_instance_id; - - if (!m_leader_instance_id.empty()) { - unsuspend_notify_requests(); - } -} - -template -void InstanceWatcher::cancel_notify_requests( - const std::string &instance_id) { - dout(20) << "instance_id=" << instance_id << dendl; - - Mutex::Locker locker(m_lock); - - for (auto op : m_notify_ops) { - if (op.first == instance_id && !op.second->send_to_leader) { - op.second->cancel(); - } - } -} - -template -void InstanceWatcher::register_instance() { - assert(m_lock.is_locked()); - - dout(20) << dendl; - - librados::ObjectWriteOperation op; - librbd::cls_client::mirror_instances_add(&op, m_instance_id); - librados::AioCompletion *aio_comp = create_rados_callback< - InstanceWatcher, &InstanceWatcher::handle_register_instance>(this); - - int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op); - assert(r == 0); - aio_comp->release(); -} - -template -void InstanceWatcher::handle_register_instance(int r) { - dout(20) << "r=" << r << dendl; - - Context *on_finish = nullptr; - { - Mutex::Locker locker(m_lock); - - if (r == 0) { - create_instance_object(); - return; - } - - derr << "error registering instance: " << cpp_strerror(r) << dendl; - - std::swap(on_finish, m_on_finish); - } - on_finish->complete(r); -} - - -template -void InstanceWatcher::create_instance_object() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - - librados::ObjectWriteOperation op; - op.create(true); - - librados::AioCompletion *aio_comp = create_rados_callback< - InstanceWatcher, - &InstanceWatcher::handle_create_instance_object>(this); - int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); - assert(r == 0); - aio_comp->release(); -} - -template -void InstanceWatcher::handle_create_instance_object(int r) { - dout(20) << "r=" << r << dendl; - - Mutex::Locker locker(m_lock); - - if (r < 0) { - derr << "error creating " << m_oid << " object: " << cpp_strerror(r) - << dendl; - - m_ret_val = r; - unregister_instance(); - return; - } - - register_watch(); -} - -template -void InstanceWatcher::register_watch() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - - Context *ctx = create_async_context_callback( - m_work_queue, create_context_callback< - InstanceWatcher, &InstanceWatcher::handle_register_watch>(this)); - - librbd::Watcher::register_watch(ctx); -} - -template -void InstanceWatcher::handle_register_watch(int r) { - dout(20) << "r=" << r << dendl; - - Mutex::Locker locker(m_lock); - - if (r < 0) { - derr << "error registering instance watcher for " << m_oid << " object: " - << cpp_strerror(r) << dendl; - - m_ret_val = r; - remove_instance_object(); - return; - } - - acquire_lock(); -} - -template -void InstanceWatcher::acquire_lock() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - - Context *ctx = create_async_context_callback( - m_work_queue, create_context_callback< - InstanceWatcher, &InstanceWatcher::handle_acquire_lock>(this)); - - m_instance_lock->acquire_lock(ctx); -} - -template -void InstanceWatcher::handle_acquire_lock(int r) { - dout(20) << "r=" << r << dendl; - - Context *on_finish = nullptr; - { - Mutex::Locker locker(m_lock); - - if (r < 0) { - - derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl; - - m_ret_val = r; - unregister_watch(); - return; - } - - std::swap(on_finish, m_on_finish); - } - - on_finish->complete(r); -} - -template -void InstanceWatcher::release_lock() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - - Context *ctx = create_async_context_callback( - m_work_queue, create_context_callback< - InstanceWatcher, &InstanceWatcher::handle_release_lock>(this)); - - m_instance_lock->shut_down(ctx); -} - -template -void InstanceWatcher::handle_release_lock(int r) { - dout(20) << "r=" << r << dendl; - - Mutex::Locker locker(m_lock); - - if (r < 0) { - derr << "error releasing instance lock: " << cpp_strerror(r) << dendl; - } - - unregister_watch(); -} - -template -void InstanceWatcher::unregister_watch() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - - Context *ctx = create_async_context_callback( - m_work_queue, create_context_callback< - InstanceWatcher, &InstanceWatcher::handle_unregister_watch>(this)); - - librbd::Watcher::unregister_watch(ctx); -} - -template -void InstanceWatcher::handle_unregister_watch(int r) { - dout(20) << "r=" << r << dendl; - - if (r < 0) { - derr << "error unregistering instance watcher for " << m_oid << " object: " - << cpp_strerror(r) << dendl; - } - - Mutex::Locker locker(m_lock); - remove_instance_object(); -} - -template -void InstanceWatcher::remove_instance_object() { - assert(m_lock.is_locked()); - - dout(20) << dendl; - - librados::ObjectWriteOperation op; - op.remove(); - - librados::AioCompletion *aio_comp = create_rados_callback< - InstanceWatcher, - &InstanceWatcher::handle_remove_instance_object>(this); - int r = m_ioctx.aio_operate(m_oid, aio_comp, &op); - assert(r == 0); - aio_comp->release(); -} - -template -void InstanceWatcher::handle_remove_instance_object(int r) { - dout(20) << "r=" << r << dendl; - - if (m_removing && r == -ENOENT) { - r = 0; - } - - if (r < 0) { - derr << "error removing " << m_oid << " object: " << cpp_strerror(r) - << dendl; - } - - Mutex::Locker locker(m_lock); - unregister_instance(); -} - -template -void InstanceWatcher::unregister_instance() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - - librados::ObjectWriteOperation op; - librbd::cls_client::mirror_instances_remove(&op, m_instance_id); - librados::AioCompletion *aio_comp = create_rados_callback< - InstanceWatcher, &InstanceWatcher::handle_unregister_instance>(this); - - int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op); - assert(r == 0); - aio_comp->release(); -} - -template -void InstanceWatcher::handle_unregister_instance(int r) { - dout(20) << "r=" << r << dendl; - - if (r < 0) { - derr << "error unregistering instance: " << cpp_strerror(r) << dendl; - } - - Mutex::Locker locker(m_lock); - wait_for_notify_ops(); -} - -template -void InstanceWatcher::wait_for_notify_ops() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - - for (auto op : m_notify_ops) { - op.second->cancel(); - } - - Context *ctx = create_async_context_callback( - m_work_queue, create_context_callback< - InstanceWatcher, &InstanceWatcher::handle_wait_for_notify_ops>(this)); - - m_notify_op_tracker.wait_for_ops(ctx); -} - -template -void InstanceWatcher::handle_wait_for_notify_ops(int r) { - dout(20) << "r=" << r << dendl; - - assert(r == 0); - - Context *on_finish = nullptr; - { - Mutex::Locker locker(m_lock); - - assert(m_notify_ops.empty()); - - std::swap(on_finish, m_on_finish); - r = m_ret_val; - - if (m_removing) { - m_removing = false; - } - } - on_finish->complete(r); -} - -template -void InstanceWatcher::get_instance_locker() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - - Context *ctx = create_async_context_callback( - m_work_queue, create_context_callback< - InstanceWatcher, &InstanceWatcher::handle_get_instance_locker>(this)); - - m_instance_lock->get_locker(&m_instance_locker, ctx); -} - -template -void InstanceWatcher::handle_get_instance_locker(int r) { - dout(20) << "r=" << r << dendl; - - Mutex::Locker locker(m_lock); - - if (r < 0) { - if (r != -ENOENT) { - derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl; - } - remove_instance_object(); - return; - } - - break_instance_lock(); -} - -template -void InstanceWatcher::break_instance_lock() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - - Context *ctx = create_async_context_callback( - m_work_queue, create_context_callback< - InstanceWatcher, &InstanceWatcher::handle_break_instance_lock>(this)); - - m_instance_lock->break_lock(m_instance_locker, true, ctx); -} - -template -void InstanceWatcher::handle_break_instance_lock(int r) { - dout(20) << "r=" << r << dendl; - - Mutex::Locker locker(m_lock); - - if (r < 0) { - if (r != -ENOENT) { - derr << "error breaking instance lock: " << cpp_strerror(r) << dendl; - } - remove_instance_object(); - return; - } - - remove_instance_object(); -} - -template -void InstanceWatcher::suspend_notify_request(C_NotifyInstanceRequest *req) { - dout(20) << req << dendl; - - assert(m_lock.is_locked()); - - auto result = m_suspended_ops.insert(req).second; - assert(result); -} - -template -bool InstanceWatcher::unsuspend_notify_request( - C_NotifyInstanceRequest *req) { - dout(20) << req << dendl; - - assert(m_lock.is_locked()); - - auto result = m_suspended_ops.erase(req); - if (result == 0) { - return false; - } - - req->send(); - return true; -} - -template -void InstanceWatcher::unsuspend_notify_requests() { - dout(20) << dendl; - - assert(m_lock.is_locked()); - - std::set suspended_ops; - std::swap(m_suspended_ops, suspended_ops); - - for (auto op : suspended_ops) { - op->send(); - } -} - -template -Context *InstanceWatcher::prepare_request(const std::string &instance_id, - uint64_t request_id, - C_NotifyAck *on_notify_ack) { - dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id - << dendl; - - Mutex::Locker locker(m_lock); - - Context *ctx = nullptr; - Request request(instance_id, request_id); - auto it = m_requests.find(request); - - if (it != m_requests.end()) { - dout(20) << "duplicate for in-progress request" << dendl; - delete it->on_notify_ack; - m_requests.erase(it); - } else { - ctx = create_async_context_callback( - m_work_queue, new FunctionContext( - [this, instance_id, request_id] (int r) { - complete_request(instance_id, request_id, r); - })); - } - - request.on_notify_ack = on_notify_ack; - m_requests.insert(request); - return ctx; -} - -template -void InstanceWatcher::complete_request(const std::string &instance_id, - uint64_t request_id, int r) { - dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id - << dendl; - - C_NotifyAck *on_notify_ack; - { - Mutex::Locker locker(m_lock); - Request request(instance_id, request_id); - auto it = m_requests.find(request); - assert(it != m_requests.end()); - on_notify_ack = it->on_notify_ack; - m_requests.erase(it); - } - - ::encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out); - on_notify_ack->complete(0); -} - -template -void InstanceWatcher::handle_notify(uint64_t notify_id, uint64_t handle, - uint64_t notifier_id, bufferlist &bl) { - dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", " - << "notifier_id=" << notifier_id << dendl; - - auto ctx = new C_NotifyAck(this, notify_id, handle); - - NotifyMessage notify_message; - try { - bufferlist::iterator iter = bl.begin(); - ::decode(notify_message, iter); - } catch (const buffer::error &err) { - derr << "error decoding image notification: " << err.what() << dendl; - ctx->complete(0); - return; - } - - apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx), - notify_message.payload); -} - -template -void InstanceWatcher::handle_image_acquire( - const std::string &global_image_id, Context *on_finish) { - dout(20) << "global_image_id=" << global_image_id << dendl; - - auto ctx = new FunctionContext( - [this, global_image_id, on_finish] (int r) { - m_instance_replayer->acquire_image(this, global_image_id, on_finish); - m_notify_op_tracker.finish_op(); - }); - - m_notify_op_tracker.start_op(); - m_work_queue->queue(ctx, 0); -} - -template -void InstanceWatcher::handle_image_release( - const std::string &global_image_id, Context *on_finish) { - dout(20) << "global_image_id=" << global_image_id << dendl; - - auto ctx = new FunctionContext( - [this, global_image_id, on_finish] (int r) { - m_instance_replayer->release_image(global_image_id, on_finish); - m_notify_op_tracker.finish_op(); - }); - - m_notify_op_tracker.start_op(); - m_work_queue->queue(ctx, 0); -} - -template -void InstanceWatcher::handle_peer_image_removed( - const std::string &global_image_id, const std::string &peer_mirror_uuid, - Context *on_finish) { - dout(20) << "global_image_id=" << global_image_id << ", " - << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; - - auto ctx = new FunctionContext( - [this, peer_mirror_uuid, global_image_id, on_finish] (int r) { - m_instance_replayer->remove_peer_image(global_image_id, - peer_mirror_uuid, on_finish); - m_notify_op_tracker.finish_op(); - }); - - m_notify_op_tracker.start_op(); - m_work_queue->queue(ctx, 0); -} - -template -void InstanceWatcher::handle_sync_request(const std::string &instance_id, - const std::string &sync_id, - Context *on_finish) { - dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl; - - Mutex::Locker locker(m_lock); - - if (m_image_sync_throttler == nullptr) { - dout(20) << "sync request for non-leader" << dendl; - m_work_queue->queue(on_finish, -ESTALE); - return; - } - - Context *on_start = create_async_context_callback( - m_work_queue, new FunctionContext( - [this, instance_id, sync_id, on_finish] (int r) { - dout(20) << "handle_sync_request: finish: instance_id=" << instance_id - << ", sync_id=" << sync_id << ", r=" << r << dendl; - if (r == 0) { - notify_sync_start(instance_id, sync_id); - } - on_finish->complete(r); - })); - m_image_sync_throttler->start_op(sync_id, on_start); -} - -template -void InstanceWatcher::handle_sync_start(const std::string &instance_id, - const std::string &sync_id, - Context *on_finish) { - dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl; - - Mutex::Locker locker(m_lock); - - auto it = m_inflight_sync_reqs.find(sync_id); - if (it == m_inflight_sync_reqs.end()) { - dout(20) << "not found" << dendl; - m_work_queue->queue(on_finish, 0); - return; - } - - auto sync_ctx = it->second; - - if (sync_ctx->on_complete != nullptr) { - dout(20) << "duplicate request" << dendl; - m_work_queue->queue(sync_ctx->on_complete, -ESTALE); - } - - sync_ctx->on_complete = on_finish; -} - -template -void InstanceWatcher::handle_payload(const std::string &instance_id, - const ImageAcquirePayload &payload, - C_NotifyAck *on_notify_ack) { - dout(20) << "image_acquire: instance_id=" << instance_id << ", " - << "request_id=" << payload.request_id << dendl; - - auto on_finish = prepare_request(instance_id, payload.request_id, - on_notify_ack); - if (on_finish != nullptr) { - handle_image_acquire(payload.global_image_id, on_finish); - } -} - -template -void InstanceWatcher::handle_payload(const std::string &instance_id, - const ImageReleasePayload &payload, - C_NotifyAck *on_notify_ack) { - dout(20) << "image_release: instance_id=" << instance_id << ", " - << "request_id=" << payload.request_id << dendl; - - auto on_finish = prepare_request(instance_id, payload.request_id, - on_notify_ack); - if (on_finish != nullptr) { - handle_image_release(payload.global_image_id, on_finish); - } -} - -template -void InstanceWatcher::handle_payload(const std::string &instance_id, - const PeerImageRemovedPayload &payload, - C_NotifyAck *on_notify_ack) { - dout(20) << "remove_peer_image: instance_id=" << instance_id << ", " - << "request_id=" << payload.request_id << dendl; - - auto on_finish = prepare_request(instance_id, payload.request_id, - on_notify_ack); - if (on_finish != nullptr) { - handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid, - on_finish); - } -} - -template -void InstanceWatcher::handle_payload(const std::string &instance_id, - const SyncRequestPayload &payload, - C_NotifyAck *on_notify_ack) { - dout(20) << "sync_request: instance_id=" << instance_id << ", " - << "request_id=" << payload.request_id << dendl; - - auto on_finish = prepare_request(instance_id, payload.request_id, - on_notify_ack); - if (on_finish == nullptr) { - return; - } - - handle_sync_request(instance_id, payload.sync_id, on_finish); -} - -template -void InstanceWatcher::handle_payload(const std::string &instance_id, - const SyncStartPayload &payload, - C_NotifyAck *on_notify_ack) { - dout(20) << "sync_start: instance_id=" << instance_id << ", " - << "request_id=" << payload.request_id << dendl; - - auto on_finish = prepare_request(instance_id, payload.request_id, - on_notify_ack); - if (on_finish == nullptr) { - return; - } - - handle_sync_start(instance_id, payload.sync_id, on_finish); -} - -template -void InstanceWatcher::handle_payload(const std::string &instance_id, - const UnknownPayload &payload, - C_NotifyAck *on_notify_ack) { - dout(20) << "unknown: instance_id=" << instance_id << dendl; - - on_notify_ack->complete(0); -} - -} // namespace mirror -} // namespace rbd - -template class rbd::mirror::InstanceWatcher;