Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd_mirror / ImageSyncThrottler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 SUSE LINUX GmbH
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "ImageSyncThrottler.h"
16 #include "common/Formatter.h"
17 #include "common/debug.h"
18 #include "common/errno.h"
19 #include "librbd/Utils.h"
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rbd_mirror
23 #undef dout_prefix
24 #define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
25                            << " " << __func__ << ": "
26
27 namespace rbd {
28 namespace mirror {
29
30 template <typename I>
31 ImageSyncThrottler<I>::ImageSyncThrottler()
32   : m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler",
33                                           this)),
34     m_max_concurrent_syncs(g_ceph_context->_conf->get_val<uint64_t>(
35       "rbd_mirror_concurrent_image_syncs")) {
36   dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl;
37   g_ceph_context->_conf->add_observer(this);
38 }
39
40 template <typename I>
41 ImageSyncThrottler<I>::~ImageSyncThrottler() {
42   g_ceph_context->_conf->remove_observer(this);
43
44   Mutex::Locker locker(m_lock);
45   assert(m_inflight_ops.empty());
46   assert(m_queue.empty());
47 }
48
49 template <typename I>
50 void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
51   dout(20) << "id=" << id << dendl;
52
53   {
54     Mutex::Locker locker(m_lock);
55
56     if (m_inflight_ops.count(id) > 0) {
57       dout(20) << "duplicate for already started op " << id << dendl;
58     } else if (m_max_concurrent_syncs == 0 ||
59                m_inflight_ops.size() < m_max_concurrent_syncs) {
60       assert(m_queue.empty());
61       m_inflight_ops.insert(id);
62       dout(20) << "ready to start sync for " << id << " ["
63                << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
64                << dendl;
65     } else {
66       m_queue.push_back(std::make_pair(id, on_start));
67       on_start = nullptr;
68       dout(20) << "image sync for " << id << " has been queued" << dendl;
69     }
70   }
71
72   if (on_start != nullptr) {
73     on_start->complete(0);
74   }
75 }
76
77 template <typename I>
78 bool ImageSyncThrottler<I>::cancel_op(const std::string &id) {
79   dout(20) << "id=" << id << dendl;
80
81   Context *on_start = nullptr;
82   {
83     Mutex::Locker locker(m_lock);
84     for (auto it = m_queue.begin(); it != m_queue.end(); ++it) {
85       if (it->first == id) {
86         on_start = it->second;
87         dout(20) << "canceled queued sync for " << id << dendl;
88         m_queue.erase(it);
89         break;
90       }
91     }
92   }
93
94   if (on_start == nullptr) {
95     return false;
96   }
97
98   on_start->complete(-ECANCELED);
99   return true;
100 }
101
102 template <typename I>
103 void ImageSyncThrottler<I>::finish_op(const std::string &id) {
104   dout(20) << "id=" << id << dendl;
105
106   if (cancel_op(id)) {
107     return;
108   }
109
110   Context *on_start = nullptr;
111   {
112     Mutex::Locker locker(m_lock);
113
114     m_inflight_ops.erase(id);
115
116     if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
117       auto pair = m_queue.front();
118       m_inflight_ops.insert(pair.first);
119       dout(20) << "ready to start sync for " << pair.first << " ["
120                << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
121                << dendl;
122       on_start= pair.second;
123       m_queue.pop_front();
124     }
125   }
126
127   if (on_start != nullptr) {
128     on_start->complete(0);
129   }
130 }
131
132 template <typename I>
133 void ImageSyncThrottler<I>::drain(int r) {
134   dout(20) << dendl;
135
136   std::list<std::pair<std::string, Context *>> queue;
137   {
138     Mutex::Locker locker(m_lock);
139     std::swap(m_queue, queue);
140     m_inflight_ops.clear();
141   }
142
143   for (auto &pair : queue) {
144     pair.second->complete(r);
145   }
146 }
147
148 template <typename I>
149 void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
150   dout(20) << "max=" << max << dendl;
151
152   std::list<Context *> ops;
153   {
154     Mutex::Locker locker(m_lock);
155     m_max_concurrent_syncs = max;
156
157     // Start waiting ops in the case of available free slots
158     while ((m_max_concurrent_syncs == 0 ||
159             m_inflight_ops.size() < m_max_concurrent_syncs) &&
160            !m_queue.empty()) {
161       auto pair = m_queue.front();
162       m_inflight_ops.insert(pair.first);
163       dout(20) << "ready to start sync for " << pair.first << " ["
164                << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
165                << dendl;
166       ops.push_back(pair.second);
167       m_queue.pop_front();
168     }
169   }
170
171   for (const auto& ctx : ops) {
172     ctx->complete(0);
173   }
174 }
175
176 template <typename I>
177 void ImageSyncThrottler<I>::print_status(Formatter *f, std::stringstream *ss) {
178   dout(20) << dendl;
179
180   Mutex::Locker locker(m_lock);
181
182   if (f) {
183     f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
184     f->dump_int("running_syncs", m_inflight_ops.size());
185     f->dump_int("waiting_syncs", m_queue.size());
186     f->flush(*ss);
187   } else {
188     *ss << "[ ";
189     *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
190     *ss << "running_syncs=" << m_inflight_ops.size() << ", ";
191     *ss << "waiting_syncs=" << m_queue.size() << " ]";
192   }
193 }
194
195 template <typename I>
196 const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const {
197   static const char* KEYS[] = {
198     "rbd_mirror_concurrent_image_syncs",
199     NULL
200   };
201   return KEYS;
202 }
203
204 template <typename I>
205 void ImageSyncThrottler<I>::handle_conf_change(const struct md_config_t *conf,
206                                       const set<string> &changed) {
207   if (changed.count("rbd_mirror_concurrent_image_syncs")) {
208     set_max_concurrent_syncs(conf->get_val<uint64_t>("rbd_mirror_concurrent_image_syncs"));
209   }
210 }
211
212 } // namespace mirror
213 } // namespace rbd
214
215 template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;