Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd_mirror / ImageReplayer.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
5 #define CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
6
7 #include "common/AsyncOpTracker.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/rados/librados.hpp"
11 #include "cls/journal/cls_journal_types.h"
12 #include "cls/rbd/cls_rbd_types.h"
13 #include "journal/JournalMetadataListener.h"
14 #include "journal/ReplayEntry.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/journal/Types.h"
17 #include "librbd/journal/TypeTraits.h"
18 #include "ProgressContext.h"
19 #include "types.h"
20 #include "tools/rbd_mirror/image_replayer/Types.h"
21
22 #include <boost/noncopyable.hpp>
23 #include <boost/optional.hpp>
24
25 #include <set>
26 #include <map>
27 #include <atomic>
28 #include <string>
29 #include <vector>
30
// Forward declarations: this header only needs these types by pointer or
// as template names, so the full definitions are not included here.
class AdminSocketHook;

namespace journal {
class Journaler;
class ReplayHandler;
} // namespace journal

namespace librbd {
class ImageCtx;

namespace journal {
template <typename> class Replay;
} // namespace journal
} // namespace librbd
46
47 namespace rbd {
48 namespace mirror {
49
50 template <typename> struct ImageDeleter;
51 template <typename> struct InstanceWatcher;
52 template <typename> struct Threads;
53
54 namespace image_replayer { template <typename> class BootstrapRequest; }
55 namespace image_replayer { template <typename> class EventPreprocessor; }
56 namespace image_replayer { template <typename> class ReplayStatusFormatter; }
57
58 /**
59  * Replays changes from a remote cluster for a single image.
60  */
61 template <typename ImageCtxT = librbd::ImageCtx>
62 class ImageReplayer {
63 public:
64   static ImageReplayer *create(
65     Threads<ImageCtxT> *threads, ImageDeleter<ImageCtxT>* image_deleter,
66     InstanceWatcher<ImageCtxT> *instance_watcher,
67     RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
68     const std::string &global_image_id) {
69     return new ImageReplayer(threads, image_deleter, instance_watcher,
70                              local, local_mirror_uuid, local_pool_id,
71                              global_image_id);
72   }
73   void destroy() {
74     delete this;
75   }
76
77   ImageReplayer(Threads<ImageCtxT> *threads,
78                 ImageDeleter<ImageCtxT>* image_deleter,
79                 InstanceWatcher<ImageCtxT> *instance_watcher,
80                 RadosRef local, const std::string &local_mirror_uuid,
81                 int64_t local_pool_id, const std::string &global_image_id);
82   virtual ~ImageReplayer();
83   ImageReplayer(const ImageReplayer&) = delete;
84   ImageReplayer& operator=(const ImageReplayer&) = delete;
85
86   bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); }
87   bool is_running() { Mutex::Locker l(m_lock); return is_running_(); }
88   bool is_replaying() { Mutex::Locker l(m_lock); return is_replaying_(); }
89
90   std::string get_name() { Mutex::Locker l(m_lock); return m_name; };
91   void set_state_description(int r, const std::string &desc);
92
93   // TODO temporary until policy handles release of image replayers
94   inline bool is_finished() const {
95     Mutex::Locker locker(m_lock);
96     return m_finished;
97   }
98   inline void set_finished(bool finished) {
99     Mutex::Locker locker(m_lock);
100     m_finished = finished;
101   }
102
103   inline bool is_blacklisted() const {
104     Mutex::Locker locker(m_lock);
105     return (m_last_r == -EBLACKLISTED);
106   }
107
108   image_replayer::HealthState get_health_state() const;
109
110   void add_peer(const std::string &peer_uuid, librados::IoCtx &remote_io_ctx);
111
112   inline int64_t get_local_pool_id() const {
113     return m_local_pool_id;
114   }
115   inline const std::string& get_global_image_id() const {
116     return m_global_image_id;
117   }
118
119   void start(Context *on_finish = nullptr, bool manual = false);
120   void stop(Context *on_finish = nullptr, bool manual = false,
121             int r = 0, const std::string& desc = "");
122   void restart(Context *on_finish = nullptr);
123   void flush(Context *on_finish = nullptr);
124
125   void resync_image(Context *on_finish=nullptr);
126
127   void print_status(Formatter *f, stringstream *ss);
128
129   virtual void handle_replay_ready();
130   virtual void handle_replay_complete(int r, const std::string &error_desc);
131
132 protected:
133   /**
134    * @verbatim
135    *                   (error)
136    * <uninitialized> <------------------------------------ FAIL
137    *    |                                                   ^
138    *    v                                                   *
139    * <starting>                                             *
140    *    |                                                   *
141    *    v                                                   *
142    * WAIT_FOR_DELETION                                      *
143    *    |                                                   *
144    *    v                                           (error) *
145    * PREPARE_LOCAL_IMAGE  * * * * * * * * * * * * * * * * * *
146    *    |                                                   *
147    *    v                                           (error) *
148    * PREPARE_REMOTE_IMAGE * * * * * * * * * * * * * * * * * *
149    *    |                                                   *
150    *    v                                           (error) *
151    * BOOTSTRAP_IMAGE  * * * * * * * * * * * * * * * * * * * *
152    *    |                                                   *
153    *    v                                           (error) *
154    * INIT_REMOTE_JOURNALER  * * * * * * * * * * * * * * * * *
155    *    |                                                   *
156    *    v                                           (error) *
157    * START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
158    *    |
159    *    |  /--------------------------------------------\
160    *    |  |                                            |
161    *    v  v   (asok flush)                             |
162    * REPLAYING -------------> LOCAL_REPLAY_FLUSH        |
163    *    |       \                 |                     |
164    *    |       |                 v                     |
165    *    |       |             FLUSH_COMMIT_POSITION     |
166    *    |       |                 |                     |
167    *    |       |                 \--------------------/|
168    *    |       |                                       |
169    *    |       | (entries available)                   |
170    *    |       \-----------> REPLAY_READY              |
171    *    |                         |                     |
172    *    |                         | (skip if not        |
173    *    |                         v  needed)        (error)
174    *    |                     REPLAY_FLUSH  * * * * * * * * *
175    *    |                         |                     |   *
176    *    |                         | (skip if not        |   *
177    *    |                         v  needed)        (error) *
178    *    |                     GET_REMOTE_TAG  * * * * * * * *
179    *    |                         |                     |   *
180    *    |                         | (skip if not        |   *
181    *    |                         v  needed)        (error) *
182    *    |                     ALLOCATE_LOCAL_TAG  * * * * * *
183    *    |                         |                     |   *
184    *    |                         v                 (error) *
185    *    |                     PREPROCESS_ENTRY  * * * * * * *
186    *    |                         |                     |   *
187    *    |                         v                 (error) *
188    *    |                     PROCESS_ENTRY * * * * * * * * *
189    *    |                         |                     |   *
190    *    |                         \---------------------/   *
191    *    v                                                   *
192    * REPLAY_COMPLETE  < * * * * * * * * * * * * * * * * * * *
193    *    |
194    *    v
195    * JOURNAL_REPLAY_SHUT_DOWN
196    *    |
197    *    v
198    * LOCAL_IMAGE_CLOSE
199    *    |
200    *    v
201    * <stopped>
202    *
203    * @endverbatim
204    */
205
206   virtual void on_start_fail(int r, const std::string &desc = "");
207   virtual bool on_start_interrupted();
208
209   virtual void on_stop_journal_replay(int r = 0, const std::string &desc = "");
210
211   virtual void on_flush_local_replay_flush_start(Context *on_flush);
212   virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r);
213   virtual void on_flush_flush_commit_position_start(Context *on_flush);
214   virtual void on_flush_flush_commit_position_finish(Context *on_flush, int r);
215
216   bool on_replay_interrupted();
217
218 private:
219   typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
220
221   enum State {
222     STATE_UNKNOWN,
223     STATE_STARTING,
224     STATE_REPLAYING,
225     STATE_REPLAY_FLUSHING,
226     STATE_STOPPING,
227     STATE_STOPPED,
228   };
229
230   struct RemoteImage {
231     std::string mirror_uuid;
232     std::string image_id;
233     librados::IoCtx io_ctx;
234
235     RemoteImage() {
236     }
237     RemoteImage(const Peer& peer) : io_ctx(peer.io_ctx) {
238     }
239   };
240
241   typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
242   typedef boost::optional<State> OptionalState;
243   typedef boost::optional<cls::rbd::MirrorImageStatusState>
244       OptionalMirrorImageStatusState;
245
246   struct JournalListener : public librbd::journal::Listener {
247     ImageReplayer *img_replayer;
248
249     JournalListener(ImageReplayer *img_replayer)
250       : img_replayer(img_replayer) {
251     }
252
253     void handle_close() override {
254       img_replayer->on_stop_journal_replay();
255     }
256
257     void handle_promoted() override {
258       img_replayer->on_stop_journal_replay(0, "force promoted");
259     }
260
261     void handle_resync() override {
262       img_replayer->resync_image();
263     }
264   };
265
266   class BootstrapProgressContext : public ProgressContext {
267   public:
268     BootstrapProgressContext(ImageReplayer<ImageCtxT> *replayer) :
269       replayer(replayer) {
270     }
271
272     void update_progress(const std::string &description,
273                                  bool flush = true) override;
274   private:
275     ImageReplayer<ImageCtxT> *replayer;
276   };
277
278   Threads<ImageCtxT> *m_threads;
279   ImageDeleter<ImageCtxT>* m_image_deleter;
280   InstanceWatcher<ImageCtxT> *m_instance_watcher;
281
282   Peers m_peers;
283   RemoteImage m_remote_image;
284
285   RadosRef m_local;
286   std::string m_local_mirror_uuid;
287   int64_t m_local_pool_id;
288   std::string m_local_image_id;
289   std::string m_global_image_id;
290   std::string m_name;
291
292   mutable Mutex m_lock;
293   State m_state = STATE_STOPPED;
294   std::string m_state_desc;
295
296   OptionalMirrorImageStatusState m_mirror_image_status_state = boost::none;
297   int m_last_r = 0;
298
299   BootstrapProgressContext m_progress_cxt;
300
301   bool m_finished = false;
302   bool m_delete_requested = false;
303   bool m_resync_requested = false;
304
305   image_replayer::EventPreprocessor<ImageCtxT> *m_event_preprocessor = nullptr;
306   image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
307     nullptr;
308   librados::IoCtx m_local_ioctx;
309   ImageCtxT *m_local_image_ctx = nullptr;
310   std::string m_local_image_tag_owner;
311
312   decltype(ImageCtxT::journal) m_local_journal = nullptr;
313   librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
314   Journaler* m_remote_journaler = nullptr;
315   ::journal::ReplayHandler *m_replay_handler = nullptr;
316   librbd::journal::Listener *m_journal_listener;
317
318   Context *m_on_start_finish = nullptr;
319   Context *m_on_stop_finish = nullptr;
320   Context *m_update_status_task = nullptr;
321   int m_update_status_interval = 0;
322   librados::AioCompletion *m_update_status_comp = nullptr;
323   bool m_stop_requested = false;
324   bool m_manual_stop = false;
325
326   AdminSocketHook *m_asok_hook = nullptr;
327
328   image_replayer::BootstrapRequest<ImageCtxT> *m_bootstrap_request = nullptr;
329
330   uint32_t m_in_flight_status_updates = 0;
331   bool m_update_status_requested = false;
332   Context *m_on_update_status_finish = nullptr;
333
334   librbd::journal::MirrorPeerClientMeta m_client_meta;
335
336   ReplayEntry m_replay_entry;
337   bool m_replay_tag_valid = false;
338   uint64_t m_replay_tag_tid = 0;
339   cls::journal::Tag m_replay_tag;
340   librbd::journal::TagData m_replay_tag_data;
341   librbd::journal::EventEntry m_event_entry;
342   AsyncOpTracker m_event_replay_tracker;
343   Context *m_delayed_preprocess_task = nullptr;
344
345   struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
346     ImageReplayer *replayer;
347
348     RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { }
349
350     void handle_update(::journal::JournalMetadata *) override;
351   } m_remote_listener;
352
353   struct C_ReplayCommitted : public Context {
354     ImageReplayer *replayer;
355     ReplayEntry replay_entry;
356
357     C_ReplayCommitted(ImageReplayer *replayer,
358                       ReplayEntry &&replay_entry)
359       : replayer(replayer), replay_entry(std::move(replay_entry)) {
360     }
361     void finish(int r) override {
362       replayer->handle_process_entry_safe(replay_entry, r);
363     }
364   };
365
366   static std::string to_string(const State state);
367
368   bool is_stopped_() const {
369     return m_state == STATE_STOPPED;
370   }
371   bool is_running_() const {
372     return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested;
373   }
374   bool is_replaying_() const {
375     return (m_state == STATE_REPLAYING ||
376             m_state == STATE_REPLAY_FLUSHING);
377   }
378
379   bool update_mirror_image_status(bool force, const OptionalState &state);
380   bool start_mirror_image_status_update(bool force, bool restarting);
381   void finish_mirror_image_status_update();
382   void queue_mirror_image_status_update(const OptionalState &state);
383   void send_mirror_status_update(const OptionalState &state);
384   void handle_mirror_status_update(int r);
385   void reschedule_update_status_task(int new_interval = 0);
386
387   void shut_down(int r);
388   void handle_shut_down(int r);
389   void handle_remote_journal_metadata_updated();
390
391   void wait_for_deletion();
392   void handle_wait_for_deletion(int r);
393
394   void prepare_local_image();
395   void handle_prepare_local_image(int r);
396
397   void prepare_remote_image();
398   void handle_prepare_remote_image(int r);
399
400   void bootstrap();
401   void handle_bootstrap(int r);
402
403   void init_remote_journaler();
404   void handle_init_remote_journaler(int r);
405
406   void start_replay();
407   void handle_start_replay(int r);
408
409   void replay_flush();
410   void handle_replay_flush(int r);
411
412   void get_remote_tag();
413   void handle_get_remote_tag(int r);
414
415   void allocate_local_tag();
416   void handle_allocate_local_tag(int r);
417
418   void preprocess_entry();
419   void handle_preprocess_entry_ready(int r);
420   void handle_preprocess_entry_safe(int r);
421
422   void process_entry();
423   void handle_process_entry_ready(int r);
424   void handle_process_entry_safe(const ReplayEntry& replay_entry, int r);
425
426   void register_admin_socket_hook();
427   void unregister_admin_socket_hook();
428
429   void on_name_changed();
430 };
431
432 } // namespace mirror
433 } // namespace rbd
434
435 extern template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;
436
437 #endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H