1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/stringify.h"
5 #include "common/Timer.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "librbd/Utils.h"
9 #include "ImageReplayer.h"
10 #include "InstanceReplayer.h"
11 #include "ServiceDaemon.h"
// Route this file's dout()/derr logging through the global CephContext
// under the rbd_mirror debug subsystem.
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
// Each log line is prefixed with the class name, the object address and the
// name of the function that emitted it.
17 #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
18 << this << " " << __func__ << ": "
// Service-daemon attribute names. start_image_replayers() publishes the
// per-pool assigned / warning / error image counts under these keys.
25 const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
26 const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
27 const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
29 } // anonymous namespace
// Helpers used below to build Context callbacks (the "async" variant is
// created with the work queue as its first argument).
31 using librbd::util::create_async_context_callback;
32 using librbd::util::create_context_callback;
// Creates the replayer manager for one local pool. The collaborators (thread /
// timer / work-queue bundle, service daemon, image deleter, local cluster
// handle) are stored as given. The local mirror uuid and pool id are handed to
// every ImageReplayer this object creates. The pool id is also embedded in the
// name of m_lock.
35 InstanceReplayer<I>::InstanceReplayer(
36 Threads<I> *threads, ServiceDaemon<I>* service_daemon,
37 ImageDeleter<I>* image_deleter, RadosRef local_rados,
38 const std::string &local_mirror_uuid, int64_t local_pool_id)
39 : m_threads(threads), m_service_daemon(service_daemon),
40 m_image_deleter(image_deleter), m_local_rados(local_rados),
41 m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
42 m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
// The asserts require that the periodic state-check task is no longer armed,
// that no async operations are outstanding, and that every image replayer has
// already been destroyed and removed. Presumably this means shut_down() must
// have completed before destruction.
46 InstanceReplayer<I>::~InstanceReplayer() {
47 assert(m_image_state_check_task == nullptr);
48 assert(m_async_op_tracker.empty());
49 assert(m_image_replayers.empty());
// Synchronous init: blocks on `init_ctx` and returns its result.
// NOTE(review): the lines that declare init_ctx (presumably a C_SaferCond) and
// pass it to init(Context*) are not visible in this listing.
53 int InstanceReplayer<I>::init() {
56 return init_ctx.wait();
// Asynchronous init. From the work queue, it arms the periodic image state
// check task and then completes on_finish with 0.
60 void InstanceReplayer<I>::init(Context *on_finish) {
63 Context *ctx = new FunctionContext(
64 [this, on_finish] (int r) {
// schedule_image_state_check_task() asserts that timer_lock is held.
66 Mutex::Locker timer_locker(m_threads->timer_lock);
67 schedule_image_state_check_task();
69 on_finish->complete(0);
72 m_threads->work_queue->queue(ctx, 0);
// Synchronous shut down: starts the asynchronous shut_down(Context*) and
// blocks until it completes.
// NOTE(review): how `r` is consumed (presumably an assert) is not visible here.
76 void InstanceReplayer<I>::shut_down() {
77 C_SaferCond shut_down_ctx;
78 shut_down(&shut_down_ctx);
79 int r = shut_down_ctx.wait();
// Begins asynchronous shut down. Under m_lock it records on_finish in
// m_on_shut_down; only one shut down may be in flight, which is asserted.
// Once m_on_shut_down is set, acquire_image/release_image/remove_peer_image
// assert and start_image_replayers() takes its shut-down branch.
// handle_stop_image_replayers() later completes the recorded context.
// A work-queue context is queued that cancels the periodic state-check task.
// NOTE(review): the rest of that lambda is not visible here; presumably it
// continues the shut-down chain (wait_for_ops()).
84 void InstanceReplayer<I>::shut_down(Context *on_finish) {
87 Mutex::Locker locker(m_lock);
89 assert(m_on_shut_down == nullptr);
90 m_on_shut_down = on_finish;
92 Context *ctx = new FunctionContext(
94 cancel_image_state_check_task();
98 m_threads->work_queue->queue(ctx, 0);
// Registers a remote peer (its uuid plus an IoCtx) under m_lock.
// acquire_image() currently requires exactly one registered peer.
// `result` is the `.second` of the insert, i.e. whether the peer was newly
// added (assuming m_peers is a std::set-like container).
// NOTE(review): the use of `result` (presumably an assert) is not visible here.
101 template <typename I>
102 void InstanceReplayer<I>::add_peer(std::string peer_uuid,
103 librados::IoCtx io_ctx) {
104 dout(20) << peer_uuid << dendl;
106 Mutex::Locker locker(m_lock);
107 auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second;
// Releases every image held by this instance. Under m_lock, each replayer is
// removed from the map immediately (the loop increment erases the entry).
// It is then stopped and destroyed asynchronously. on_finish is completed
// through the C_Gather once every per-image sub-context has fired.
111 template <typename I>
112 void InstanceReplayer<I>::release_all(Context *on_finish) {
115 Mutex::Locker locker(m_lock);
117 C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
118 for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
119 it = m_image_replayers.erase(it)) {
120 auto image_replayer = it->second;
121 auto ctx = gather_ctx->new_sub();
// Wrap the gather sub so destroy() runs once the stop has finished.
// NOTE(review): completion of the captured sub-context happens on a line not
// visible in this listing.
122 ctx = new FunctionContext(
123 [image_replayer, ctx] (int r) {
124 image_replayer->destroy();
127 stop_image_replayer(image_replayer, ctx);
129 gather_ctx->activate();
// Takes ownership of replaying `global_image_id`. Must not be called once
// shut down has begun (asserted).
// On first acquisition an ImageReplayer is created, inserted into the map and
// attached to the single configured peer. In every case the replayer's
// finished flag is cleared and the replayer is passed to
// start_image_replayer().
// on_finish is always completed with 0 via the work queue.
132 template <typename I>
133 void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
134 const std::string &global_image_id,
135 Context *on_finish) {
136 dout(20) << "global_image_id=" << global_image_id << dendl;
138 Mutex::Locker locker(m_lock);
140 assert(m_on_shut_down == nullptr);
142 auto it = m_image_replayers.find(global_image_id);
143 if (it == m_image_replayers.end()) {
144 auto image_replayer = ImageReplayer<I>::create(
145 m_threads, m_image_deleter, instance_watcher, m_local_rados,
146 m_local_mirror_uuid, m_local_pool_id, global_image_id);
148 dout(20) << global_image_id << ": creating replayer " << image_replayer
151 it = m_image_replayers.insert(std::make_pair(global_image_id,
152 image_replayer)).first;
154 // TODO only a single peer is currently supported
155 assert(m_peers.size() == 1);
156 auto peer = *m_peers.begin();
157 image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
160 auto& image_replayer = it->second;
161 // TODO temporary until policy integrated
162 image_replayer->set_finished(false);
164 start_image_replayer(image_replayer);
165 m_threads->work_queue->queue(on_finish, 0);
// Stops and destroys the replayer for `global_image_id`. Must not be called
// once shut down has begun (asserted).
// If the id is unknown, on_finish is completed with 0 via the work queue.
// Otherwise the map entry is erased right away, under m_lock. on_finish is
// wrapped so that destroy() runs after the stop and before the caller is
// notified with 0.
168 template <typename I>
169 void InstanceReplayer<I>::release_image(const std::string &global_image_id,
170 Context *on_finish) {
171 dout(20) << "global_image_id=" << global_image_id << dendl;
173 Mutex::Locker locker(m_lock);
174 assert(m_on_shut_down == nullptr);
176 auto it = m_image_replayers.find(global_image_id);
177 if (it == m_image_replayers.end()) {
178 dout(20) << global_image_id << ": not found" << dendl;
179 m_threads->work_queue->queue(on_finish, 0);
183 auto image_replayer = it->second;
184 m_image_replayers.erase(it);
186 on_finish = new FunctionContext(
187 [image_replayer, on_finish] (int r) {
188 image_replayer->destroy();
189 on_finish->complete(0);
191 stop_image_replayer(image_replayer, on_finish);
// Called when a peer's copy of an image has been removed. Must not be called
// once shut down has begun (asserted).
// If this instance replays the image, its replayer is restarted. peer_mirror_uuid
// is only logged. on_finish is always completed with 0 via the work queue.
194 template <typename I>
195 void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
196 const std::string &peer_mirror_uuid,
197 Context *on_finish) {
198 dout(20) << "global_image_id=" << global_image_id << ", "
199 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
201 Mutex::Locker locker(m_lock);
202 assert(m_on_shut_down == nullptr);
204 auto it = m_image_replayers.find(global_image_id);
205 if (it != m_image_replayers.end()) {
206 // TODO only a single peer is currently supported, therefore
207 // we can just interrupt the current image replayer and
208 // it will eventually detect that the peer image is missing and
209 // determine if a delete propagation is required.
210 auto image_replayer = it->second;
211 image_replayer->restart();
213 m_threads->work_queue->queue(on_finish, 0);
// Dumps the status of every image replayer, under m_lock, into an
// "image_replayers" array section of `f`.
// NOTE(review): `f` is dereferenced unconditionally below. Any null check, and
// the closing of the array section, would be on lines not visible here.
216 template <typename I>
217 void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
224 Mutex::Locker locker(m_lock);
226 f->open_array_section("image_replayers");
227 for (auto &kv : m_image_replayers) {
228 auto &image_replayer = kv.second;
229 image_replayer->print_status(f, ss);
// Starts all image replayers on request. Under m_lock it clears m_manual_stop
// and calls start(nullptr, true) on each replayer.
// NOTE(review): the meaning of the `true` argument (presumably "manual") is
// defined by ImageReplayer.
234 template <typename I>
235 void InstanceReplayer<I>::start()
239 Mutex::Locker locker(m_lock);
241 m_manual_stop = false;
243 for (auto &kv : m_image_replayers) {
244 auto &image_replayer = kv.second;
245 image_replayer->start(nullptr, true);
// Stops all image replayers on request. Under m_lock it sets m_manual_stop and
// calls stop(nullptr, true) on each replayer, without waiting for completion.
249 template <typename I>
250 void InstanceReplayer<I>::stop()
254 Mutex::Locker locker(m_lock);
256 m_manual_stop = true;
258 for (auto &kv : m_image_replayers) {
259 auto &image_replayer = kv.second;
260 image_replayer->stop(nullptr, true);
// Restarts all image replayers. Under m_lock it clears m_manual_stop and calls
// restart() on each replayer.
264 template <typename I>
265 void InstanceReplayer<I>::restart()
269 Mutex::Locker locker(m_lock);
271 m_manual_stop = false;
273 for (auto &kv : m_image_replayers) {
274 auto &image_replayer = kv.second;
275 image_replayer->restart();
// Calls flush() on every image replayer while holding m_lock.
279 template <typename I>
280 void InstanceReplayer<I>::flush()
282 dout(20) << "enter" << dendl;
284 Mutex::Locker locker(m_lock);
286 for (auto &kv : m_image_replayers) {
287 auto &image_replayer = kv.second;
288 image_replayer->flush();
// Starts `image_replayer` if appropriate. The caller must hold m_lock
// (asserted). The replayer is only acted on when it is stopped:
//  - blacklisted: an error is logged and it is left alone;
//  - finished: it is erased from m_image_replayers and destroyed, so neither
//    the pointer nor any iterator to its map entry may be used afterwards;
//  - otherwise: start(nullptr, false) is called.
// NOTE(review): the branch bodies and early returns (original lines 301, 304,
// 311-313) are not visible in this listing.
292 template <typename I>
293 void InstanceReplayer<I>::start_image_replayer(
294 ImageReplayer<I> *image_replayer) {
295 assert(m_lock.is_locked());
297 std::string global_image_id = image_replayer->get_global_image_id();
298 dout(20) << "global_image_id=" << global_image_id << dendl;
300 if (!image_replayer->is_stopped()) {
302 } else if (image_replayer->is_blacklisted()) {
303 derr << "blacklisted detected during image replay" << dendl;
305 } else if (image_replayer->is_finished()) {
306 // TODO temporary until policy integrated
307 dout(5) << "removing image replayer for global_image_id="
308 << global_image_id << dendl;
309 m_image_replayers.erase(image_replayer->get_global_image_id());
310 image_replayer->destroy();
314 image_replayer->start(nullptr, false);
// Defers start_image_replayers() to the work queue. The deferred call is
// registered with m_async_op_tracker so that wait_for_ops() during shut down
// waits for it. start_image_replayers() is responsible for calling finish_op().
317 template <typename I>
318 void InstanceReplayer<I>::queue_start_image_replayers() {
321 Context *ctx = create_context_callback<
322 InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
323 m_async_op_tracker.start_op();
324 m_threads->work_queue->queue(ctx, 0);
// Periodic pass over all image replayers, run from the work queue via
// queue_start_image_replayers(). Under m_lock it counts replayers in the
// warning and error health states. It calls start_image_replayer() on each
// replayer, then publishes the counts to the service daemon for this pool.
// It ends the async op that queue_start_image_replayers() started.
327 template <typename I>
328 void InstanceReplayer<I>::start_image_replayers(int r) {
331 Mutex::Locker locker(m_lock);
332 if (m_on_shut_down != nullptr) {
// NOTE(review): this branch's body is not visible here. It must also call
// m_async_op_tracker.finish_op() before returning. Otherwise the op started in
// queue_start_image_replayers() is never finished and wait_for_ops() during
// shut down never completes.
336 uint64_t image_count = 0;
337 uint64_t warning_count = 0;
338 uint64_t error_count = 0;
// start_image_replayer() may erase the current entry, so the loop works on a
// separate `current_it` (its declaration and the advance of `it` are on lines
// not visible here).
339 for (auto it = m_image_replayers.begin();
340 it != m_image_replayers.end();) {
345 auto health_state = current_it->second->get_health_state();
346 if (health_state == image_replayer::HEALTH_STATE_WARNING) {
348 } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
352 start_image_replayer(current_it->second);
355 m_service_daemon->add_or_update_attribute(
356 m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
357 m_service_daemon->add_or_update_attribute(
358 m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
359 m_service_daemon->add_or_update_attribute(
360 m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
362 m_async_op_tracker.finish_op();
// Drives `image_replayer` to the stopped state, then completes on_finish with
// 0 via the work queue.
//  - Already stopped: on_finish is queued immediately.
//  - Running: stop() is requested, and this function re-runs when the stop
//    finishes.
//  - In any other state: a re-check is scheduled on the timer after `after`
//    seconds. The timer event is added from a work-queue context that takes
//    timer_lock.
// Each retry is tracked by m_async_op_tracker, so shut down waits for it.
// NOTE(review): the declaration of `after` and the early return after the
// stopped case are on lines not visible in this listing.
365 template <typename I>
366 void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
367 Context *on_finish) {
368 dout(20) << image_replayer << " global_image_id="
369 << image_replayer->get_global_image_id() << ", on_finish="
370 << on_finish << dendl;
372 if (image_replayer->is_stopped()) {
373 m_threads->work_queue->queue(on_finish, 0);
377 m_async_op_tracker.start_op();
// Retry continuation: it re-checks the state from the work queue and
// releases the tracked op.
378 Context *ctx = create_async_context_callback(
379 m_threads->work_queue, new FunctionContext(
380 [this, image_replayer, on_finish] (int r) {
381 stop_image_replayer(image_replayer, on_finish);
382 m_async_op_tracker.finish_op();
385 if (image_replayer->is_running()) {
386 image_replayer->stop(ctx, false);
389 dout(20) << "scheduling image replayer " << image_replayer << " stop after "
390 << after << " sec (task " << ctx << ")" << dendl;
391 ctx = new FunctionContext(
392 [this, after, ctx] (int r) {
393 Mutex::Locker timer_locker(m_threads->timer_lock);
394 m_threads->timer->add_event_after(after, ctx);
396 m_threads->work_queue->queue(ctx, 0);
// Shut-down step: waits until every async op registered with
// m_async_op_tracker has finished, then continues in handle_wait_for_ops().
400 template <typename I>
401 void InstanceReplayer<I>::wait_for_ops() {
404 Context *ctx = create_context_callback<
405 InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
407 m_async_op_tracker.wait_for_ops(ctx);
// Continuation of wait_for_ops(). It takes m_lock, which
// stop_image_replayers() asserts is held, and stops all remaining replayers.
410 template <typename I>
411 void InstanceReplayer<I>::handle_wait_for_ops(int r) {
412 dout(20) << "r=" << r << dendl;
416 Mutex::Locker locker(m_lock);
417 stop_image_replayers();
// Stops every replayer in m_image_replayers concurrently, without removing
// any from the map. The caller must hold m_lock (asserted).
// handle_stop_image_replayers() runs from the work queue once all stops have
// completed.
420 template <typename I>
421 void InstanceReplayer<I>::stop_image_replayers() {
424 assert(m_lock.is_locked());
426 Context *ctx = create_async_context_callback(
427 m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
428 &InstanceReplayer<I>::handle_stop_image_replayers>(this));
430 C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
431 for (auto &it : m_image_replayers) {
432 stop_image_replayer(it.second, gather_ctx->new_sub());
434 gather_ctx->activate();
// Final shut-down step. Under m_lock it destroys every (now stopped) replayer,
// clears the map, and takes ownership of the pending shut-down context. The
// swap leaves m_on_shut_down null. That context is then completed with `r`.
// NOTE(review): the braces scoping m_lock are not visible here. Presumably
// on_finish->complete() runs after the lock is released, which is why the
// context is swapped into a local first.
437 template <typename I>
438 void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
439 dout(20) << "r=" << r << dendl;
443 Context *on_finish = nullptr;
445 Mutex::Locker locker(m_lock);
447 for (auto &it : m_image_replayers) {
448 assert(it.second->is_stopped());
449 it.second->destroy();
451 m_image_replayers.clear();
453 assert(m_on_shut_down != nullptr);
454 std::swap(on_finish, m_on_shut_down);
456 on_finish->complete(r);
// Cancels the pending periodic state-check timer event, if one is armed, and
// resets m_image_state_check_task. Takes timer_lock itself, so the caller must
// not already hold it.
// NOTE(review): the early return for the nullptr case and the handling of
// `canceled` (presumably an assert) are on lines not visible here.
459 template <typename I>
460 void InstanceReplayer<I>::cancel_image_state_check_task() {
461 Mutex::Locker timer_locker(m_threads->timer_lock);
463 if (m_image_state_check_task == nullptr) {
467 dout(20) << m_image_state_check_task << dendl;
468 bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
470 m_image_state_check_task = nullptr;
// Arms a self-rescheduling timer event that fires every
// rbd_mirror_image_state_check_interval seconds. Each firing re-arms the event
// and queues start_image_replayers() on the work queue.
// The caller must hold timer_lock and no task may already be armed (both
// asserted). The callback also asserts timer_lock, presumably because the
// timer runs events with that lock held.
// NOTE(review): the int64_t config value is narrowed to `int after`.
473 template <typename I>
474 void InstanceReplayer<I>::schedule_image_state_check_task() {
475 assert(m_threads->timer_lock.is_locked());
476 assert(m_image_state_check_task == nullptr);
478 m_image_state_check_task = new FunctionContext(
480 assert(m_threads->timer_lock.is_locked());
481 m_image_state_check_task = nullptr;
482 schedule_image_state_check_task();
483 queue_start_image_replayers();
486 int after = g_ceph_context->_conf->get_val<int64_t>(
487 "rbd_mirror_image_state_check_interval");
489 dout(20) << "scheduling image state check after " << after << " sec (task "
490 << m_image_state_check_task << ")" << dendl;
491 m_threads->timer->add_event_after(after, m_image_state_check_task);
494 } // namespace mirror
// Explicit instantiation of the template for the production image type,
// librbd::ImageCtx.
497 template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;