initial code repo
[stor4nfv.git] / src / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
diff --git a/src/ceph/src/tools/rbd_mirror/InstanceReplayer.cc b/src/ceph/src/tools/rbd_mirror/InstanceReplayer.cc
new file mode 100644 (file)
index 0000000..8b7fca1
--- /dev/null
@@ -0,0 +1,497 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/stringify.h"
+#include "common/Timer.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "librbd/Utils.h"
+#include "ImageReplayer.h"
+#include "InstanceReplayer.h"
+#include "ServiceDaemon.h"
+#include "Threads.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
+                           << this << " " << __func__ << ": "
+
+namespace rbd {
+namespace mirror {
+
+namespace {
+
+const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
+const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
+const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
+
+} // anonymous namespace
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+
+template <typename I>
+InstanceReplayer<I>::InstanceReplayer(
+    Threads<I> *threads, ServiceDaemon<I>* service_daemon,
+    ImageDeleter<I>* image_deleter, RadosRef local_rados,
+    const std::string &local_mirror_uuid, int64_t local_pool_id)
+  : m_threads(threads), m_service_daemon(service_daemon),
+    m_image_deleter(image_deleter), m_local_rados(local_rados),
+    m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
+    m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
+}
+
+template <typename I>
+InstanceReplayer<I>::~InstanceReplayer() {
+  assert(m_image_state_check_task == nullptr);
+  assert(m_async_op_tracker.empty());
+  assert(m_image_replayers.empty());
+}
+
+template <typename I>
+int InstanceReplayer<I>::init() {
+  C_SaferCond init_ctx;
+  init(&init_ctx);
+  return init_ctx.wait();
+}
+
+template <typename I>
+void InstanceReplayer<I>::init(Context *on_finish) {
+  dout(20) << dendl;
+
+  Context *ctx = new FunctionContext(
+    [this, on_finish] (int r) {
+      {
+        Mutex::Locker timer_locker(m_threads->timer_lock);
+        schedule_image_state_check_task();
+      }
+      on_finish->complete(0);
+    });
+
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::shut_down() {
+  C_SaferCond shut_down_ctx;
+  shut_down(&shut_down_ctx);
+  int r = shut_down_ctx.wait();
+  assert(r == 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::shut_down(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_on_shut_down == nullptr);
+  m_on_shut_down = on_finish;
+
+  Context *ctx = new FunctionContext(
+    [this] (int r) {
+      cancel_image_state_check_task();
+      wait_for_ops();
+    });
+
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::add_peer(std::string peer_uuid,
+                                   librados::IoCtx io_ctx) {
+  dout(20) << peer_uuid << dendl;
+
+  Mutex::Locker locker(m_lock);
+  auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second;
+  assert(result);
+}
+
+template <typename I>
+void InstanceReplayer<I>::release_all(Context *on_finish) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
+  for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
+       it = m_image_replayers.erase(it)) {
+    auto image_replayer = it->second;
+    auto ctx = gather_ctx->new_sub();
+    ctx = new FunctionContext(
+      [image_replayer, ctx] (int r) {
+        image_replayer->destroy();
+        ctx->complete(0);
+      });
+    stop_image_replayer(image_replayer, ctx);
+  }
+  gather_ctx->activate();
+}
+
+template <typename I>
+void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
+                                        const std::string &global_image_id,
+                                        Context *on_finish) {
+  dout(20) << "global_image_id=" << global_image_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_on_shut_down == nullptr);
+
+  auto it = m_image_replayers.find(global_image_id);
+  if (it == m_image_replayers.end()) {
+    auto image_replayer = ImageReplayer<I>::create(
+        m_threads, m_image_deleter, instance_watcher, m_local_rados,
+        m_local_mirror_uuid, m_local_pool_id, global_image_id);
+
+    dout(20) << global_image_id << ": creating replayer " << image_replayer
+             << dendl;
+
+    it = m_image_replayers.insert(std::make_pair(global_image_id,
+                                                 image_replayer)).first;
+
+    // TODO only a single peer is currently supported
+    assert(m_peers.size() == 1);
+    auto peer = *m_peers.begin();
+    image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
+  }
+
+  auto& image_replayer = it->second;
+  // TODO temporary until policy integrated
+  image_replayer->set_finished(false);
+
+  start_image_replayer(image_replayer);
+  m_threads->work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::release_image(const std::string &global_image_id,
+                                        Context *on_finish) {
+  dout(20) << "global_image_id=" << global_image_id << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_on_shut_down == nullptr);
+
+  auto it = m_image_replayers.find(global_image_id);
+  if (it == m_image_replayers.end()) {
+    dout(20) << global_image_id << ": not found" << dendl;
+    m_threads->work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  auto image_replayer = it->second;
+  m_image_replayers.erase(it);
+
+  on_finish = new FunctionContext(
+    [image_replayer, on_finish] (int r) {
+      image_replayer->destroy();
+      on_finish->complete(0);
+    });
+  stop_image_replayer(image_replayer, on_finish);
+}
+
+template <typename I>
+void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
+                                            const std::string &peer_mirror_uuid,
+                                            Context *on_finish) {
+  dout(20) << "global_image_id=" << global_image_id << ", "
+           << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_on_shut_down == nullptr);
+
+  auto it = m_image_replayers.find(global_image_id);
+  if (it != m_image_replayers.end()) {
+    // TODO only a single peer is currently supported, therefore
+    // we can just interrupt the current image replayer and
+    // it will eventually detect that the peer image is missing and
+    // determine if a delete propagation is required.
+    auto image_replayer = it->second;
+    image_replayer->restart();
+  }
+  m_threads->work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
+  dout(20) << dendl;
+
+  if (!f) {
+    return;
+  }
+
+  Mutex::Locker locker(m_lock);
+
+  f->open_array_section("image_replayers");
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->print_status(f, ss);
+  }
+  f->close_section();
+}
+
+template <typename I>
+void InstanceReplayer<I>::start()
+{
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  m_manual_stop = false;
+
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->start(nullptr, true);
+  }
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop()
+{
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  m_manual_stop = true;
+
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->stop(nullptr, true);
+  }
+}
+
+template <typename I>
+void InstanceReplayer<I>::restart()
+{
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  m_manual_stop = false;
+
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->restart();
+  }
+}
+
+template <typename I>
+void InstanceReplayer<I>::flush()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  for (auto &kv : m_image_replayers) {
+    auto &image_replayer = kv.second;
+    image_replayer->flush();
+  }
+}
+
+template <typename I>
+void InstanceReplayer<I>::start_image_replayer(
+    ImageReplayer<I> *image_replayer) {
+  assert(m_lock.is_locked());
+
+  std::string global_image_id = image_replayer->get_global_image_id();
+  dout(20) << "global_image_id=" << global_image_id << dendl;
+
+  if (!image_replayer->is_stopped()) {
+    return;
+  } else if (image_replayer->is_blacklisted()) {
+    derr << "blacklisted detected during image replay" << dendl;
+    return;
+  } else if (image_replayer->is_finished()) {
+    // TODO temporary until policy integrated
+    dout(5) << "removing image replayer for global_image_id="
+            << global_image_id << dendl;
+    m_image_replayers.erase(image_replayer->get_global_image_id());
+    image_replayer->destroy();
+    return;
+  }
+
+  image_replayer->start(nullptr, false);
+}
+
+template <typename I>
+void InstanceReplayer<I>::queue_start_image_replayers() {
+  dout(20) << dendl;
+
+  Context *ctx = create_context_callback<
+    InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
+  m_async_op_tracker.start_op();
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void InstanceReplayer<I>::start_image_replayers(int r) {
+  dout(20) << dendl;
+
+  Mutex::Locker locker(m_lock);
+  if (m_on_shut_down != nullptr) {
+    return;
+  }
+
+  uint64_t image_count = 0;
+  uint64_t warning_count = 0;
+  uint64_t error_count = 0;
+  for (auto it = m_image_replayers.begin();
+       it != m_image_replayers.end();) {
+    auto current_it(it);
+    ++it;
+
+    ++image_count;
+    auto health_state = current_it->second->get_health_state();
+    if (health_state == image_replayer::HEALTH_STATE_WARNING) {
+      ++warning_count;
+    } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
+      ++error_count;
+    }
+
+    start_image_replayer(current_it->second);
+  }
+
+  m_service_daemon->add_or_update_attribute(
+    m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
+  m_service_daemon->add_or_update_attribute(
+    m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
+  m_service_daemon->add_or_update_attribute(
+    m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
+
+  m_async_op_tracker.finish_op();
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
+                                              Context *on_finish) {
+  dout(20) << image_replayer << " global_image_id="
+           << image_replayer->get_global_image_id() << ", on_finish="
+           << on_finish << dendl;
+
+  if (image_replayer->is_stopped()) {
+    m_threads->work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  m_async_op_tracker.start_op();
+  Context *ctx = create_async_context_callback(
+    m_threads->work_queue, new FunctionContext(
+      [this, image_replayer, on_finish] (int r) {
+        stop_image_replayer(image_replayer, on_finish);
+        m_async_op_tracker.finish_op();
+      }));
+
+  if (image_replayer->is_running()) {
+    image_replayer->stop(ctx, false);
+  } else {
+    int after = 1;
+    dout(20) << "scheduling image replayer " << image_replayer << " stop after "
+             << after << " sec (task " << ctx << ")" << dendl;
+    ctx = new FunctionContext(
+      [this, after, ctx] (int r) {
+        Mutex::Locker timer_locker(m_threads->timer_lock);
+        m_threads->timer->add_event_after(after, ctx);
+      });
+    m_threads->work_queue->queue(ctx, 0);
+  }
+}
+
+template <typename I>
+void InstanceReplayer<I>::wait_for_ops() {
+  dout(20) << dendl;
+
+  Context *ctx = create_context_callback<
+    InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
+
+  m_async_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void InstanceReplayer<I>::handle_wait_for_ops(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  assert(r == 0);
+
+  Mutex::Locker locker(m_lock);
+  stop_image_replayers();
+}
+
+template <typename I>
+void InstanceReplayer<I>::stop_image_replayers() {
+  dout(20) << dendl;
+
+  assert(m_lock.is_locked());
+
+  Context *ctx = create_async_context_callback(
+    m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
+    &InstanceReplayer<I>::handle_stop_image_replayers>(this));
+
+  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
+  for (auto &it : m_image_replayers) {
+    stop_image_replayer(it.second, gather_ctx->new_sub());
+  }
+  gather_ctx->activate();
+}
+
+template <typename I>
+void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
+  dout(20) << "r=" << r << dendl;
+
+  assert(r == 0);
+
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+
+    for (auto &it : m_image_replayers) {
+      assert(it.second->is_stopped());
+      it.second->destroy();
+    }
+    m_image_replayers.clear();
+
+    assert(m_on_shut_down != nullptr);
+    std::swap(on_finish, m_on_shut_down);
+  }
+  on_finish->complete(r);
+}
+
+template <typename I>
+void InstanceReplayer<I>::cancel_image_state_check_task() {
+  Mutex::Locker timer_locker(m_threads->timer_lock);
+
+  if (m_image_state_check_task == nullptr) {
+    return;
+  }
+
+  dout(20) << m_image_state_check_task << dendl;
+  bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
+  assert(canceled);
+  m_image_state_check_task = nullptr;
+}
+
+template <typename I>
+void InstanceReplayer<I>::schedule_image_state_check_task() {
+  assert(m_threads->timer_lock.is_locked());
+  assert(m_image_state_check_task == nullptr);
+
+  m_image_state_check_task = new FunctionContext(
+    [this](int r) {
+      assert(m_threads->timer_lock.is_locked());
+      m_image_state_check_task = nullptr;
+      schedule_image_state_check_task();
+      queue_start_image_replayers();
+    });
+
+  int after = g_ceph_context->_conf->get_val<int64_t>(
+    "rbd_mirror_image_state_check_interval");
+
+  dout(20) << "scheduling image state check after " << after << " sec (task "
+           << m_image_state_check_task << ")" << dendl;
+  m_threads->timer->add_event_after(after, m_image_state_check_task);
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;