Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / journal / Replay.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/journal/Replay.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "common/WorkQueue.h"
8 #include "librbd/ExclusiveLock.h"
9 #include "librbd/ImageCtx.h"
10 #include "librbd/ImageState.h"
11 #include "librbd/internal.h"
12 #include "librbd/Operations.h"
13 #include "librbd/Utils.h"
14 #include "librbd/io/AioCompletion.h"
15 #include "librbd/io/ImageRequest.h"
16
17 #define dout_subsys ceph_subsys_rbd
18 #undef dout_prefix
19 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "
20
21 namespace librbd {
22 namespace journal {
23
24 namespace {
25
26 static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
27 static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);
28
29 static NoOpProgressContext no_op_progress_callback;
30
31 template <typename I, typename E>
32 struct ExecuteOp : public Context {
33   I &image_ctx;
34   E event;
35   Context *on_op_complete;
36
37   ExecuteOp(I &image_ctx, const E &event, Context *on_op_complete)
38     : image_ctx(image_ctx), event(event), on_op_complete(on_op_complete) {
39   }
40
41   void execute(const journal::SnapCreateEvent &_) {
42     image_ctx.operations->execute_snap_create(event.snap_namespace,
43                                               event.snap_name,
44                                               on_op_complete,
45                                               event.op_tid, false);
46   }
47
48   void execute(const journal::SnapRemoveEvent &_) {
49     image_ctx.operations->execute_snap_remove(event.snap_namespace,
50                                               event.snap_name,
51                                               on_op_complete);
52   }
53
54   void execute(const journal::SnapRenameEvent &_) {
55     image_ctx.operations->execute_snap_rename(event.snap_id,
56                                               event.dst_snap_name,
57                                               on_op_complete);
58   }
59
60   void execute(const journal::SnapProtectEvent &_) {
61     image_ctx.operations->execute_snap_protect(event.snap_namespace,
62                                                event.snap_name,
63                                                on_op_complete);
64   }
65
66   void execute(const journal::SnapUnprotectEvent &_) {
67     image_ctx.operations->execute_snap_unprotect(event.snap_namespace,
68                                                  event.snap_name,
69                                                  on_op_complete);
70   }
71
72   void execute(const journal::SnapRollbackEvent &_) {
73     image_ctx.operations->execute_snap_rollback(event.snap_namespace,
74                                                 event.snap_name,
75                                                 no_op_progress_callback,
76                                                 on_op_complete);
77   }
78
79   void execute(const journal::RenameEvent &_) {
80     image_ctx.operations->execute_rename(event.image_name,
81                                          on_op_complete);
82   }
83
84   void execute(const journal::ResizeEvent &_) {
85     image_ctx.operations->execute_resize(event.size, true, no_op_progress_callback,
86                                          on_op_complete, event.op_tid);
87   }
88
89   void execute(const journal::FlattenEvent &_) {
90     image_ctx.operations->execute_flatten(no_op_progress_callback,
91                                           on_op_complete);
92   }
93
94   void execute(const journal::SnapLimitEvent &_) {
95     image_ctx.operations->execute_snap_set_limit(event.limit, on_op_complete);
96   }
97
98   void execute(const journal::UpdateFeaturesEvent &_) {
99     image_ctx.operations->execute_update_features(event.features, event.enabled,
100                                                   on_op_complete, event.op_tid);
101   }
102
103   void execute(const journal::MetadataSetEvent &_) {
104     image_ctx.operations->execute_metadata_set(event.key, event.value,
105                                                on_op_complete);
106   }
107
108   void execute(const journal::MetadataRemoveEvent &_) {
109     image_ctx.operations->execute_metadata_remove(event.key, on_op_complete);
110   }
111
112   void finish(int r) override {
113     CephContext *cct = image_ctx.cct;
114     if (r < 0) {
115       lderr(cct) << ": ExecuteOp::" << __func__ << ": r=" << r << dendl;
116       on_op_complete->complete(r);
117       return;
118     }
119
120     ldout(cct, 20) << ": ExecuteOp::" << __func__ << dendl;
121     RWLock::RLocker owner_locker(image_ctx.owner_lock);
122
123     if (image_ctx.exclusive_lock == nullptr ||
124         !image_ctx.exclusive_lock->accept_ops()) {
125       ldout(cct, 5) << ": lost exclusive lock -- skipping op" << dendl;
126       on_op_complete->complete(-ECANCELED);
127       return;
128     }
129
130     execute(event);
131   }
132 };
133
134 template <typename I>
135 struct C_RefreshIfRequired : public Context {
136   I &image_ctx;
137   Context *on_finish;
138
139   C_RefreshIfRequired(I &image_ctx, Context *on_finish)
140     : image_ctx(image_ctx), on_finish(on_finish) {
141   }
142   ~C_RefreshIfRequired() override {
143     delete on_finish;
144   }
145
146   void finish(int r) override {
147     CephContext *cct = image_ctx.cct;
148     Context *ctx = on_finish;
149     on_finish = nullptr;
150
151     if (r < 0) {
152       lderr(cct) << ": C_RefreshIfRequired::" << __func__ << ": r=" << r << dendl;
153       image_ctx.op_work_queue->queue(ctx, r);
154       return;
155     }
156
157     if (image_ctx.state->is_refresh_required()) {
158       ldout(cct, 20) << ": C_RefreshIfRequired::" << __func__ << ": "
159                      << "refresh required" << dendl;
160       image_ctx.state->refresh(ctx);
161       return;
162     }
163
164     image_ctx.op_work_queue->queue(ctx, 0);
165   }
166 };
167
168 } // anonymous namespace
169
170 #undef dout_prefix
171 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
172                            << __func__
173
174 template <typename I>
175 Replay<I>::Replay(I &image_ctx)
176   : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock") {
177 }
178
179 template <typename I>
180 Replay<I>::~Replay() {
181   assert(m_in_flight_aio_flush == 0);
182   assert(m_in_flight_aio_modify == 0);
183   assert(m_aio_modify_unsafe_contexts.empty());
184   assert(m_aio_modify_safe_contexts.empty());
185   assert(m_op_events.empty());
186   assert(m_in_flight_op_events == 0);
187 }
188
189 template <typename I>
190 int Replay<I>::decode(bufferlist::iterator *it, EventEntry *event_entry) {
191   try {
192     ::decode(*event_entry, *it);
193   } catch (const buffer::error &err) {
194     return -EBADMSG;
195   }
196   return 0;
197 }
198
199 template <typename I>
200 void Replay<I>::process(const EventEntry &event_entry,
201                         Context *on_ready, Context *on_safe) {
202   CephContext *cct = m_image_ctx.cct;
203   ldout(cct, 20) << ": on_ready=" << on_ready << ", on_safe=" << on_safe
204                  << dendl;
205
206   on_ready = util::create_async_context_callback(m_image_ctx, on_ready);
207
208   RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
209   if (m_image_ctx.exclusive_lock == nullptr ||
210       !m_image_ctx.exclusive_lock->accept_ops()) {
211     ldout(cct, 5) << ": lost exclusive lock -- skipping event" << dendl;
212     m_image_ctx.op_work_queue->queue(on_safe, -ECANCELED);
213     on_ready->complete(0);
214     return;
215   }
216
217   boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
218                        event_entry.event);
219 }
220
221 template <typename I>
222 void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
223   CephContext *cct = m_image_ctx.cct;
224   ldout(cct, 20) << dendl;
225
226   io::AioCompletion *flush_comp = nullptr;
227   on_finish = util::create_async_context_callback(
228     m_image_ctx, on_finish);
229
230   {
231     Mutex::Locker locker(m_lock);
232
233     // safely commit any remaining AIO modify operations
234     if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
235       flush_comp = create_aio_flush_completion(nullptr);
236       assert(flush_comp != nullptr);
237     }
238
239     for (auto &op_event_pair : m_op_events) {
240       OpEvent &op_event = op_event_pair.second;
241       if (cancel_ops) {
242         // cancel ops that are waiting to start (waiting for
243         // OpFinishEvent or waiting for ready)
244         if (op_event.on_start_ready == nullptr &&
245             op_event.on_op_finish_event != nullptr) {
246           Context *on_op_finish_event = nullptr;
247           std::swap(on_op_finish_event, op_event.on_op_finish_event);
248           m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
249         }
250       } else if (op_event.on_op_finish_event != nullptr) {
251         // start ops waiting for OpFinishEvent
252         Context *on_op_finish_event = nullptr;
253         std::swap(on_op_finish_event, op_event.on_op_finish_event);
254         m_image_ctx.op_work_queue->queue(on_op_finish_event, 0);
255       } else if (op_event.on_start_ready != nullptr) {
256         // waiting for op ready
257         op_event_pair.second.finish_on_ready = true;
258       }
259     }
260
261     assert(!m_shut_down);
262     m_shut_down = true;
263
264     assert(m_flush_ctx == nullptr);
265     if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
266       std::swap(m_flush_ctx, on_finish);
267     }
268   }
269
270   // execute the following outside of lock scope
271   if (flush_comp != nullptr) {
272     RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
273     io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
274   }
275   if (on_finish != nullptr) {
276     on_finish->complete(0);
277   }
278 }
279
280 template <typename I>
281 void Replay<I>::flush(Context *on_finish) {
282   io::AioCompletion *aio_comp;
283   {
284     Mutex::Locker locker(m_lock);
285     aio_comp = create_aio_flush_completion(
286       util::create_async_context_callback(m_image_ctx, on_finish));
287     if (aio_comp == nullptr) {
288       return;
289     }
290   }
291
292   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
293   io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
294 }
295
296 template <typename I>
297 void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
298   CephContext *cct = m_image_ctx.cct;
299   ldout(cct, 20) << ": op_tid=" << op_tid << dendl;
300
301   Mutex::Locker locker(m_lock);
302   auto op_it = m_op_events.find(op_tid);
303   assert(op_it != m_op_events.end());
304
305   OpEvent &op_event = op_it->second;
306   assert(op_event.op_in_progress &&
307          op_event.on_op_finish_event == nullptr &&
308          op_event.on_finish_ready == nullptr &&
309          op_event.on_finish_safe == nullptr);
310
311   // resume processing replay events
312   Context *on_start_ready = nullptr;
313   std::swap(on_start_ready, op_event.on_start_ready);
314   on_start_ready->complete(0);
315
316   // cancel has been requested -- send error to paused state machine
317   if (!op_event.finish_on_ready && m_flush_ctx != nullptr) {
318     m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);
319     return;
320   }
321
322   // resume the op state machine once the associated OpFinishEvent
323   // is processed
324   op_event.on_op_finish_event = new FunctionContext(
325     [on_resume](int r) {
326       on_resume->complete(r);
327     });
328
329   // shut down request -- don't expect OpFinishEvent
330   if (op_event.finish_on_ready) {
331     m_image_ctx.op_work_queue->queue(on_resume, 0);
332   }
333 }
334
335 template <typename I>
336 void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
337                              Context *on_ready, Context *on_safe) {
338   CephContext *cct = m_image_ctx.cct;
339   ldout(cct, 20) << ": AIO discard event" << dendl;
340
341   bool flush_required;
342   auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
343                                                io::AIO_TYPE_DISCARD,
344                                                &flush_required,
345                                                {});
346   if (aio_comp == nullptr) {
347     return;
348   }
349
350   io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
351                                    event.length, event.skip_partial_discard,
352                                    {});
353   if (flush_required) {
354     m_lock.Lock();
355     auto flush_comp = create_aio_flush_completion(nullptr);
356     m_lock.Unlock();
357
358     if (flush_comp != nullptr) {
359       io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
360     }
361   }
362 }
363
364 template <typename I>
365 void Replay<I>::handle_event(const journal::AioWriteEvent &event,
366                              Context *on_ready, Context *on_safe) {
367   CephContext *cct = m_image_ctx.cct;
368   ldout(cct, 20) << ": AIO write event" << dendl;
369
370   bufferlist data = event.data;
371   bool flush_required;
372   auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
373                                                io::AIO_TYPE_WRITE,
374                                                &flush_required,
375                                                {});
376   if (aio_comp == nullptr) {
377     return;
378   }
379
380   io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
381                                  {{event.offset, event.length}},
382                                  std::move(data), 0, {});
383   if (flush_required) {
384     m_lock.Lock();
385     auto flush_comp = create_aio_flush_completion(nullptr);
386     m_lock.Unlock();
387
388     if (flush_comp != nullptr) {
389       io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
390     }
391   }
392 }
393
394 template <typename I>
395 void Replay<I>::handle_event(const journal::AioFlushEvent &event,
396                              Context *on_ready, Context *on_safe) {
397   CephContext *cct = m_image_ctx.cct;
398   ldout(cct, 20) << ": AIO flush event" << dendl;
399
400   io::AioCompletion *aio_comp;
401   {
402     Mutex::Locker locker(m_lock);
403     aio_comp = create_aio_flush_completion(on_safe);
404   }
405
406   if (aio_comp != nullptr) {
407     io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
408   }
409   on_ready->complete(0);
410 }
411
412 template <typename I>
413 void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
414                              Context *on_ready, Context *on_safe) {
415   CephContext *cct = m_image_ctx.cct;
416   ldout(cct, 20) << ": AIO writesame event" << dendl;
417
418   bufferlist data = event.data;
419   bool flush_required;
420   auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
421                                                io::AIO_TYPE_WRITESAME,
422                                                &flush_required,
423                                                {});
424   if (aio_comp == nullptr) {
425     return;
426   }
427
428   io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp, event.offset,
429                                      event.length, std::move(data), 0, {});
430   if (flush_required) {
431     m_lock.Lock();
432     auto flush_comp = create_aio_flush_completion(nullptr);
433     m_lock.Unlock();
434
435     if (flush_comp != nullptr) {
436       io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
437     }
438   }
439 }
440
441  template <typename I>
442  void Replay<I>::handle_event(const journal::AioCompareAndWriteEvent &event,
443                               Context *on_ready, Context *on_safe) {
444   CephContext *cct = m_image_ctx.cct;
445   ldout(cct, 20) << ": AIO CompareAndWrite event" << dendl;
446
447   bufferlist cmp_data = event.cmp_data;
448   bufferlist write_data = event.write_data;
449   bool flush_required;
450   auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
451                                                io::AIO_TYPE_COMPARE_AND_WRITE,
452                                                &flush_required,
453                                                {-EILSEQ});
454   io::ImageRequest<I>::aio_compare_and_write(&m_image_ctx, aio_comp,
455                                              {{event.offset, event.length}},
456                                              std::move(cmp_data),
457                                              std::move(write_data),
458                                              nullptr, 0, {});
459   if (flush_required) {
460     m_lock.Lock();
461     auto flush_comp = create_aio_flush_completion(nullptr);
462     m_lock.Unlock();
463
464     io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
465   }
466 }
467
468 template <typename I>
469 void Replay<I>::handle_event(const journal::OpFinishEvent &event,
470                              Context *on_ready, Context *on_safe) {
471   CephContext *cct = m_image_ctx.cct;
472   ldout(cct, 20) << ": Op finish event: "
473                  << "op_tid=" << event.op_tid << dendl;
474
475   bool op_in_progress;
476   bool filter_ret_val;
477   Context *on_op_complete = nullptr;
478   Context *on_op_finish_event = nullptr;
479   {
480     Mutex::Locker locker(m_lock);
481     auto op_it = m_op_events.find(event.op_tid);
482     if (op_it == m_op_events.end()) {
483       ldout(cct, 10) << ": unable to locate associated op: assuming previously "
484                      << "committed." << dendl;
485       on_ready->complete(0);
486       m_image_ctx.op_work_queue->queue(on_safe, 0);
487       return;
488     }
489
490     OpEvent &op_event = op_it->second;
491     assert(op_event.on_finish_safe == nullptr);
492     op_event.on_finish_ready = on_ready;
493     op_event.on_finish_safe = on_safe;
494     op_in_progress = op_event.op_in_progress;
495     std::swap(on_op_complete, op_event.on_op_complete);
496     std::swap(on_op_finish_event, op_event.on_op_finish_event);
497
498     // special errors which indicate op never started but was recorded
499     // as failed in the journal
500     filter_ret_val = (op_event.op_finish_error_codes.count(event.r) != 0);
501   }
502
503   if (event.r < 0) {
504     if (op_in_progress) {
505       // bubble the error up to the in-progress op to cancel it
506       on_op_finish_event->complete(event.r);
507     } else {
508       // op hasn't been started -- bubble the error up since
509       // our image is now potentially in an inconsistent state
510       // since simple errors should have been caught before
511       // creating the op event
512       delete on_op_complete;
513       delete on_op_finish_event;
514       handle_op_complete(event.op_tid, filter_ret_val ? 0 : event.r);
515     }
516     return;
517   }
518
519   // journal recorded success -- apply the op now
520   on_op_finish_event->complete(0);
521 }
522
523 template <typename I>
524 void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
525                              Context *on_ready, Context *on_safe) {
526   CephContext *cct = m_image_ctx.cct;
527   ldout(cct, 20) << ": Snap create event" << dendl;
528
529   Mutex::Locker locker(m_lock);
530   OpEvent *op_event;
531   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
532                                                        on_safe, &op_event);
533   if (on_op_complete == nullptr) {
534     return;
535   }
536
537   // ignore errors caused due to replay
538   op_event->ignore_error_codes = {-EEXIST};
539
540   // avoid lock cycles
541   m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
542     m_image_ctx, new ExecuteOp<I, journal::SnapCreateEvent>(m_image_ctx, event,
543                                                             on_op_complete)),
544     0);
545
546   // do not process more events until the state machine is ready
547   // since it will affect IO
548   op_event->op_in_progress = true;
549   op_event->on_start_ready = on_ready;
550 }
551
552 template <typename I>
553 void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
554                              Context *on_ready, Context *on_safe) {
555   CephContext *cct = m_image_ctx.cct;
556   ldout(cct, 20) << ": Snap remove event" << dendl;
557
558   Mutex::Locker locker(m_lock);
559   OpEvent *op_event;
560   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
561                                                        on_safe, &op_event);
562   if (on_op_complete == nullptr) {
563     return;
564   }
565
566   op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
567     m_image_ctx, new ExecuteOp<I, journal::SnapRemoveEvent>(m_image_ctx, event,
568                                                             on_op_complete));
569
570   // ignore errors caused due to replay
571   op_event->ignore_error_codes = {-ENOENT};
572
573   on_ready->complete(0);
574 }
575
576 template <typename I>
577 void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
578                              Context *on_ready, Context *on_safe) {
579   CephContext *cct = m_image_ctx.cct;
580   ldout(cct, 20) << ": Snap rename event" << dendl;
581
582   Mutex::Locker locker(m_lock);
583   OpEvent *op_event;
584   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
585                                                        on_safe, &op_event);
586   if (on_op_complete == nullptr) {
587     return;
588   }
589
590   op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
591     m_image_ctx, new ExecuteOp<I, journal::SnapRenameEvent>(m_image_ctx, event,
592                                                             on_op_complete));
593
594   // ignore errors caused due to replay
595   op_event->ignore_error_codes = {-EEXIST};
596
597   on_ready->complete(0);
598 }
599
600 template <typename I>
601 void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
602                              Context *on_ready, Context *on_safe) {
603   CephContext *cct = m_image_ctx.cct;
604   ldout(cct, 20) << ": Snap protect event" << dendl;
605
606   Mutex::Locker locker(m_lock);
607   OpEvent *op_event;
608   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
609                                                        on_safe, &op_event);
610   if (on_op_complete == nullptr) {
611     return;
612   }
613
614   op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
615     m_image_ctx, new ExecuteOp<I, journal::SnapProtectEvent>(m_image_ctx, event,
616                                                              on_op_complete));
617
618   // ignore errors caused due to replay
619   op_event->ignore_error_codes = {-EBUSY};
620
621   on_ready->complete(0);
622 }
623
624 template <typename I>
625 void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
626                              Context *on_ready, Context *on_safe) {
627   CephContext *cct = m_image_ctx.cct;
628   ldout(cct, 20) << ": Snap unprotect event" << dendl;
629
630   Mutex::Locker locker(m_lock);
631   OpEvent *op_event;
632   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
633                                                        on_safe, &op_event);
634   if (on_op_complete == nullptr) {
635     return;
636   }
637
638   op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
639     m_image_ctx, new ExecuteOp<I, journal::SnapUnprotectEvent>(m_image_ctx,
640                                                                event,
641                                                                on_op_complete));
642
643   // ignore errors recorded in the journal
644   op_event->op_finish_error_codes = {-EBUSY};
645
646   // ignore errors caused due to replay
647   op_event->ignore_error_codes = {-EINVAL};
648
649   on_ready->complete(0);
650 }
651
652 template <typename I>
653 void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
654                              Context *on_ready, Context *on_safe) {
655   CephContext *cct = m_image_ctx.cct;
656   ldout(cct, 20) << ": Snap rollback start event" << dendl;
657
658   Mutex::Locker locker(m_lock);
659   OpEvent *op_event;
660   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
661                                                        on_safe, &op_event);
662   if (on_op_complete == nullptr) {
663     return;
664   }
665
666   op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
667     m_image_ctx, new ExecuteOp<I, journal::SnapRollbackEvent>(m_image_ctx,
668                                                               event,
669                                                               on_op_complete));
670
671   on_ready->complete(0);
672 }
673
674 template <typename I>
675 void Replay<I>::handle_event(const journal::RenameEvent &event,
676                              Context *on_ready, Context *on_safe) {
677   CephContext *cct = m_image_ctx.cct;
678   ldout(cct, 20) << ": Rename event" << dendl;
679
680   Mutex::Locker locker(m_lock);
681   OpEvent *op_event;
682   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
683                                                        on_safe, &op_event);
684   if (on_op_complete == nullptr) {
685     return;
686   }
687
688   op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
689     m_image_ctx, new ExecuteOp<I, journal::RenameEvent>(m_image_ctx, event,
690                                                         on_op_complete));
691
692   // ignore errors caused due to replay
693   op_event->ignore_error_codes = {-EEXIST};
694
695   on_ready->complete(0);
696 }
697
698 template <typename I>
699 void Replay<I>::handle_event(const journal::ResizeEvent &event,
700                              Context *on_ready, Context *on_safe) {
701   CephContext *cct = m_image_ctx.cct;
702   ldout(cct, 20) << ": Resize start event" << dendl;
703
704   Mutex::Locker locker(m_lock);
705   OpEvent *op_event;
706   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
707                                                        on_safe, &op_event);
708   if (on_op_complete == nullptr) {
709     return;
710   }
711
712   // avoid lock cycles
713   m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
714     m_image_ctx, new ExecuteOp<I, journal::ResizeEvent>(m_image_ctx, event,
715                                                         on_op_complete)), 0);
716
717   // do not process more events until the state machine is ready
718   // since it will affect IO
719   op_event->op_in_progress = true;
720   op_event->on_start_ready = on_ready;
721 }
722
723 template <typename I>
724 void Replay<I>::handle_event(const journal::FlattenEvent &event,
725                              Context *on_ready, Context *on_safe) {
726   CephContext *cct = m_image_ctx.cct;
727   ldout(cct, 20) << ": Flatten start event" << dendl;
728
729   Mutex::Locker locker(m_lock);
730   OpEvent *op_event;
731   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
732                                                        on_safe, &op_event);
733   if (on_op_complete == nullptr) {
734     return;
735   }
736
737   op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
738     m_image_ctx, new ExecuteOp<I, journal::FlattenEvent>(m_image_ctx, event,
739                                                          on_op_complete));
740
741   // ignore errors caused due to replay
742   op_event->ignore_error_codes = {-EINVAL};
743
744   on_ready->complete(0);
745 }
746
747 template <typename I>
748 void Replay<I>::handle_event(const journal::DemotePromoteEvent &event,
749                              Context *on_ready, Context *on_safe) {
750   CephContext *cct = m_image_ctx.cct;
751   ldout(cct, 20) << ": Demote/Promote event" << dendl;
752   on_ready->complete(0);
753   on_safe->complete(0);
754 }
755
756 template <typename I>
757 void Replay<I>::handle_event(const journal::SnapLimitEvent &event,
758                              Context *on_ready, Context *on_safe) {
759   CephContext *cct = m_image_ctx.cct;
760   ldout(cct, 20) << ": Snap limit event" << dendl;
761
762   Mutex::Locker locker(m_lock);
763   OpEvent *op_event;
764   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
765                                                        on_safe, &op_event);
766   if (on_op_complete == nullptr) {
767     return;
768   }
769
770   op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
771     m_image_ctx, new ExecuteOp<I, journal::SnapLimitEvent>(m_image_ctx,
772                                                            event,
773                                                            on_op_complete));
774
775   on_ready->complete(0);
776 }
777
778 template <typename I>
779 void Replay<I>::handle_event(const journal::UpdateFeaturesEvent &event,
780                              Context *on_ready, Context *on_safe) {
781   CephContext *cct = m_image_ctx.cct;
782   ldout(cct, 20) << ": Update features event" << dendl;
783
784   Mutex::Locker locker(m_lock);
785   OpEvent *op_event;
786   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
787                                                        on_safe, &op_event);
788   if (on_op_complete == nullptr) {
789     return;
790   }
791
792   // avoid lock cycles
793   m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
794     m_image_ctx, new ExecuteOp<I, journal::UpdateFeaturesEvent>(
795       m_image_ctx, event, on_op_complete)), 0);
796
797   // do not process more events until the state machine is ready
798   // since it will affect IO
799   op_event->op_in_progress = true;
800   op_event->on_start_ready = on_ready;
801 }
802
803 template <typename I>
804 void Replay<I>::handle_event(const journal::MetadataSetEvent &event,
805                              Context *on_ready, Context *on_safe) {
806   CephContext *cct = m_image_ctx.cct;
807   ldout(cct, 20) << ": Metadata set event" << dendl;
808
809   Mutex::Locker locker(m_lock);
810   OpEvent *op_event;
811   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
812                                                        on_safe, &op_event);
813   if (on_op_complete == nullptr) {
814     return;
815   }
816
817   op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
818     m_image_ctx, new ExecuteOp<I, journal::MetadataSetEvent>(
819       m_image_ctx, event, on_op_complete));
820
821   on_ready->complete(0);
822 }
823
824 template <typename I>
825 void Replay<I>::handle_event(const journal::MetadataRemoveEvent &event,
826                              Context *on_ready, Context *on_safe) {
827   CephContext *cct = m_image_ctx.cct;
828   ldout(cct, 20) << ": Metadata remove event" << dendl;
829
830   Mutex::Locker locker(m_lock);
831   OpEvent *op_event;
832   Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
833                                                        on_safe, &op_event);
834   if (on_op_complete == nullptr) {
835     return;
836   }
837
838   op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
839     m_image_ctx, new ExecuteOp<I, journal::MetadataRemoveEvent>(
840       m_image_ctx, event, on_op_complete));
841
842   // ignore errors caused due to replay
843   op_event->ignore_error_codes = {-ENOENT};
844
845   on_ready->complete(0);
846 }
847
848 template <typename I>
849 void Replay<I>::handle_event(const journal::UnknownEvent &event,
850                              Context *on_ready, Context *on_safe) {
851   CephContext *cct = m_image_ctx.cct;
852   ldout(cct, 20) << ": unknown event" << dendl;
853   on_ready->complete(0);
854   on_safe->complete(0);
855 }
856
857 template <typename I>
858 void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
859                                            int r, std::set<int> &filters) {
860   Mutex::Locker locker(m_lock);
861   CephContext *cct = m_image_ctx.cct;
862   ldout(cct, 20) << ": on_ready=" << on_ready << ", "
863                  << "on_safe=" << on_safe << ", r=" << r << dendl;
864
865   if (on_ready != nullptr) {
866     on_ready->complete(0);
867   }
868
869   if (filters.find(r) != filters.end())
870     r = 0;
871
872   if (r < 0) {
873     lderr(cct) << ": AIO modify op failed: " << cpp_strerror(r) << dendl;
874     on_safe->complete(r);
875     return;
876   }
877
878   // will be completed after next flush operation completes
879   m_aio_modify_safe_contexts.insert(on_safe);
880 }
881
882 template <typename I>
883 void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
884                                           Contexts &on_safe_ctxs, int r) {
885   CephContext *cct = m_image_ctx.cct;
886   ldout(cct, 20) << ": r=" << r << dendl;
887
888   if (r < 0) {
889     lderr(cct) << ": AIO flush failed: " << cpp_strerror(r) << dendl;
890   }
891
892   Context *on_aio_ready = nullptr;
893   Context *on_flush = nullptr;
894   {
895     Mutex::Locker locker(m_lock);
896     assert(m_in_flight_aio_flush > 0);
897     assert(m_in_flight_aio_modify >= on_safe_ctxs.size());
898     --m_in_flight_aio_flush;
899     m_in_flight_aio_modify -= on_safe_ctxs.size();
900
901     std::swap(on_aio_ready, m_on_aio_ready);
902     if (m_in_flight_op_events == 0 &&
903         (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
904       on_flush = m_flush_ctx;
905     }
906
907     // strip out previously failed on_safe contexts
908     for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
909       if (m_aio_modify_safe_contexts.erase(*it)) {
910         ++it;
911       } else {
912         it = on_safe_ctxs.erase(it);
913       }
914     }
915   }
916
917   if (on_aio_ready != nullptr) {
918     ldout(cct, 10) << ": resuming paused AIO" << dendl;
919     on_aio_ready->complete(0);
920   }
921
922   if (on_flush_safe != nullptr) {
923     on_safe_ctxs.push_back(on_flush_safe);
924   }
925   for (auto ctx : on_safe_ctxs) {
926     ldout(cct, 20) << ": completing safe context: " << ctx << dendl;
927     ctx->complete(r);
928   }
929
930   if (on_flush != nullptr) {
931     ldout(cct, 20) << ": completing flush context: " << on_flush << dendl;
932     on_flush->complete(r);
933   }
934 }
935
936 template <typename I>
937 Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
938                                                Context *on_ready,
939                                                Context *on_safe,
940                                                OpEvent **op_event) {
941   CephContext *cct = m_image_ctx.cct;
942   if (m_shut_down) {
943     ldout(cct, 5) << ": ignoring event after shut down" << dendl;
944     on_ready->complete(0);
945     m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
946     return nullptr;
947   }
948
949   assert(m_lock.is_locked());
950   if (m_op_events.count(op_tid) != 0) {
951     lderr(cct) << ": duplicate op tid detected: " << op_tid << dendl;
952
953     // on_ready is already async but on failure invoke on_safe async
954     // as well
955     on_ready->complete(0);
956     m_image_ctx.op_work_queue->queue(on_safe, -EINVAL);
957     return nullptr;
958   }
959
960   ++m_in_flight_op_events;
961   *op_event = &m_op_events[op_tid];
962   (*op_event)->on_start_safe = on_safe;
963
964   Context *on_op_complete = new C_OpOnComplete(this, op_tid);
965   (*op_event)->on_op_complete = on_op_complete;
966   return on_op_complete;
967 }
968
969 template <typename I>
970 void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
971   CephContext *cct = m_image_ctx.cct;
972   ldout(cct, 20) << ": op_tid=" << op_tid << ", "
973                  << "r=" << r << dendl;
974
975   OpEvent op_event;
976   bool shutting_down = false;
977   {
978     Mutex::Locker locker(m_lock);
979     auto op_it = m_op_events.find(op_tid);
980     assert(op_it != m_op_events.end());
981
982     op_event = std::move(op_it->second);
983     m_op_events.erase(op_it);
984
985     if (m_shut_down) {
986       assert(m_flush_ctx != nullptr);
987       shutting_down = true;
988     }
989   }
990
991   assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
992   if (op_event.on_start_ready != nullptr) {
993     // blocking op event failed before it became ready
994     assert(op_event.on_finish_ready == nullptr &&
995            op_event.on_finish_safe == nullptr);
996
997     op_event.on_start_ready->complete(0);
998   } else {
999     // event kicked off by OpFinishEvent
1000     assert((op_event.on_finish_ready != nullptr &&
1001             op_event.on_finish_safe != nullptr) || shutting_down);
1002   }
1003
1004   if (op_event.on_op_finish_event != nullptr) {
1005     op_event.on_op_finish_event->complete(r);
1006   }
1007
1008   if (op_event.on_finish_ready != nullptr) {
1009     op_event.on_finish_ready->complete(0);
1010   }
1011
1012   // filter out errors caused by replay of the same op
1013   if (r < 0 && op_event.ignore_error_codes.count(r) != 0) {
1014     r = 0;
1015   }
1016
1017   op_event.on_start_safe->complete(r);
1018   if (op_event.on_finish_safe != nullptr) {
1019     op_event.on_finish_safe->complete(r);
1020   }
1021
1022   // shut down request might have occurred while lock was
1023   // dropped -- handle if pending
1024   Context *on_flush = nullptr;
1025   {
1026     Mutex::Locker locker(m_lock);
1027     assert(m_in_flight_op_events > 0);
1028     --m_in_flight_op_events;
1029     if (m_in_flight_op_events == 0 &&
1030         (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
1031       on_flush = m_flush_ctx;
1032     }
1033   }
1034   if (on_flush != nullptr) {
1035     m_image_ctx.op_work_queue->queue(on_flush, 0);
1036   }
1037 }
1038
1039 template <typename I>
1040 io::AioCompletion *
1041 Replay<I>::create_aio_modify_completion(Context *on_ready,
1042                                         Context *on_safe,
1043                                         io::aio_type_t aio_type,
1044                                         bool *flush_required,
1045                                         std::set<int> &&filters) {
1046   Mutex::Locker locker(m_lock);
1047   CephContext *cct = m_image_ctx.cct;
1048   assert(m_on_aio_ready == nullptr);
1049
1050   if (m_shut_down) {
1051     ldout(cct, 5) << ": ignoring event after shut down" << dendl;
1052     on_ready->complete(0);
1053     m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
1054     return nullptr;
1055   }
1056
1057   ++m_in_flight_aio_modify;
1058   m_aio_modify_unsafe_contexts.push_back(on_safe);
1059
1060   // FLUSH if we hit the low-water mark -- on_safe contexts are
1061   // completed by flushes-only so that we don't move the journal
1062   // commit position until safely on-disk
1063
1064   *flush_required = (m_aio_modify_unsafe_contexts.size() ==
1065                        IN_FLIGHT_IO_LOW_WATER_MARK);
1066   if (*flush_required) {
1067     ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush"
1068                    << dendl;
1069   }
1070
1071   // READY for more events if:
1072   // * not at high-water mark for IO
1073   // * in-flight ops are at a consistent point (snap create has IO flushed,
1074   //   shrink has adjusted clip boundary, etc) -- should have already been
1075   //   flagged not-ready
1076   if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) {
1077     ldout(cct, 10) << ": hit AIO replay high-water mark: pausing replay"
1078                    << dendl;
1079     assert(m_on_aio_ready == nullptr);
1080     std::swap(m_on_aio_ready, on_ready);
1081   }
1082
1083   // when the modification is ACKed by librbd, we can process the next
1084   // event. when flushed, the completion of the next flush will fire the
1085   // on_safe callback
1086   auto aio_comp = io::AioCompletion::create_and_start<Context>(
1087     new C_AioModifyComplete(this, on_ready, on_safe, std::move(filters)),
1088     util::get_image_ctx(&m_image_ctx), aio_type);
1089   return aio_comp;
1090 }
1091
1092 template <typename I>
1093 io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
1094   assert(m_lock.is_locked());
1095
1096   CephContext *cct = m_image_ctx.cct;
1097   if (m_shut_down) {
1098     ldout(cct, 5) << ": ignoring event after shut down" << dendl;
1099     if (on_safe != nullptr) {
1100       m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
1101     }
1102     return nullptr;
1103   }
1104
1105   ++m_in_flight_aio_flush;
1106
1107   // associate all prior write/discard ops to this flush request
1108   auto aio_comp = io::AioCompletion::create_and_start<Context>(
1109       new C_AioFlushComplete(this, on_safe,
1110                              std::move(m_aio_modify_unsafe_contexts)),
1111       util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_FLUSH);
1112   m_aio_modify_unsafe_contexts.clear();
1113   return aio_comp;
1114 }
1115
1116 } // namespace journal
1117 } // namespace librbd
1118
1119 template class librbd::journal::Replay<librbd::ImageCtx>;