Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / Journal.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_LIBRBD_JOURNAL_H
5 #define CEPH_LIBRBD_JOURNAL_H
6
7 #include "include/int_types.h"
8 #include "include/Context.h"
9 #include "include/interval_set.h"
10 #include "common/Cond.h"
11 #include "common/Mutex.h"
12 #include "common/Cond.h"
13 #include "common/WorkQueue.h"
14 #include "journal/Future.h"
15 #include "journal/JournalMetadataListener.h"
16 #include "journal/ReplayEntry.h"
17 #include "journal/ReplayHandler.h"
18 #include "librbd/Utils.h"
19 #include "librbd/journal/Types.h"
20 #include "librbd/journal/TypeTraits.h"
21
22 #include <algorithm>
23 #include <list>
24 #include <string>
25 #include <atomic>
26 #include <unordered_map>
27
28 class SafeTimer;
29 namespace journal {
30 class Journaler;
31 }
32 namespace librados {
33   class IoCtx;
34 }
35
36 namespace librbd {
37
38 class ImageCtx;
39
40 namespace io { struct ObjectRequestHandle; }
41 namespace journal { template <typename> class Replay; }
42
43 template <typename ImageCtxT = ImageCtx>
44 class Journal {
45 public:
46   /**
47    * @verbatim
48    *
49    * <start>
50    *    |
51    *    v
52    * UNINITIALIZED ---> INITIALIZING ---> REPLAYING ------> FLUSHING ---> READY
53    *    |                 *  .  ^             *  .              *           |
54    *    |                 *  .  |             *  .              *           |
55    *    |                 *  .  |    (error)  *  . . . . . . .  *           |
56    *    |                 *  .  |             *              .  *           |
57    *    |                 *  .  |             v              .  *           |
58    *    |                 *  .  |         FLUSHING_RESTART   .  *           |
59    *    |                 *  .  |             |              .  *           |
60    *    |                 *  .  |             |              .  *           |
61    *    |                 *  .  |             v              .  *           v
62    *    |                 *  .  |         RESTARTING  < * * * * *       STOPPING
63    *    |                 *  .  |             |              .              |
64    *    |                 *  .  |             |              .              |
65    *    |       * * * * * *  .  \-------------/              .              |
66    *    |       * (error)    .                               .              |
67    *    |       *            .   . . . . . . . . . . . . . . .              |
68    *    |       *            .   .                                          |
69    *    |       v            v   v                                          |
70    *    |     CLOSED <----- CLOSING <---------------------------------------/
71    *    |       |
72    *    |       v
73    *    \---> <finish>
74    *
75    * @endverbatim
76    */
77   enum State {
78     STATE_UNINITIALIZED,
79     STATE_INITIALIZING,
80     STATE_REPLAYING,
81     STATE_FLUSHING_RESTART,
82     STATE_RESTARTING_REPLAY,
83     STATE_FLUSHING_REPLAY,
84     STATE_READY,
85     STATE_STOPPING,
86     STATE_CLOSING,
87     STATE_CLOSED
88   };
89
90   static const std::string IMAGE_CLIENT_ID;
91   static const std::string LOCAL_MIRROR_UUID;
92   static const std::string ORPHAN_MIRROR_UUID;
93
94   typedef std::list<io::ObjectRequestHandle *> IOObjectRequests;
95
96   Journal(ImageCtxT &image_ctx);
97   ~Journal();
98
99   static bool is_journal_supported(ImageCtxT &image_ctx);
100   static int create(librados::IoCtx &io_ctx, const std::string &image_id,
101                     uint8_t order, uint8_t splay_width,
102                     const std::string &object_pool);
103   static int remove(librados::IoCtx &io_ctx, const std::string &image_id);
104   static int reset(librados::IoCtx &io_ctx, const std::string &image_id);
105
106   static void is_tag_owner(ImageCtxT *image_ctx, bool *is_tag_owner,
107                            Context *on_finish);
108   static void is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
109                            bool *is_tag_owner, ContextWQ *op_work_queue,
110                            Context *on_finish);
111   static void get_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
112                             std::string *mirror_uuid,
113                             ContextWQ *op_work_queue, Context *on_finish);
114   static int request_resync(ImageCtxT *image_ctx);
115   static void promote(ImageCtxT *image_ctx, Context *on_finish);
116   static void demote(ImageCtxT *image_ctx, Context *on_finish);
117
118   bool is_journal_ready() const;
119   bool is_journal_replaying() const;
120   bool is_journal_appending() const;
121
122   void wait_for_journal_ready(Context *on_ready);
123
124   void open(Context *on_finish);
125   void close(Context *on_finish);
126
127   bool is_tag_owner() const;
128   uint64_t get_tag_tid() const;
129   journal::TagData get_tag_data() const;
130
131   void allocate_local_tag(Context *on_finish);
132   void allocate_tag(const std::string &mirror_uuid,
133                     const journal::TagPredecessor &predecessor,
134                     Context *on_finish);
135
136   void flush_commit_position(Context *on_finish);
137
138   uint64_t append_write_event(uint64_t offset, size_t length,
139                               const bufferlist &bl,
140                               const IOObjectRequests &requests,
141                               bool flush_entry);
142   uint64_t append_io_event(journal::EventEntry &&event_entry,
143                            const IOObjectRequests &requests,
144                            uint64_t offset, size_t length,
145                            bool flush_entry);
146   void commit_io_event(uint64_t tid, int r);
147   void commit_io_event_extent(uint64_t tid, uint64_t offset, uint64_t length,
148                               int r);
149
150   void append_op_event(uint64_t op_tid, journal::EventEntry &&event_entry,
151                        Context *on_safe);
152   void commit_op_event(uint64_t tid, int r, Context *on_safe);
153   void replay_op_ready(uint64_t op_tid, Context *on_resume);
154
155   void flush_event(uint64_t tid, Context *on_safe);
156   void wait_event(uint64_t tid, Context *on_safe);
157
158   uint64_t allocate_op_tid() {
159     uint64_t op_tid = ++m_op_tid;
160     assert(op_tid != 0);
161     return op_tid;
162   }
163
164   void start_external_replay(journal::Replay<ImageCtxT> **journal_replay,
165                              Context *on_start);
166   void stop_external_replay();
167
168   void add_listener(journal::Listener *listener);
169   void remove_listener(journal::Listener *listener);
170
171   int is_resync_requested(bool *do_resync);
172
173   inline ContextWQ *get_work_queue() {
174     return m_work_queue;
175   }
176
177 private:
178   ImageCtxT &m_image_ctx;
179
180   // mock unit testing support
181   typedef journal::TypeTraits<ImageCtxT> TypeTraits;
182   typedef typename TypeTraits::Journaler Journaler;
183   typedef typename TypeTraits::Future Future;
184   typedef typename TypeTraits::ReplayEntry ReplayEntry;
185
186   typedef std::list<bufferlist> Bufferlists;
187   typedef std::list<Context *> Contexts;
188   typedef std::list<Future> Futures;
189   typedef interval_set<uint64_t> ExtentInterval;
190
191   struct Event {
192     Futures futures;
193     IOObjectRequests aio_object_requests;
194     Contexts on_safe_contexts;
195     ExtentInterval pending_extents;
196     bool committed_io = false;
197     bool safe = false;
198     int ret_val = 0;
199
200     Event() {
201     }
202     Event(const Futures &_futures, const IOObjectRequests &_requests,
203           uint64_t offset, size_t length)
204       : futures(_futures), aio_object_requests(_requests) {
205       if (length > 0) {
206         pending_extents.insert(offset, length);
207       }
208     }
209   };
210
211   typedef std::unordered_map<uint64_t, Event> Events;
212   typedef std::unordered_map<uint64_t, Future> TidToFutures;
213
214   struct C_IOEventSafe : public Context {
215     Journal *journal;
216     uint64_t tid;
217
218     C_IOEventSafe(Journal *_journal, uint64_t _tid)
219       : journal(_journal), tid(_tid) {
220     }
221
222     void finish(int r) override {
223       journal->handle_io_event_safe(r, tid);
224     }
225   };
226
227   struct C_OpEventSafe : public Context {
228     Journal *journal;
229     uint64_t tid;
230     Future op_start_future;
231     Future op_finish_future;
232     Context *on_safe;
233
234     C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future,
235                   const Future &op_finish_future, Context *on_safe)
236       : journal(journal), tid(tid), op_start_future(op_start_future),
237         op_finish_future(op_finish_future), on_safe(on_safe) {
238     }
239
240     void finish(int r) override {
241       journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future,
242                                     on_safe);
243     }
244   };
245
246   struct C_ReplayProcessSafe : public Context {
247     Journal *journal;
248     ReplayEntry replay_entry;
249
250     C_ReplayProcessSafe(Journal *journal, ReplayEntry &&replay_entry) :
251       journal(journal), replay_entry(std::move(replay_entry)) {
252     }
253     void finish(int r) override {
254       journal->handle_replay_process_safe(replay_entry, r);
255     }
256   };
257
258   struct ReplayHandler : public ::journal::ReplayHandler {
259     Journal *journal;
260     ReplayHandler(Journal *_journal) : journal(_journal) {
261     }
262
263     void get() override {
264       // TODO
265     }
266     void put() override {
267       // TODO
268     }
269
270     void handle_entries_available() override {
271       journal->handle_replay_ready();
272     }
273     void handle_complete(int r) override {
274       journal->handle_replay_complete(r);
275     }
276   };
277
278   ContextWQ *m_work_queue = nullptr;
279   SafeTimer *m_timer = nullptr;
280   Mutex *m_timer_lock = nullptr;
281
282   Journaler *m_journaler;
283   mutable Mutex m_lock;
284   State m_state;
285   uint64_t m_max_append_size = 0;
286   uint64_t m_tag_class = 0;
287   uint64_t m_tag_tid = 0;
288   journal::ImageClientMeta m_client_meta;
289   journal::TagData m_tag_data;
290
291   int m_error_result;
292   Contexts m_wait_for_state_contexts;
293
294   ReplayHandler m_replay_handler;
295   bool m_close_pending;
296
297   Mutex m_event_lock;
298   uint64_t m_event_tid;
299   Events m_events;
300
301   std::atomic<uint64_t> m_op_tid = { 0 };
302   TidToFutures m_op_futures;
303
304   bool m_processing_entry = false;
305   bool m_blocking_writes;
306
307   journal::Replay<ImageCtxT> *m_journal_replay;
308
309   util::AsyncOpTracker m_async_journal_op_tracker;
310
311   struct MetadataListener : public ::journal::JournalMetadataListener {
312     Journal<ImageCtxT> *journal;
313
314     MetadataListener(Journal<ImageCtxT> *journal) : journal(journal) { }
315
316     void handle_update(::journal::JournalMetadata *) override {
317       FunctionContext *ctx = new FunctionContext([this](int r) {
318         journal->handle_metadata_updated();
319       });
320       journal->m_work_queue->queue(ctx, 0);
321     }
322   } m_metadata_listener;
323
324   typedef std::set<journal::Listener *> Listeners;
325   Listeners m_listeners;
326   Cond m_listener_cond;
327   bool m_listener_notify = false;
328
329   uint64_t m_refresh_sequence = 0;
330
331   bool is_journal_replaying(const Mutex &) const;
332   bool is_tag_owner(const Mutex &) const;
333
334   uint64_t append_io_events(journal::EventType event_type,
335                             const Bufferlists &bufferlists,
336                             const IOObjectRequests &requests,
337                             uint64_t offset, size_t length, bool flush_entry);
338   Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
339
340   void create_journaler();
341   void destroy_journaler(int r);
342   void recreate_journaler(int r);
343
344   void complete_event(typename Events::iterator it, int r);
345
346   void start_append();
347
348   void handle_open(int r);
349
350   void handle_replay_ready();
351   void handle_replay_complete(int r);
352   void handle_replay_process_ready(int r);
353   void handle_replay_process_safe(ReplayEntry replay_entry, int r);
354
355   void handle_start_external_replay(int r,
356                                     journal::Replay<ImageCtxT> **journal_replay,
357                                     Context *on_finish);
358
359   void handle_flushing_restart(int r);
360   void handle_flushing_replay();
361
362   void handle_recording_stopped(int r);
363
364   void handle_journal_destroyed(int r);
365
366   void handle_io_event_safe(int r, uint64_t tid);
367   void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future,
368                             const Future &op_finish_future, Context *on_safe);
369
370   void stop_recording();
371
372   void transition_state(State state, int r);
373
374   bool is_steady_state() const;
375   void wait_for_steady_state(Context *on_state);
376
377   int check_resync_requested(bool *do_resync);
378
379   void handle_metadata_updated();
380   void handle_refresh_metadata(uint64_t refresh_sequence, uint64_t tag_tid,
381                                journal::TagData tag_data, int r);
382
383 };
384
385 } // namespace librbd
386
387 extern template class librbd::Journal<librbd::ImageCtx>;
388
389 #endif // CEPH_LIBRBD_JOURNAL_H