Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / Journal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/Journal.h"
5 #include "include/rados/librados.hpp"
6 #include "common/errno.h"
7 #include "common/Timer.h"
8 #include "common/WorkQueue.h"
9 #include "cls/journal/cls_journal_types.h"
10 #include "journal/Journaler.h"
11 #include "journal/Policy.h"
12 #include "journal/ReplayEntry.h"
13 #include "journal/Settings.h"
14 #include "journal/Utils.h"
15 #include "librbd/ExclusiveLock.h"
16 #include "librbd/ImageCtx.h"
17 #include "librbd/io/ImageRequestWQ.h"
18 #include "librbd/io/ObjectRequest.h"
19 #include "librbd/journal/CreateRequest.h"
20 #include "librbd/journal/DemoteRequest.h"
21 #include "librbd/journal/OpenRequest.h"
22 #include "librbd/journal/RemoveRequest.h"
23 #include "librbd/journal/Replay.h"
24 #include "librbd/journal/PromoteRequest.h"
25
26 #include <boost/scope_exit.hpp>
27 #include <utility>
28
29 #define dout_subsys ceph_subsys_rbd
30 #undef dout_prefix
31 #define dout_prefix *_dout << "librbd::Journal: "
32
33 namespace librbd {
34
35 using util::create_async_context_callback;
36 using util::create_context_callback;
37 using journal::util::C_DecodeTag;
38 using journal::util::C_DecodeTags;
39
40 namespace {
41
42 // TODO: once journaler is 100% async, remove separate threads and
43 // reuse ImageCtx's thread pool
44 class ThreadPoolSingleton : public ThreadPool {
45 public:
46   explicit ThreadPoolSingleton(CephContext *cct)
47     : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
48     start();
49   }
50   ~ThreadPoolSingleton() override {
51     stop();
52   }
53 };
54
55 template <typename I>
56 struct C_IsTagOwner : public Context {
57   librados::IoCtx &io_ctx;
58   std::string image_id;
59   bool *is_tag_owner;
60   ContextWQ *op_work_queue;
61   Context *on_finish;
62
63   CephContext *cct = nullptr;
64   Journaler *journaler;
65   cls::journal::Client client;
66   journal::ImageClientMeta client_meta;
67   uint64_t tag_tid;
68   journal::TagData tag_data;
69
70   C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
71                bool *is_tag_owner, ContextWQ *op_work_queue, Context *on_finish)
72     : io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
73       op_work_queue(op_work_queue), on_finish(on_finish),
74       cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
75       journaler(new Journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID,
76                               {})) {
77   }
78
79   void finish(int r) override {
80     ldout(cct, 20) << this << " C_IsTagOwner::" << __func__ << ": r=" << r
81                    << dendl;
82     if (r < 0) {
83       lderr(cct) << this << " C_IsTagOwner::" << __func__ << ": "
84                  << "failed to get tag owner: " << cpp_strerror(r) << dendl;
85     } else {
86       *is_tag_owner = (tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID);
87     }
88
89     Journaler *journaler = this->journaler;
90     Context *on_finish = this->on_finish;
91     FunctionContext *ctx = new FunctionContext(
92       [journaler, on_finish](int r) {
93         on_finish->complete(r);
94         delete journaler;
95       });
96     op_work_queue->queue(ctx, r);
97   }
98 };
99
100 struct C_GetTagOwner : public Context {
101   std::string *mirror_uuid;
102   Context *on_finish;
103
104   Journaler journaler;
105   cls::journal::Client client;
106   journal::ImageClientMeta client_meta;
107   uint64_t tag_tid;
108   journal::TagData tag_data;
109
110   C_GetTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
111                 std::string *mirror_uuid, Context *on_finish)
112     : mirror_uuid(mirror_uuid), on_finish(on_finish),
113       journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}) {
114   }
115
116   virtual void finish(int r) {
117     if (r >= 0) {
118       *mirror_uuid = tag_data.mirror_uuid;
119     }
120     on_finish->complete(r);
121   }
122 };
123
124 template <typename J>
125 struct GetTagsRequest {
126   CephContext *cct;
127   J *journaler;
128   cls::journal::Client *client;
129   journal::ImageClientMeta *client_meta;
130   uint64_t *tag_tid;
131   journal::TagData *tag_data;
132   Context *on_finish;
133
134   Mutex lock;
135
136   GetTagsRequest(CephContext *cct, J *journaler, cls::journal::Client *client,
137                  journal::ImageClientMeta *client_meta, uint64_t *tag_tid,
138                  journal::TagData *tag_data, Context *on_finish)
139     : cct(cct), journaler(journaler), client(client), client_meta(client_meta),
140       tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish), lock("lock") {
141   }
142
143   /**
144    * @verbatim
145    *
146    * <start>
147    *    |
148    *    v
149    * GET_CLIENT * * * * * * * * * * * *
150    *    |                             *
151    *    v                             *
152    * GET_TAGS * * * * * * * * * * * * * (error)
153    *    |                             *
154    *    v                             *
155    * <finish> * * * * * * * * * * * * *
156    *
157    * @endverbatim
158    */
159
160   void send() {
161     send_get_client();
162   }
163
164   void send_get_client() {
165     ldout(cct, 20) << __func__ << dendl;
166
167     FunctionContext *ctx = new FunctionContext(
168       [this](int r) {
169         handle_get_client(r);
170       });
171     journaler->get_client(Journal<ImageCtx>::IMAGE_CLIENT_ID, client, ctx);
172   }
173
174   void handle_get_client(int r) {
175     ldout(cct, 20) << __func__ << ": r=" << r << dendl;
176
177     if (r < 0) {
178       complete(r);
179       return;
180     }
181
182     librbd::journal::ClientData client_data;
183     bufferlist::iterator bl_it = client->data.begin();
184     try {
185       ::decode(client_data, bl_it);
186     } catch (const buffer::error &err) {
187       lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
188                  << "failed to decode client data" << dendl;
189       complete(-EBADMSG);
190       return;
191     }
192
193     journal::ImageClientMeta *image_client_meta =
194       boost::get<journal::ImageClientMeta>(&client_data.client_meta);
195     if (image_client_meta == nullptr) {
196       lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
197                  << "failed to get client meta" << dendl;
198       complete(-EINVAL);
199       return;
200     }
201     *client_meta = *image_client_meta;
202
203     send_get_tags();
204   }
205
206   void send_get_tags() {
207     ldout(cct, 20) << __func__ << dendl;
208
209     FunctionContext *ctx = new FunctionContext(
210       [this](int r) {
211         handle_get_tags(r);
212       });
213     C_DecodeTags *tags_ctx = new C_DecodeTags(cct, &lock, tag_tid, tag_data,
214                                               ctx);
215     journaler->get_tags(client_meta->tag_class, &tags_ctx->tags, tags_ctx);
216   }
217
218   void handle_get_tags(int r) {
219     ldout(cct, 20) << __func__ << ": r=" << r << dendl;
220
221     complete(r);
222   }
223
224   void complete(int r) {
225     on_finish->complete(r);
226     delete this;
227   }
228 };
229
230 template <typename J>
231 void get_tags(CephContext *cct, J *journaler,
232               cls::journal::Client *client,
233               journal::ImageClientMeta *client_meta,
234               uint64_t *tag_tid, journal::TagData *tag_data,
235               Context *on_finish) {
236   ldout(cct, 20) << __func__ << dendl;
237
238   GetTagsRequest<J> *req =
239     new GetTagsRequest<J>(cct, journaler, client, client_meta, tag_tid,
240                           tag_data, on_finish);
241   req->send();
242 }
243
244 template <typename J>
245 int allocate_journaler_tag(CephContext *cct, J *journaler,
246                            uint64_t tag_class,
247                            const journal::TagPredecessor &predecessor,
248                            const std::string &mirror_uuid,
249                            cls::journal::Tag *new_tag) {
250   journal::TagData tag_data;
251   tag_data.mirror_uuid = mirror_uuid;
252   tag_data.predecessor = predecessor;
253
254   bufferlist tag_bl;
255   ::encode(tag_data, tag_bl);
256
257   C_SaferCond allocate_tag_ctx;
258   journaler->allocate_tag(tag_class, tag_bl, new_tag, &allocate_tag_ctx);
259
260   int r = allocate_tag_ctx.wait();
261   if (r < 0) {
262     lderr(cct) << __func__ << ": "
263                << "failed to allocate tag: " << cpp_strerror(r) << dendl;
264     return r;
265   }
266   return 0;
267 }
268
269 } // anonymous namespace
270
271 // client id for local image
272 template <typename I>
273 const std::string Journal<I>::IMAGE_CLIENT_ID("");
274
275 // mirror uuid to use for local images
276 template <typename I>
277 const std::string Journal<I>::LOCAL_MIRROR_UUID("");
278
279 // mirror uuid to use for orphaned (demoted) images
280 template <typename I>
281 const std::string Journal<I>::ORPHAN_MIRROR_UUID("<orphan>");
282
283 template <typename I>
284 std::ostream &operator<<(std::ostream &os,
285                          const typename Journal<I>::State &state) {
286   switch (state) {
287   case Journal<I>::STATE_UNINITIALIZED:
288     os << "Uninitialized";
289     break;
290   case Journal<I>::STATE_INITIALIZING:
291     os << "Initializing";
292     break;
293   case Journal<I>::STATE_REPLAYING:
294     os << "Replaying";
295     break;
296   case Journal<I>::STATE_FLUSHING_RESTART:
297     os << "FlushingRestart";
298     break;
299   case Journal<I>::STATE_RESTARTING_REPLAY:
300     os << "RestartingReplay";
301     break;
302   case Journal<I>::STATE_FLUSHING_REPLAY:
303     os << "FlushingReplay";
304     break;
305   case Journal<I>::STATE_READY:
306     os << "Ready";
307     break;
308   case Journal<I>::STATE_STOPPING:
309     os << "Stopping";
310     break;
311   case Journal<I>::STATE_CLOSING:
312     os << "Closing";
313     break;
314   case Journal<I>::STATE_CLOSED:
315     os << "Closed";
316     break;
317   default:
318     os << "Unknown (" << static_cast<uint32_t>(state) << ")";
319     break;
320   }
321   return os;
322 }
323
324 template <typename I>
325 Journal<I>::Journal(I &image_ctx)
326   : m_image_ctx(image_ctx), m_journaler(NULL),
327     m_lock("Journal<I>::m_lock"), m_state(STATE_UNINITIALIZED),
328     m_error_result(0), m_replay_handler(this), m_close_pending(false),
329     m_event_lock("Journal<I>::m_event_lock"), m_event_tid(0),
330     m_blocking_writes(false), m_journal_replay(NULL),
331     m_metadata_listener(this) {
332
333   CephContext *cct = m_image_ctx.cct;
334   ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
335
336   ThreadPoolSingleton *thread_pool_singleton;
337   cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
338     thread_pool_singleton, "librbd::journal::thread_pool");
339   m_work_queue = new ContextWQ("librbd::journal::work_queue",
340                                cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
341                                thread_pool_singleton);
342   ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
343 }
344
345 template <typename I>
346 Journal<I>::~Journal() {
347   if (m_work_queue != nullptr) {
348     m_work_queue->drain();
349     delete m_work_queue;
350   }
351
352   assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
353   assert(m_journaler == NULL);
354   assert(m_journal_replay == NULL);
355   assert(m_wait_for_state_contexts.empty());
356 }
357
358 template <typename I>
359 bool Journal<I>::is_journal_supported(I &image_ctx) {
360   assert(image_ctx.snap_lock.is_locked());
361   return ((image_ctx.features & RBD_FEATURE_JOURNALING) &&
362           !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
363 }
364
365 template <typename I>
366 int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
367                        uint8_t order, uint8_t splay_width,
368                        const std::string &object_pool) {
369   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
370   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
371
372   ThreadPool *thread_pool;
373   ContextWQ *op_work_queue;
374   ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
375
376   C_SaferCond cond;
377   journal::TagData tag_data(LOCAL_MIRROR_UUID);
378   journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
379     io_ctx, image_id, order, splay_width, object_pool, cls::journal::Tag::TAG_CLASS_NEW,
380     tag_data, IMAGE_CLIENT_ID, op_work_queue, &cond);
381   req->send();
382
383   return cond.wait();
384 }
385
386 template <typename I>
387 int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
388   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
389   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
390
391   ThreadPool *thread_pool;
392   ContextWQ *op_work_queue;
393   ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
394
395   C_SaferCond cond;
396   journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
397     io_ctx, image_id, IMAGE_CLIENT_ID, op_work_queue, &cond);
398   req->send();
399
400   return cond.wait();
401 }
402
403 template <typename I>
404 int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
405   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
406   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
407
408   Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID, {});
409
410   C_SaferCond cond;
411   journaler.init(&cond);
412   BOOST_SCOPE_EXIT_ALL(&journaler) {
413     journaler.shut_down();
414   };
415
416   int r = cond.wait();
417   if (r == -ENOENT) {
418     return 0;
419   } else if (r < 0) {
420     lderr(cct) << __func__ << ": "
421                << "failed to initialize journal: " << cpp_strerror(r) << dendl;
422     return r;
423   }
424
425   uint8_t order, splay_width;
426   int64_t pool_id;
427   journaler.get_metadata(&order, &splay_width, &pool_id);
428
429   std::string pool_name;
430   if (pool_id != -1) {
431     librados::Rados rados(io_ctx);
432     r = rados.pool_reverse_lookup(pool_id, &pool_name);
433     if (r < 0) {
434       lderr(cct) << __func__ << ": "
435                  << "failed to lookup data pool: " << cpp_strerror(r) << dendl;
436       return r;
437     }
438   }
439
440   C_SaferCond ctx1;
441   journaler.remove(true, &ctx1);
442   r = ctx1.wait();
443   if (r < 0) {
444     lderr(cct) << __func__ << ": "
445                << "failed to reset journal: " << cpp_strerror(r) << dendl;
446     return r;
447   }
448
449   r = create(io_ctx, image_id, order, splay_width, pool_name);
450   if (r < 0) {
451     lderr(cct) << __func__ << ": "
452                << "failed to create journal: " << cpp_strerror(r) << dendl;
453     return r;
454   }
455   return 0;
456 }
457
458 template <typename I>
459 void Journal<I>::is_tag_owner(I *image_ctx, bool *owner,
460                               Context *on_finish) {
461   Journal<I>::is_tag_owner(image_ctx->md_ctx, image_ctx->id, owner,
462                            image_ctx->op_work_queue, on_finish);
463 }
464
465 template <typename I>
466 void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
467                               bool *is_tag_owner, ContextWQ *op_work_queue,
468                               Context *on_finish) {
469   CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
470   ldout(cct, 20) << __func__ << dendl;
471
472   C_IsTagOwner<I> *is_tag_owner_ctx =  new C_IsTagOwner<I>(
473     io_ctx, image_id, is_tag_owner, op_work_queue, on_finish);
474   get_tags(cct, is_tag_owner_ctx->journaler, &is_tag_owner_ctx->client,
475            &is_tag_owner_ctx->client_meta, &is_tag_owner_ctx->tag_tid,
476            &is_tag_owner_ctx->tag_data, is_tag_owner_ctx);
477 }
478
479 template <typename I>
480 void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
481                                std::string *mirror_uuid,
482                                ContextWQ *op_work_queue, Context *on_finish) {
483   CephContext *cct = (CephContext *)io_ctx.cct();
484   ldout(cct, 20) << __func__ << dendl;
485
486   auto ctx = new C_GetTagOwner(io_ctx, image_id, mirror_uuid, on_finish);
487   get_tags(cct, &ctx->journaler, &ctx->client, &ctx->client_meta, &ctx->tag_tid,
488            &ctx->tag_data, create_async_context_callback(op_work_queue, ctx));
489 }
490
491 template <typename I>
492 int Journal<I>::request_resync(I *image_ctx) {
493   CephContext *cct = image_ctx->cct;
494   ldout(cct, 20) << __func__ << dendl;
495
496   Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {});
497
498   Mutex lock("lock");
499   journal::ImageClientMeta client_meta;
500   uint64_t tag_tid;
501   journal::TagData tag_data;
502
503   C_SaferCond open_ctx;
504   auto open_req = journal::OpenRequest<I>::create(image_ctx, &journaler, &lock,
505                                                   &client_meta, &tag_tid,
506                                                   &tag_data, &open_ctx);
507   open_req->send();
508
509   BOOST_SCOPE_EXIT_ALL(&journaler) {
510     journaler.shut_down();
511   };
512
513   int r = open_ctx.wait();
514   if (r < 0) {
515     return r;
516   }
517
518   client_meta.resync_requested = true;
519
520   journal::ClientData client_data(client_meta);
521   bufferlist client_data_bl;
522   ::encode(client_data, client_data_bl);
523
524   C_SaferCond update_client_ctx;
525   journaler.update_client(client_data_bl, &update_client_ctx);
526
527   r = update_client_ctx.wait();
528   if (r < 0) {
529     lderr(cct) << __func__ << ": "
530                << "failed to update client: " << cpp_strerror(r) << dendl;
531     return r;
532   }
533   return 0;
534 }
535
536 template <typename I>
537 void Journal<I>::promote(I *image_ctx, Context *on_finish) {
538   CephContext *cct = image_ctx->cct;
539   ldout(cct, 20) << __func__ << dendl;
540
541   auto promote_req = journal::PromoteRequest<I>::create(image_ctx, false,
542                                                         on_finish);
543   promote_req->send();
544 }
545
546 template <typename I>
547 void Journal<I>::demote(I *image_ctx, Context *on_finish) {
548   CephContext *cct = image_ctx->cct;
549   ldout(cct, 20) << __func__ << dendl;
550
551   auto req = journal::DemoteRequest<I>::create(*image_ctx, on_finish);
552   req->send();
553 }
554
555 template <typename I>
556 bool Journal<I>::is_journal_ready() const {
557   Mutex::Locker locker(m_lock);
558   return (m_state == STATE_READY);
559 }
560
561 template <typename I>
562 bool Journal<I>::is_journal_replaying() const {
563   Mutex::Locker locker(m_lock);
564   return is_journal_replaying(m_lock);
565 }
566
567 template <typename I>
568 bool Journal<I>::is_journal_replaying(const Mutex &) const {
569   assert(m_lock.is_locked());
570   return (m_state == STATE_REPLAYING ||
571           m_state == STATE_FLUSHING_REPLAY ||
572           m_state == STATE_FLUSHING_RESTART ||
573           m_state == STATE_RESTARTING_REPLAY);
574 }
575
576 template <typename I>
577 bool Journal<I>::is_journal_appending() const {
578   assert(m_image_ctx.snap_lock.is_locked());
579   Mutex::Locker locker(m_lock);
580   return (m_state == STATE_READY &&
581           !m_image_ctx.get_journal_policy()->append_disabled());
582 }
583
584 template <typename I>
585 void Journal<I>::wait_for_journal_ready(Context *on_ready) {
586   on_ready = create_async_context_callback(m_image_ctx, on_ready);
587
588   Mutex::Locker locker(m_lock);
589   if (m_state == STATE_READY) {
590     on_ready->complete(m_error_result);
591   } else {
592     wait_for_steady_state(on_ready);
593   }
594 }
595
596 template <typename I>
597 void Journal<I>::open(Context *on_finish) {
598   CephContext *cct = m_image_ctx.cct;
599   ldout(cct, 20) << this << " " << __func__ << dendl;
600
601   on_finish = create_async_context_callback(m_image_ctx, on_finish);
602
603   Mutex::Locker locker(m_lock);
604   assert(m_state == STATE_UNINITIALIZED);
605   wait_for_steady_state(on_finish);
606   create_journaler();
607 }
608
609 template <typename I>
610 void Journal<I>::close(Context *on_finish) {
611   CephContext *cct = m_image_ctx.cct;
612   ldout(cct, 20) << this << " " << __func__ << dendl;
613
614   on_finish = create_async_context_callback(m_image_ctx, on_finish);
615
616   Mutex::Locker locker(m_lock);
617   while (m_listener_notify) {
618     m_listener_cond.Wait(m_lock);
619   }
620
621   Listeners listeners(m_listeners);
622   m_listener_notify = true;
623   m_lock.Unlock();
624   for (auto listener : listeners) {
625     listener->handle_close();
626   }
627
628   m_lock.Lock();
629   m_listener_notify = false;
630   m_listener_cond.Signal();
631
632   assert(m_state != STATE_UNINITIALIZED);
633   if (m_state == STATE_CLOSED) {
634     on_finish->complete(m_error_result);
635     return;
636   }
637
638   if (m_state == STATE_READY) {
639     stop_recording();
640   }
641
642   m_close_pending = true;
643   wait_for_steady_state(on_finish);
644 }
645
646 template <typename I>
647 bool Journal<I>::is_tag_owner() const {
648   Mutex::Locker locker(m_lock);
649   return is_tag_owner(m_lock);
650 }
651
652 template <typename I>
653 bool Journal<I>::is_tag_owner(const Mutex &) const {
654   assert(m_lock.is_locked());
655   return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
656 }
657
658 template <typename I>
659 uint64_t Journal<I>::get_tag_tid() const {
660   Mutex::Locker locker(m_lock);
661   return m_tag_tid;
662 }
663
664 template <typename I>
665 journal::TagData Journal<I>::get_tag_data() const {
666   Mutex::Locker locker(m_lock);
667   return m_tag_data;
668 }
669
670 template <typename I>
671 void Journal<I>::allocate_local_tag(Context *on_finish) {
672   CephContext *cct = m_image_ctx.cct;
673   ldout(cct, 20) << this << " " << __func__ << dendl;
674
675   journal::TagPredecessor predecessor;
676   predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
677   {
678     Mutex::Locker locker(m_lock);
679     assert(m_journaler != nullptr && is_tag_owner(m_lock));
680
681     cls::journal::Client client;
682     int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
683     if (r < 0) {
684       lderr(cct) << this << " " << __func__ << ": "
685                  << "failed to retrieve client: " << cpp_strerror(r) << dendl;
686       m_image_ctx.op_work_queue->queue(on_finish, r);
687       return;
688     }
689
690     // since we are primary, populate the predecessor with our known commit
691     // position
692     assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
693     if (!client.commit_position.object_positions.empty()) {
694       auto position = client.commit_position.object_positions.front();
695       predecessor.commit_valid = true;
696       predecessor.tag_tid = position.tag_tid;
697       predecessor.entry_tid = position.entry_tid;
698     }
699   }
700
701   allocate_tag(LOCAL_MIRROR_UUID, predecessor, on_finish);
702 }
703
704 template <typename I>
705 void Journal<I>::allocate_tag(const std::string &mirror_uuid,
706                               const journal::TagPredecessor &predecessor,
707                               Context *on_finish) {
708   CephContext *cct = m_image_ctx.cct;
709   ldout(cct, 20) << this << " " << __func__ << ":  mirror_uuid=" << mirror_uuid
710                  << dendl;
711
712   Mutex::Locker locker(m_lock);
713   assert(m_journaler != nullptr);
714
715   journal::TagData tag_data;
716   tag_data.mirror_uuid = mirror_uuid;
717   tag_data.predecessor = predecessor;
718
719   bufferlist tag_bl;
720   ::encode(tag_data, tag_bl);
721
722   C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
723                                                 &m_tag_data, on_finish);
724   m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag,
725                             decode_tag_ctx);
726 }
727
728 template <typename I>
729 void Journal<I>::flush_commit_position(Context *on_finish) {
730   CephContext *cct = m_image_ctx.cct;
731   ldout(cct, 20) << this << " " << __func__ << dendl;
732
733   Mutex::Locker locker(m_lock);
734   assert(m_journaler != nullptr);
735   m_journaler->flush_commit_position(on_finish);
736 }
737
738 template <typename I>
739 uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
740                                         const bufferlist &bl,
741                                         const IOObjectRequests &requests,
742                                         bool flush_entry) {
743   assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
744   uint64_t max_write_data_size =
745     m_max_append_size - journal::AioWriteEvent::get_fixed_size();
746
747   // ensure that the write event fits within the journal entry
748   Bufferlists bufferlists;
749   uint64_t bytes_remaining = length;
750   uint64_t event_offset = 0;
751   do {
752     uint64_t event_length = MIN(bytes_remaining, max_write_data_size);
753
754     bufferlist event_bl;
755     event_bl.substr_of(bl, event_offset, event_length);
756     journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
757                                                            event_length,
758                                                            event_bl),
759                                     ceph_clock_now());
760
761     bufferlists.emplace_back();
762     ::encode(event_entry, bufferlists.back());
763
764     event_offset += event_length;
765     bytes_remaining -= event_length;
766   } while (bytes_remaining > 0);
767
768   return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, requests,
769                           offset, length, flush_entry);
770 }
771
772 template <typename I>
773 uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
774                                      const IOObjectRequests &requests,
775                                      uint64_t offset, size_t length,
776                                      bool flush_entry) {
777   bufferlist bl;
778   event_entry.timestamp = ceph_clock_now();
779   ::encode(event_entry, bl);
780   return append_io_events(event_entry.get_event_type(), {bl}, requests, offset,
781                           length, flush_entry);
782 }
783
784 template <typename I>
785 uint64_t Journal<I>::append_io_events(journal::EventType event_type,
786                                       const Bufferlists &bufferlists,
787                                       const IOObjectRequests &requests,
788                                       uint64_t offset, size_t length,
789                                       bool flush_entry) {
790   assert(!bufferlists.empty());
791
792   uint64_t tid;
793   {
794     Mutex::Locker locker(m_lock);
795     assert(m_state == STATE_READY);
796
797     tid = ++m_event_tid;
798     assert(tid != 0);
799   }
800
801   Futures futures;
802   for (auto &bl : bufferlists) {
803     assert(bl.length() <= m_max_append_size);
804     futures.push_back(m_journaler->append(m_tag_tid, bl));
805   }
806
807   {
808     Mutex::Locker event_locker(m_event_lock);
809     m_events[tid] = Event(futures, requests, offset, length);
810   }
811
812   CephContext *cct = m_image_ctx.cct;
813   ldout(cct, 20) << this << " " << __func__ << ": "
814                  << "event=" << event_type << ", "
815                  << "new_reqs=" << requests.size() << ", "
816                  << "offset=" << offset << ", "
817                  << "length=" << length << ", "
818                  << "flush=" << flush_entry << ", tid=" << tid << dendl;
819
820   Context *on_safe = create_async_context_callback(
821     m_image_ctx, new C_IOEventSafe(this, tid));
822   if (flush_entry) {
823     futures.back().flush(on_safe);
824   } else {
825     futures.back().wait(on_safe);
826   }
827
828   return tid;
829 }
830
831 template <typename I>
832 void Journal<I>::commit_io_event(uint64_t tid, int r) {
833   CephContext *cct = m_image_ctx.cct;
834   ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
835                  "r=" << r << dendl;
836
837   Mutex::Locker event_locker(m_event_lock);
838   typename Events::iterator it = m_events.find(tid);
839   if (it == m_events.end()) {
840     return;
841   }
842   complete_event(it, r);
843 }
844
845 template <typename I>
846 void Journal<I>::commit_io_event_extent(uint64_t tid, uint64_t offset,
847                                         uint64_t length, int r) {
848   assert(length > 0);
849
850   CephContext *cct = m_image_ctx.cct;
851   ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
852                  << "offset=" << offset << ", "
853                  << "length=" << length << ", "
854                  << "r=" << r << dendl;
855
856   Mutex::Locker event_locker(m_event_lock);
857   typename Events::iterator it = m_events.find(tid);
858   if (it == m_events.end()) {
859     return;
860   }
861
862   Event &event = it->second;
863   if (event.ret_val == 0 && r < 0) {
864     event.ret_val = r;
865   }
866
867   ExtentInterval extent;
868   extent.insert(offset, length);
869
870   ExtentInterval intersect;
871   intersect.intersection_of(extent, event.pending_extents);
872
873   event.pending_extents.subtract(intersect);
874   if (!event.pending_extents.empty()) {
875     ldout(cct, 20) << this << " " << __func__ << ": "
876                    << "pending extents: " << event.pending_extents << dendl;
877     return;
878   }
879   complete_event(it, event.ret_val);
880 }
881
882 template <typename I>
883 void Journal<I>::append_op_event(uint64_t op_tid,
884                                  journal::EventEntry &&event_entry,
885                                  Context *on_safe) {
886   assert(m_image_ctx.owner_lock.is_locked());
887
888   bufferlist bl;
889   event_entry.timestamp = ceph_clock_now();
890   ::encode(event_entry, bl);
891
892   Future future;
893   {
894     Mutex::Locker locker(m_lock);
895     assert(m_state == STATE_READY);
896
897     future = m_journaler->append(m_tag_tid, bl);
898
899     // delay committing op event to ensure consistent replay
900     assert(m_op_futures.count(op_tid) == 0);
901     m_op_futures[op_tid] = future;
902   }
903
904   on_safe = create_async_context_callback(m_image_ctx, on_safe);
905   on_safe = new FunctionContext([this, on_safe](int r) {
906       // ensure all committed IO before this op is committed
907       m_journaler->flush_commit_position(on_safe);
908     });
909   future.flush(on_safe);
910
911   CephContext *cct = m_image_ctx.cct;
912   ldout(cct, 10) << this << " " << __func__ << ": "
913                  << "op_tid=" << op_tid << ", "
914                  << "event=" << event_entry.get_event_type() << dendl;
915 }
916
917 template <typename I>
918 void Journal<I>::commit_op_event(uint64_t op_tid, int r, Context *on_safe) {
919   CephContext *cct = m_image_ctx.cct;
920   ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
921                  << "r=" << r << dendl;
922
923   journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)),
924                                   ceph_clock_now());
925
926   bufferlist bl;
927   ::encode(event_entry, bl);
928
929   Future op_start_future;
930   Future op_finish_future;
931   {
932     Mutex::Locker locker(m_lock);
933     assert(m_state == STATE_READY);
934
935     // ready to commit op event
936     auto it = m_op_futures.find(op_tid);
937     assert(it != m_op_futures.end());
938     op_start_future = it->second;
939     m_op_futures.erase(it);
940
941     op_finish_future = m_journaler->append(m_tag_tid, bl);
942   }
943
944   op_finish_future.flush(create_async_context_callback(
945     m_image_ctx, new C_OpEventSafe(this, op_tid, op_start_future,
946                                    op_finish_future, on_safe)));
947 }
948
949 template <typename I>
950 void Journal<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
951   CephContext *cct = m_image_ctx.cct;
952   ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;
953
954   {
955     Mutex::Locker locker(m_lock);
956     assert(m_journal_replay != nullptr);
957     m_journal_replay->replay_op_ready(op_tid, on_resume);
958   }
959 }
960
961 template <typename I>
962 void Journal<I>::flush_event(uint64_t tid, Context *on_safe) {
963   CephContext *cct = m_image_ctx.cct;
964   ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
965                  << "on_safe=" << on_safe << dendl;
966
967   Future future;
968   {
969     Mutex::Locker event_locker(m_event_lock);
970     future = wait_event(m_lock, tid, on_safe);
971   }
972
973   if (future.is_valid()) {
974     future.flush(nullptr);
975   }
976 }
977
978 template <typename I>
979 void Journal<I>::wait_event(uint64_t tid, Context *on_safe) {
980   CephContext *cct = m_image_ctx.cct;
981   ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
982                  << "on_safe=" << on_safe << dendl;
983
984   Mutex::Locker event_locker(m_event_lock);
985   wait_event(m_lock, tid, on_safe);
986 }
987
988 template <typename I>
989 typename Journal<I>::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
990                                                    Context *on_safe) {
991   assert(m_event_lock.is_locked());
992   CephContext *cct = m_image_ctx.cct;
993
994   typename Events::iterator it = m_events.find(tid);
995   assert(it != m_events.end());
996
997   Event &event = it->second;
998   if (event.safe) {
999     // journal entry already safe
1000     ldout(cct, 20) << this << " " << __func__ << ": "
1001                    << "journal entry already safe" << dendl;
1002     m_image_ctx.op_work_queue->queue(on_safe, event.ret_val);
1003     return Future();
1004   }
1005
1006   event.on_safe_contexts.push_back(create_async_context_callback(m_image_ctx,
1007                                                                  on_safe));
1008   return event.futures.back();
1009 }
1010
1011 template <typename I>
1012 void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
1013                                        Context *on_start) {
1014   CephContext *cct = m_image_ctx.cct;
1015   ldout(cct, 20) << this << " " << __func__ << dendl;
1016
1017   Mutex::Locker locker(m_lock);
1018   assert(m_state == STATE_READY);
1019   assert(m_journal_replay == nullptr);
1020
1021   on_start = util::create_async_context_callback(m_image_ctx, on_start);
1022   on_start = new FunctionContext(
1023     [this, journal_replay, on_start](int r) {
1024       handle_start_external_replay(r, journal_replay, on_start);
1025     });
1026
1027   // safely flush all in-flight events before starting external replay
1028   m_journaler->stop_append(util::create_async_context_callback(m_image_ctx,
1029                                                                on_start));
1030 }
1031
1032 template <typename I>
1033 void Journal<I>::handle_start_external_replay(int r,
1034                                               journal::Replay<I> **journal_replay,
1035                                               Context *on_finish) {
1036   CephContext *cct = m_image_ctx.cct;
1037   ldout(cct, 20) << this << " " << __func__ << dendl;
1038
1039   Mutex::Locker locker(m_lock);
1040   assert(m_state == STATE_READY);
1041   assert(m_journal_replay == nullptr);
1042
1043   if (r < 0) {
1044     lderr(cct) << this << " " << __func__ << ": "
1045                << "failed to stop recording: " << cpp_strerror(r) << dendl;
1046     *journal_replay = nullptr;
1047
1048     // get back to a sane-state
1049     start_append();
1050     on_finish->complete(r);
1051     return;
1052   }
1053
1054   transition_state(STATE_REPLAYING, 0);
1055   m_journal_replay = journal::Replay<I>::create(m_image_ctx);
1056   *journal_replay = m_journal_replay;
1057   on_finish->complete(0);
1058 }
1059
1060 template <typename I>
1061 void Journal<I>::stop_external_replay() {
1062   CephContext *cct = m_image_ctx.cct;
1063   ldout(cct, 20) << this << " " << __func__ << dendl;
1064
1065   Mutex::Locker locker(m_lock);
1066   assert(m_journal_replay != nullptr);
1067   assert(m_state == STATE_REPLAYING);
1068
1069   delete m_journal_replay;
1070   m_journal_replay = nullptr;
1071
1072   if (m_close_pending) {
1073     destroy_journaler(0);
1074     return;
1075   }
1076
1077   start_append();
1078 }
1079
1080 template <typename I>
1081 void Journal<I>::create_journaler() {
1082   CephContext *cct = m_image_ctx.cct;
1083   ldout(cct, 20) << this << " " << __func__ << dendl;
1084
1085   assert(m_lock.is_locked());
1086   assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
1087   assert(m_journaler == NULL);
1088
1089   transition_state(STATE_INITIALIZING, 0);
1090   ::journal::Settings settings;
1091   settings.commit_interval = m_image_ctx.journal_commit_age;
1092   settings.max_payload_bytes = m_image_ctx.journal_max_payload_bytes;
1093   settings.max_concurrent_object_sets =
1094     m_image_ctx.journal_max_concurrent_object_sets;
1095   // TODO: a configurable filter to exclude certain peers from being
1096   // disconnected.
1097   settings.whitelisted_laggy_clients = {IMAGE_CLIENT_ID};
1098
1099   m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
1100                               m_image_ctx.md_ctx, m_image_ctx.id,
1101                               IMAGE_CLIENT_ID, settings);
1102   m_journaler->add_listener(&m_metadata_listener);
1103
1104   Context *ctx = create_async_context_callback(
1105     m_image_ctx, create_context_callback<
1106       Journal<I>, &Journal<I>::handle_open>(this));
1107   auto open_req = journal::OpenRequest<I>::create(&m_image_ctx, m_journaler,
1108                                                   &m_lock, &m_client_meta,
1109                                                   &m_tag_tid, &m_tag_data, ctx);
1110   open_req->send();
1111 }
1112
1113 template <typename I>
1114 void Journal<I>::destroy_journaler(int r) {
1115   CephContext *cct = m_image_ctx.cct;
1116   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1117
1118   assert(m_lock.is_locked());
1119
1120   delete m_journal_replay;
1121   m_journal_replay = NULL;
1122
1123   m_journaler->remove_listener(&m_metadata_listener);
1124
1125   transition_state(STATE_CLOSING, r);
1126
1127   Context *ctx = create_async_context_callback(
1128     m_image_ctx, create_context_callback<
1129       Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
1130   ctx = new FunctionContext(
1131     [this, ctx](int r) {
1132       Mutex::Locker locker(m_lock);
1133       m_journaler->shut_down(ctx);
1134     });
1135   m_async_journal_op_tracker.wait(m_image_ctx, ctx);
1136 }
1137
1138 template <typename I>
1139 void Journal<I>::recreate_journaler(int r) {
1140   CephContext *cct = m_image_ctx.cct;
1141   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1142
1143   assert(m_lock.is_locked());
1144   assert(m_state == STATE_FLUSHING_RESTART ||
1145          m_state == STATE_FLUSHING_REPLAY);
1146
1147   delete m_journal_replay;
1148   m_journal_replay = NULL;
1149
1150   m_journaler->remove_listener(&m_metadata_listener);
1151
1152   transition_state(STATE_RESTARTING_REPLAY, r);
1153   m_journaler->shut_down(create_async_context_callback(
1154     m_image_ctx, create_context_callback<
1155       Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
1156 }
1157
1158 template <typename I>
1159 void Journal<I>::complete_event(typename Events::iterator it, int r) {
1160   assert(m_event_lock.is_locked());
1161   assert(m_state == STATE_READY);
1162
1163   CephContext *cct = m_image_ctx.cct;
1164   ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
1165                  << "r=" << r << dendl;
1166
1167   Event &event = it->second;
1168   if (r < 0) {
1169     // event recorded to journal but failed to update disk, we cannot
1170     // commit this IO event. this event must be replayed.
1171     assert(event.safe);
1172     lderr(cct) << this << " " << __func__ << ": "
1173                << "failed to commit IO to disk, replay required: "
1174                << cpp_strerror(r) << dendl;
1175   }
1176
1177   event.committed_io = true;
1178   if (event.safe) {
1179     if (r >= 0) {
1180       for (auto &future : event.futures) {
1181         m_journaler->committed(future);
1182       }
1183     }
1184     m_events.erase(it);
1185   }
1186 }
1187
1188 template <typename I>
1189 void Journal<I>::start_append() {
1190   assert(m_lock.is_locked());
1191   m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
1192                             m_image_ctx.journal_object_flush_bytes,
1193                             m_image_ctx.journal_object_flush_age);
1194   transition_state(STATE_READY, 0);
1195 }
1196
1197 template <typename I>
1198 void Journal<I>::handle_open(int r) {
1199   CephContext *cct = m_image_ctx.cct;
1200   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1201
1202   Mutex::Locker locker(m_lock);
1203   assert(m_state == STATE_INITIALIZING);
1204
1205   if (r < 0) {
1206     lderr(cct) << this << " " << __func__ << ": "
1207                << "failed to initialize journal: " << cpp_strerror(r)
1208                << dendl;
1209     destroy_journaler(r);
1210     return;
1211   }
1212
1213   m_tag_class = m_client_meta.tag_class;
1214   m_max_append_size = m_journaler->get_max_append_size();
1215   ldout(cct, 20) << this << " " << __func__ << ": "
1216                  << "tag_class=" << m_tag_class << ", "
1217                  << "max_append_size=" << m_max_append_size << dendl;
1218
1219   transition_state(STATE_REPLAYING, 0);
1220   m_journal_replay = journal::Replay<I>::create(m_image_ctx);
1221   m_journaler->start_replay(&m_replay_handler);
1222 }
1223
1224 template <typename I>
1225 void Journal<I>::handle_replay_ready() {
1226   CephContext *cct = m_image_ctx.cct;
1227   ReplayEntry replay_entry;
1228   {
1229     Mutex::Locker locker(m_lock);
1230     if (m_state != STATE_REPLAYING) {
1231       return;
1232     }
1233
1234     ldout(cct, 20) << this << " " << __func__ << dendl;
1235     if (!m_journaler->try_pop_front(&replay_entry)) {
1236       return;
1237     }
1238
1239     // only one entry should be in-flight at a time
1240     assert(!m_processing_entry);
1241     m_processing_entry = true;
1242   }
1243
1244   bufferlist data = replay_entry.get_data();
1245   bufferlist::iterator it = data.begin();
1246
1247   journal::EventEntry event_entry;
1248   int r = m_journal_replay->decode(&it, &event_entry);
1249   if (r < 0) {
1250     lderr(cct) << this << " " << __func__ << ": "
1251                << "failed to decode journal event entry" << dendl;
1252     handle_replay_process_safe(replay_entry, r);
1253     return;
1254   }
1255
1256   Context *on_ready = create_context_callback<
1257     Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
1258   Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
1259   m_journal_replay->process(event_entry, on_ready, on_commit);
1260 }
1261
1262 template <typename I>
1263 void Journal<I>::handle_replay_complete(int r) {
1264   CephContext *cct = m_image_ctx.cct;
1265
1266   bool cancel_ops = false;
1267   {
1268     Mutex::Locker locker(m_lock);
1269     if (m_state != STATE_REPLAYING) {
1270       return;
1271     }
1272
1273     ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1274     if (r < 0) {
1275       cancel_ops = true;
1276       transition_state(STATE_FLUSHING_RESTART, r);
1277     } else {
1278       // state might change back to FLUSHING_RESTART on flush error
1279       transition_state(STATE_FLUSHING_REPLAY, 0);
1280     }
1281   }
1282
1283   Context *ctx = new FunctionContext([this, cct](int r) {
1284       ldout(cct, 20) << this << " handle_replay_complete: "
1285                      << "handle shut down replay" << dendl;
1286
1287       State state;
1288       {
1289         Mutex::Locker locker(m_lock);
1290         assert(m_state == STATE_FLUSHING_RESTART ||
1291                m_state == STATE_FLUSHING_REPLAY);
1292         state = m_state;
1293       }
1294
1295       if (state == STATE_FLUSHING_RESTART) {
1296         handle_flushing_restart(0);
1297       } else {
1298         handle_flushing_replay();
1299       }
1300     });
1301   ctx = new FunctionContext([this, cct, cancel_ops, ctx](int r) {
1302       ldout(cct, 20) << this << " handle_replay_complete: "
1303                      << "shut down replay" << dendl;
1304       m_journal_replay->shut_down(cancel_ops, ctx);
1305     });
1306   m_journaler->stop_replay(ctx);
1307 }
1308
1309 template <typename I>
1310 void Journal<I>::handle_replay_process_ready(int r) {
1311   // journal::Replay is ready for more events -- attempt to pop another
1312   CephContext *cct = m_image_ctx.cct;
1313   ldout(cct, 20) << this << " " << __func__ << dendl;
1314
1315   assert(r == 0);
1316   {
1317     Mutex::Locker locker(m_lock);
1318     assert(m_processing_entry);
1319     m_processing_entry = false;
1320   }
1321   handle_replay_ready();
1322 }
1323
1324 template <typename I>
1325 void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
1326   CephContext *cct = m_image_ctx.cct;
1327
1328   m_lock.Lock();
1329   assert(m_state == STATE_REPLAYING ||
1330          m_state == STATE_FLUSHING_RESTART ||
1331          m_state == STATE_FLUSHING_REPLAY);
1332
1333   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1334   if (r < 0) {
1335     if (r != -ECANCELED) {
1336       lderr(cct) << this << " " << __func__ << ": "
1337                  << "failed to commit journal event to disk: "
1338                  << cpp_strerror(r) << dendl;
1339     }
1340
1341     if (m_state == STATE_REPLAYING) {
1342       // abort the replay if we have an error
1343       transition_state(STATE_FLUSHING_RESTART, r);
1344       m_lock.Unlock();
1345
1346       // stop replay, shut down, and restart
1347       Context *ctx = new FunctionContext([this, cct](int r) {
1348           ldout(cct, 20) << this << " handle_replay_process_safe: "
1349                          << "shut down replay" << dendl;
1350           {
1351             Mutex::Locker locker(m_lock);
1352             assert(m_state == STATE_FLUSHING_RESTART);
1353           }
1354
1355           m_journal_replay->shut_down(true, create_context_callback<
1356             Journal<I>, &Journal<I>::handle_flushing_restart>(this));
1357         });
1358       m_journaler->stop_replay(ctx);
1359       return;
1360     } else if (m_state == STATE_FLUSHING_REPLAY) {
1361       // end-of-replay flush in-progress -- we need to restart replay
1362       transition_state(STATE_FLUSHING_RESTART, r);
1363       m_lock.Unlock();
1364       return;
1365     }
1366   } else {
1367     // only commit the entry if written successfully
1368     m_journaler->committed(replay_entry);
1369   }
1370   m_lock.Unlock();
1371 }
1372
1373 template <typename I>
1374 void Journal<I>::handle_flushing_restart(int r) {
1375   Mutex::Locker locker(m_lock);
1376
1377   CephContext *cct = m_image_ctx.cct;
1378   ldout(cct, 20) << this << " " << __func__ << dendl;
1379
1380   assert(r == 0);
1381   assert(m_state == STATE_FLUSHING_RESTART);
1382   if (m_close_pending) {
1383     destroy_journaler(r);
1384     return;
1385   }
1386
1387   recreate_journaler(r);
1388 }
1389
1390 template <typename I>
1391 void Journal<I>::handle_flushing_replay() {
1392   Mutex::Locker locker(m_lock);
1393
1394   CephContext *cct = m_image_ctx.cct;
1395   ldout(cct, 20) << this << " " << __func__ << dendl;
1396
1397   assert(m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART);
1398   if (m_close_pending) {
1399     destroy_journaler(0);
1400     return;
1401   } else if (m_state == STATE_FLUSHING_RESTART) {
1402     // failed to replay one-or-more events -- restart
1403     recreate_journaler(0);
1404     return;
1405   }
1406
1407   delete m_journal_replay;
1408   m_journal_replay = NULL;
1409
1410   m_error_result = 0;
1411   start_append();
1412 }
1413
1414 template <typename I>
1415 void Journal<I>::handle_recording_stopped(int r) {
1416   CephContext *cct = m_image_ctx.cct;
1417   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1418
1419   Mutex::Locker locker(m_lock);
1420   assert(m_state == STATE_STOPPING);
1421
1422   destroy_journaler(r);
1423 }
1424
1425 template <typename I>
1426 void Journal<I>::handle_journal_destroyed(int r) {
1427   CephContext *cct = m_image_ctx.cct;
1428   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1429
1430   if (r < 0) {
1431     lderr(cct) << this << " " << __func__
1432                << "error detected while closing journal: " << cpp_strerror(r)
1433                << dendl;
1434   }
1435
1436   Mutex::Locker locker(m_lock);
1437   delete m_journaler;
1438   m_journaler = nullptr;
1439
1440   assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
1441   if (m_state == STATE_RESTARTING_REPLAY) {
1442     create_journaler();
1443     return;
1444   }
1445
1446   transition_state(STATE_CLOSED, r);
1447 }
1448
1449 template <typename I>
1450 void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
1451   CephContext *cct = m_image_ctx.cct;
1452   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
1453                  << "tid=" << tid << dendl;
1454
1455   // journal will be flushed before closing
1456   assert(m_state == STATE_READY || m_state == STATE_STOPPING);
1457   if (r < 0) {
1458     lderr(cct) << this << " " << __func__ << ": "
1459                << "failed to commit IO event: "  << cpp_strerror(r) << dendl;
1460   }
1461
1462   IOObjectRequests aio_object_requests;
1463   Contexts on_safe_contexts;
1464   {
1465     Mutex::Locker event_locker(m_event_lock);
1466     typename Events::iterator it = m_events.find(tid);
1467     assert(it != m_events.end());
1468
1469     Event &event = it->second;
1470     aio_object_requests.swap(event.aio_object_requests);
1471     on_safe_contexts.swap(event.on_safe_contexts);
1472
1473     if (r < 0 || event.committed_io) {
1474       // failed journal write so IO won't be sent -- or IO extent was
1475       // overwritten by future IO operations so this was a no-op IO event
1476       event.ret_val = r;
1477       for (auto &future : event.futures) {
1478         m_journaler->committed(future);
1479       }
1480     }
1481
1482     if (event.committed_io) {
1483       m_events.erase(it);
1484     } else {
1485       event.safe = true;
1486     }
1487   }
1488
1489   ldout(cct, 20) << this << " " << __func__ << ": "
1490                  << "completing tid=" << tid << dendl;
1491   for (IOObjectRequests::iterator it = aio_object_requests.begin();
1492        it != aio_object_requests.end(); ++it) {
1493     if (r < 0) {
1494       // don't send aio requests if the journal fails -- bubble error up
1495       (*it)->complete(r);
1496     } else {
1497       // send any waiting aio requests now that journal entry is safe
1498       (*it)->send();
1499     }
1500   }
1501
1502   // alert the cache about the journal event status
1503   for (Contexts::iterator it = on_safe_contexts.begin();
1504        it != on_safe_contexts.end(); ++it) {
1505     (*it)->complete(r);
1506   }
1507 }
1508
1509 template <typename I>
1510 void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
1511                                       const Future &op_start_future,
1512                                       const Future &op_finish_future,
1513                                       Context *on_safe) {
1514   CephContext *cct = m_image_ctx.cct;
1515   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
1516                  << "tid=" << tid << dendl;
1517
1518   // journal will be flushed before closing
1519   assert(m_state == STATE_READY || m_state == STATE_STOPPING);
1520   if (r < 0) {
1521     lderr(cct) << this << " " << __func__ << ": "
1522                << "failed to commit op event: "  << cpp_strerror(r) << dendl;
1523   }
1524
1525   m_journaler->committed(op_start_future);
1526   m_journaler->committed(op_finish_future);
1527
1528   // reduce the replay window after committing an op event
1529   m_journaler->flush_commit_position(on_safe);
1530 }
1531
1532 template <typename I>
1533 void Journal<I>::stop_recording() {
1534   assert(m_lock.is_locked());
1535   assert(m_journaler != NULL);
1536
1537   assert(m_state == STATE_READY);
1538   transition_state(STATE_STOPPING, 0);
1539
1540   m_journaler->stop_append(util::create_async_context_callback(
1541     m_image_ctx, create_context_callback<
1542       Journal<I>, &Journal<I>::handle_recording_stopped>(this)));
1543 }
1544
1545 template <typename I>
1546 void Journal<I>::transition_state(State state, int r) {
1547   CephContext *cct = m_image_ctx.cct;
1548   ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
1549   assert(m_lock.is_locked());
1550   m_state = state;
1551
1552   if (m_error_result == 0 && r < 0) {
1553     m_error_result = r;
1554   }
1555
1556   if (is_steady_state()) {
1557     Contexts wait_for_state_contexts(std::move(m_wait_for_state_contexts));
1558     for (auto ctx : wait_for_state_contexts) {
1559       ctx->complete(m_error_result);
1560     }
1561   }
1562 }
1563
1564 template <typename I>
1565 bool Journal<I>::is_steady_state() const {
1566   assert(m_lock.is_locked());
1567   switch (m_state) {
1568   case STATE_READY:
1569   case STATE_CLOSED:
1570     return true;
1571   case STATE_UNINITIALIZED:
1572   case STATE_INITIALIZING:
1573   case STATE_REPLAYING:
1574   case STATE_FLUSHING_RESTART:
1575   case STATE_RESTARTING_REPLAY:
1576   case STATE_FLUSHING_REPLAY:
1577   case STATE_STOPPING:
1578   case STATE_CLOSING:
1579     break;
1580   }
1581   return false;
1582 }
1583
1584 template <typename I>
1585 void Journal<I>::wait_for_steady_state(Context *on_state) {
1586   assert(m_lock.is_locked());
1587   assert(!is_steady_state());
1588
1589   CephContext *cct = m_image_ctx.cct;
1590   ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
1591                  << dendl;
1592   m_wait_for_state_contexts.push_back(on_state);
1593 }
1594
1595 template <typename I>
1596 int Journal<I>::is_resync_requested(bool *do_resync) {
1597   Mutex::Locker l(m_lock);
1598   return check_resync_requested(do_resync);
1599 }
1600
1601 template <typename I>
1602 int Journal<I>::check_resync_requested(bool *do_resync) {
1603   CephContext *cct = m_image_ctx.cct;
1604   ldout(cct, 20) << this << " " << __func__ << dendl;
1605
1606   assert(m_lock.is_locked());
1607   assert(do_resync != nullptr);
1608
1609   cls::journal::Client client;
1610   int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
1611   if (r < 0) {
1612      lderr(cct) << this << " " << __func__ << ": "
1613                 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1614      return r;
1615   }
1616
1617   librbd::journal::ClientData client_data;
1618   bufferlist::iterator bl_it = client.data.begin();
1619   try {
1620     ::decode(client_data, bl_it);
1621   } catch (const buffer::error &err) {
1622     lderr(cct) << this << " " << __func__ << ": "
1623                << "failed to decode client data: " << err << dendl;
1624     return -EINVAL;
1625   }
1626
1627   journal::ImageClientMeta *image_client_meta =
1628     boost::get<journal::ImageClientMeta>(&client_data.client_meta);
1629   if (image_client_meta == nullptr) {
1630     lderr(cct) << this << " " << __func__ << ": "
1631                << "failed to access image client meta struct" << dendl;
1632     return -EINVAL;
1633   }
1634
1635   *do_resync = image_client_meta->resync_requested;
1636
1637   return 0;
1638 }
1639
1640 struct C_RefreshTags : public Context {
1641   util::AsyncOpTracker &async_op_tracker;
1642   Context *on_finish = nullptr;
1643
1644   Mutex lock;
1645   uint64_t tag_tid;
1646   journal::TagData tag_data;
1647
1648   C_RefreshTags(util::AsyncOpTracker &async_op_tracker)
1649     : async_op_tracker(async_op_tracker),
1650       lock("librbd::Journal::C_RefreshTags::lock") {
1651     async_op_tracker.start_op();
1652   }
1653   ~C_RefreshTags() override {
1654      async_op_tracker.finish_op();
1655   }
1656
1657   void finish(int r) override {
1658     on_finish->complete(r);
1659   }
1660 };
1661
1662 template <typename I>
1663 void Journal<I>::handle_metadata_updated() {
1664   CephContext *cct = m_image_ctx.cct;
1665   Mutex::Locker locker(m_lock);
1666
1667   if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
1668     return;
1669   } else if (is_tag_owner(m_lock)) {
1670     ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl;
1671     return;
1672   } else if (m_listeners.empty()) {
1673     ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl;
1674     return;
1675   }
1676
1677   uint64_t refresh_sequence = ++m_refresh_sequence;
1678   ldout(cct, 20) << this << " " << __func__ << ": "
1679                  << "refresh_sequence=" << refresh_sequence << dendl;
1680
1681   // pull the most recent tags from the journal, decode, and
1682   // update the internal tag state
1683   C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
1684   refresh_ctx->on_finish = new FunctionContext(
1685     [this, refresh_sequence, refresh_ctx](int r) {
1686       handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
1687                               refresh_ctx->tag_data, r);
1688     });
1689   C_DecodeTags *decode_tags_ctx = new C_DecodeTags(
1690       cct, &refresh_ctx->lock, &refresh_ctx->tag_tid,
1691       &refresh_ctx->tag_data, refresh_ctx);
1692   m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class,
1693                         &decode_tags_ctx->tags, decode_tags_ctx);
1694 }
1695
1696 template <typename I>
1697 void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
1698                                          uint64_t tag_tid,
1699                                          journal::TagData tag_data, int r) {
1700   CephContext *cct = m_image_ctx.cct;
1701   Mutex::Locker locker(m_lock);
1702
1703   if (r < 0) {
1704     lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
1705                << cpp_strerror(r) << dendl;
1706     return;
1707   } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
1708     return;
1709   } else if (refresh_sequence != m_refresh_sequence) {
1710     // another, more up-to-date refresh is in-flight
1711     return;
1712   }
1713
1714   ldout(cct, 20) << this << " " << __func__ << ": "
1715                  << "refresh_sequence=" << refresh_sequence << ", "
1716                  << "tag_tid=" << tag_tid << ", "
1717                  << "tag_data=" << tag_data << dendl;
1718   while (m_listener_notify) {
1719     m_listener_cond.Wait(m_lock);
1720   }
1721
1722   bool was_tag_owner = is_tag_owner(m_lock);
1723   if (m_tag_tid < tag_tid) {
1724     m_tag_tid = tag_tid;
1725     m_tag_data = tag_data;
1726   }
1727   bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock));
1728
1729   bool resync_requested = false;
1730   r = check_resync_requested(&resync_requested);
1731   if (r < 0) {
1732     lderr(cct) << this << " " << __func__ << ": "
1733                << "failed to check if a resync was requested" << dendl;
1734     return;
1735   }
1736
1737   Listeners listeners(m_listeners);
1738   m_listener_notify = true;
1739   m_lock.Unlock();
1740
1741   if (promoted_to_primary) {
1742     for (auto listener : listeners) {
1743       listener->handle_promoted();
1744     }
1745   } else if (resync_requested) {
1746     for (auto listener : listeners) {
1747       listener->handle_resync();
1748     }
1749   }
1750
1751   m_lock.Lock();
1752   m_listener_notify = false;
1753   m_listener_cond.Signal();
1754 }
1755
1756 template <typename I>
1757 void Journal<I>::add_listener(journal::Listener *listener) {
1758   Mutex::Locker locker(m_lock);
1759   m_listeners.insert(listener);
1760 }
1761
1762 template <typename I>
1763 void Journal<I>::remove_listener(journal::Listener *listener) {
1764   Mutex::Locker locker(m_lock);
1765   while (m_listener_notify) {
1766     m_listener_cond.Wait(m_lock);
1767   }
1768   m_listeners.erase(listener);
1769 }
1770
1771 } // namespace librbd
1772
1773 #ifndef TEST_F
1774 template class librbd::Journal<librbd::ImageCtx>;
1775 #endif