X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FImageSyncThrottler.cc;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FImageSyncThrottler.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=dfa96ed4d3e463f3a21faca658958aa54398e99f;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_mirror/ImageSyncThrottler.cc b/src/ceph/src/tools/rbd_mirror/ImageSyncThrottler.cc deleted file mode 100644 index dfa96ed..0000000 --- a/src/ceph/src/tools/rbd_mirror/ImageSyncThrottler.cc +++ /dev/null @@ -1,215 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 SUSE LINUX GmbH - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "ImageSyncThrottler.h" -#include "common/Formatter.h" -#include "common/debug.h" -#include "common/errno.h" -#include "librbd/Utils.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rbd_mirror -#undef dout_prefix -#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \ - << " " << __func__ << ": " - -namespace rbd { -namespace mirror { - -template -ImageSyncThrottler::ImageSyncThrottler() - : m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", - this)), - m_max_concurrent_syncs(g_ceph_context->_conf->get_val( - "rbd_mirror_concurrent_image_syncs")) { - dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl; - g_ceph_context->_conf->add_observer(this); -} - -template -ImageSyncThrottler::~ImageSyncThrottler() { - g_ceph_context->_conf->remove_observer(this); - - Mutex::Locker locker(m_lock); - assert(m_inflight_ops.empty()); - assert(m_queue.empty()); -} - -template -void ImageSyncThrottler::start_op(const std::string &id, Context *on_start) { - dout(20) << "id=" << id << dendl; - - { - Mutex::Locker locker(m_lock); - - if (m_inflight_ops.count(id) > 0) { - dout(20) << "duplicate for already started op " << id << dendl; - } else if (m_max_concurrent_syncs == 0 || - m_inflight_ops.size() < m_max_concurrent_syncs) { - assert(m_queue.empty()); - m_inflight_ops.insert(id); - dout(20) << "ready to start sync for " << id << " [" - << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" - << dendl; - } else { - m_queue.push_back(std::make_pair(id, on_start)); - on_start = nullptr; - dout(20) << "image sync for " << id << " has been queued" << dendl; - } - } - - if (on_start != nullptr) { - on_start->complete(0); - } -} - -template -bool ImageSyncThrottler::cancel_op(const std::string &id) { - dout(20) << "id=" << id << dendl; - - Context *on_start = nullptr; - { - Mutex::Locker locker(m_lock); - for (auto it = m_queue.begin(); it != m_queue.end(); ++it) { - if (it->first == id) { - on_start = it->second; - dout(20) << "canceled queued sync for " << id << dendl; - m_queue.erase(it); - break; - } - } - } - - if (on_start == nullptr) { - return false; - } - - on_start->complete(-ECANCELED); - return true; -} - -template -void ImageSyncThrottler::finish_op(const std::string &id) { - dout(20) << "id=" << id << dendl; - - if (cancel_op(id)) { - return; - } - - Context *on_start = nullptr; - { - Mutex::Locker locker(m_lock); - - m_inflight_ops.erase(id); - - if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) { - auto pair = m_queue.front(); - m_inflight_ops.insert(pair.first); - dout(20) << "ready to start sync for " << pair.first << " [" - << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" - << dendl; - on_start= pair.second; - m_queue.pop_front(); - } - } - - if (on_start != nullptr) { - on_start->complete(0); - } -} - -template -void ImageSyncThrottler::drain(int r) { - dout(20) << dendl; - - std::list> queue; - { - Mutex::Locker locker(m_lock); - std::swap(m_queue, queue); - m_inflight_ops.clear(); - } - - for (auto &pair : queue) { - pair.second->complete(r); - } -} - -template -void ImageSyncThrottler::set_max_concurrent_syncs(uint32_t max) { - dout(20) << "max=" << max << dendl; - - std::list ops; - { - Mutex::Locker locker(m_lock); - m_max_concurrent_syncs = max; - - // Start waiting ops in the case of available free slots - while ((m_max_concurrent_syncs == 0 || - m_inflight_ops.size() < m_max_concurrent_syncs) && - !m_queue.empty()) { - auto pair = m_queue.front(); - m_inflight_ops.insert(pair.first); - dout(20) << "ready to start sync for " << pair.first << " [" - << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]" - << dendl; - ops.push_back(pair.second); - m_queue.pop_front(); - } - } - - for (const auto& ctx : ops) { - ctx->complete(0); - } -} - -template -void ImageSyncThrottler::print_status(Formatter *f, std::stringstream *ss) { - dout(20) << dendl; - - Mutex::Locker locker(m_lock); - - if (f) { - f->dump_int("max_parallel_syncs", m_max_concurrent_syncs); - f->dump_int("running_syncs", m_inflight_ops.size()); - f->dump_int("waiting_syncs", m_queue.size()); - f->flush(*ss); - } else { - *ss << "[ "; - *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", "; - *ss << "running_syncs=" << m_inflight_ops.size() << ", "; - *ss << "waiting_syncs=" << m_queue.size() << " ]"; - } -} - -template -const char** ImageSyncThrottler::get_tracked_conf_keys() const { - static const char* KEYS[] = { - "rbd_mirror_concurrent_image_syncs", - NULL - }; - return KEYS; -} - -template -void ImageSyncThrottler::handle_conf_change(const struct md_config_t *conf, - const set &changed) { - if (changed.count("rbd_mirror_concurrent_image_syncs")) { - set_max_concurrent_syncs(conf->get_val("rbd_mirror_concurrent_image_syncs")); - } -} - -} // namespace mirror -} // namespace rbd - -template class rbd::mirror::ImageSyncThrottler;