X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FImageSyncThrottler.cc;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FImageSyncThrottler.cc;h=dfa96ed4d3e463f3a21faca658958aa54398e99f;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_mirror/ImageSyncThrottler.cc b/src/ceph/src/tools/rbd_mirror/ImageSyncThrottler.cc new file mode 100644 index 0000000..dfa96ed --- /dev/null +++ b/src/ceph/src/tools/rbd_mirror/ImageSyncThrottler.cc @@ -0,0 +1,215 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 SUSE LINUX GmbH + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "ImageSyncThrottler.h" +#include "common/Formatter.h" +#include "common/debug.h" +#include "common/errno.h" +#include "librbd/Utils.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \ + << " " << __func__ << ": " + +namespace rbd { +namespace mirror { + +template +ImageSyncThrottler::ImageSyncThrottler() + : m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", + this)), + m_max_concurrent_syncs(g_ceph_context->_conf->get_val( + "rbd_mirror_concurrent_image_syncs")) { + dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl; + g_ceph_context->_conf->add_observer(this); +} + +template +ImageSyncThrottler::~ImageSyncThrottler() { + g_ceph_context->_conf->remove_observer(this); + + Mutex::Locker locker(m_lock); + assert(m_inflight_ops.empty()); + assert(m_queue.empty()); +} + +template +void ImageSyncThrottler::start_op(const std::string &id, Context *on_start) { + dout(20) << "id=" << id << dendl; + + { + Mutex::Locker locker(m_lock); + + if (m_inflight_ops.count(id) > 0) { + dout(20) << "duplicate for already started op " << id << dendl; + } else if (m_max_concurrent_syncs == 0 || + m_inflight_ops.size() < m_max_concurrent_syncs) { + assert(m_queue.empty()); + m_inflight_ops.insert(id); + dout(20) << "ready to start sync for " << id << " [" + << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" + << dendl; + } else { + m_queue.push_back(std::make_pair(id, on_start)); + on_start = nullptr; + dout(20) << "image sync for " << id << " has been queued" << dendl; + } + } + + if (on_start != nullptr) { + on_start->complete(0); + } +} + +template +bool ImageSyncThrottler::cancel_op(const std::string &id) { + dout(20) << "id=" << id << dendl; + + Context *on_start = nullptr; + { + Mutex::Locker locker(m_lock); + for (auto it = m_queue.begin(); it != m_queue.end(); ++it) { + if (it->first == id) { + on_start = it->second; + dout(20) << "canceled queued sync for " << id << dendl; + m_queue.erase(it); + break; + } + } + } + + if (on_start == nullptr) { + return false; + } + + on_start->complete(-ECANCELED); + return true; +} + +template +void ImageSyncThrottler::finish_op(const std::string &id) { + dout(20) << "id=" << id << dendl; + + if (cancel_op(id)) { + return; + } + + Context *on_start = nullptr; + { + Mutex::Locker locker(m_lock); + + m_inflight_ops.erase(id); + + if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) { + auto pair = m_queue.front(); + m_inflight_ops.insert(pair.first); + dout(20) << "ready to start sync for " << pair.first << " [" + << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" + << dendl; + on_start= pair.second; + m_queue.pop_front(); + } + } + + if (on_start != nullptr) { + on_start->complete(0); + } +} + +template +void ImageSyncThrottler::drain(int r) { + dout(20) << dendl; + + std::list> queue; + { + Mutex::Locker locker(m_lock); + std::swap(m_queue, queue); + m_inflight_ops.clear(); + } + + for (auto &pair : queue) { + pair.second->complete(r); + } +} + +template +void ImageSyncThrottler::set_max_concurrent_syncs(uint32_t max) { + dout(20) << "max=" << max << dendl; + + std::list ops; + { + Mutex::Locker locker(m_lock); + m_max_concurrent_syncs = max; + + // Start waiting ops in the case of available free slots + while ((m_max_concurrent_syncs == 0 || + m_inflight_ops.size() < m_max_concurrent_syncs) && + !m_queue.empty()) { + auto pair = m_queue.front(); + m_inflight_ops.insert(pair.first); + dout(20) << "ready to start sync for " << pair.first << " [" + << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" + << dendl; + ops.push_back(pair.second); + m_queue.pop_front(); + } + } + + for (const auto& ctx : ops) { + ctx->complete(0); + } +} + +template +void ImageSyncThrottler::print_status(Formatter *f, std::stringstream *ss) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + if (f) { + f->dump_int("max_parallel_syncs", m_max_concurrent_syncs); + f->dump_int("running_syncs", m_inflight_ops.size()); + f->dump_int("waiting_syncs", m_queue.size()); + f->flush(*ss); + } else { + *ss << "[ "; + *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", "; + *ss << "running_syncs=" << m_inflight_ops.size() << ", "; + *ss << "waiting_syncs=" << m_queue.size() << " ]"; + } +} + +template +const char** ImageSyncThrottler::get_tracked_conf_keys() const { + static const char* KEYS[] = { + "rbd_mirror_concurrent_image_syncs", + NULL + }; + return KEYS; +} + +template +void ImageSyncThrottler::handle_conf_change(const struct md_config_t *conf, + const set &changed) { + if (changed.count("rbd_mirror_concurrent_image_syncs")) { + set_max_concurrent_syncs(conf->get_val("rbd_mirror_concurrent_image_syncs")); + } +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::ImageSyncThrottler;