Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd_mirror / PoolReplayer.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_RBD_MIRROR_POOL_REPLAYER_H
5 #define CEPH_RBD_MIRROR_POOL_REPLAYER_H
6
7 #include "common/AsyncOpTracker.h"
8 #include "common/Cond.h"
9 #include "common/Mutex.h"
10 #include "common/WorkQueue.h"
11 #include "include/rados/librados.hpp"
12
13 #include "ClusterWatcher.h"
14 #include "LeaderWatcher.h"
15 #include "PoolWatcher.h"
16 #include "ImageDeleter.h"
17 #include "types.h"
18 #include "tools/rbd_mirror/service_daemon/Types.h"
19
20 #include <set>
21 #include <map>
22 #include <memory>
23 #include <atomic>
24 #include <string>
25
26 class AdminSocketHook;
27
28 namespace librbd { class ImageCtx; }
29
30 namespace rbd {
31 namespace mirror {
32
33 template <typename> class InstanceReplayer;
34 template <typename> class InstanceWatcher;
35 template <typename> class ServiceDaemon;
36 template <typename> struct Threads;
37
38 /**
39  * Controls mirroring for a single remote cluster.
40  */
41 class PoolReplayer {
42 public:
43   PoolReplayer(Threads<librbd::ImageCtx> *threads,
44                ServiceDaemon<librbd::ImageCtx>* service_daemon,
45                ImageDeleter<>* image_deleter,
46                int64_t local_pool_id, const peer_t &peer,
47                const std::vector<const char*> &args);
48   ~PoolReplayer();
49   PoolReplayer(const PoolReplayer&) = delete;
50   PoolReplayer& operator=(const PoolReplayer&) = delete;
51
52   bool is_blacklisted() const;
53   bool is_leader() const;
54   bool is_running() const;
55
56   void init();
57   void shut_down();
58
59   void run();
60
61   void print_status(Formatter *f, stringstream *ss);
62   void start();
63   void stop(bool manual);
64   void restart();
65   void flush();
66   void release_leader();
67
68 private:
69   struct PoolWatcherListener : public PoolWatcher<>::Listener {
70     PoolReplayer *pool_replayer;
71     bool local;
72
73     PoolWatcherListener(PoolReplayer *pool_replayer, bool local)
74       : pool_replayer(pool_replayer), local(local) {
75     }
76
77     void handle_update(const std::string &mirror_uuid,
78                        ImageIds &&added_image_ids,
79                        ImageIds &&removed_image_ids) override {
80       pool_replayer->handle_update((local ? "" : mirror_uuid),
81                                    std::move(added_image_ids),
82                                    std::move(removed_image_ids));
83     }
84   };
85
86   void handle_update(const std::string &mirror_uuid,
87                      ImageIds &&added_image_ids,
88                      ImageIds &&removed_image_ids);
89
90   int init_rados(const std::string &cluster_name,
91                  const std::string &client_name,
92                  const std::string &description, RadosRef *rados_ref,
93                  bool strip_cluster_overrides);
94
95   void handle_post_acquire_leader(Context *on_finish);
96   void handle_pre_release_leader(Context *on_finish);
97
98   void init_local_pool_watcher(Context *on_finish);
99   void handle_init_local_pool_watcher(int r, Context *on_finish);
100
101   void init_remote_pool_watcher(Context *on_finish);
102
103   void shut_down_pool_watchers(Context *on_finish);
104   void handle_shut_down_pool_watchers(int r, Context *on_finish);
105
106   void wait_for_update_ops(Context *on_finish);
107   void handle_wait_for_update_ops(int r, Context *on_finish);
108
109   void handle_update_leader(const std::string &leader_instance_id);
110
111   Threads<librbd::ImageCtx> *m_threads;
112   ServiceDaemon<librbd::ImageCtx>* m_service_daemon;
113   ImageDeleter<>* m_image_deleter;
114   int64_t m_local_pool_id = -1;
115   peer_t m_peer;
116   std::vector<const char*> m_args;
117
118   mutable Mutex m_lock;
119   Cond m_cond;
120   std::atomic<bool> m_stopping = { false };
121   bool m_manual_stop = false;
122   bool m_blacklisted = false;
123
124   RadosRef m_local_rados;
125   RadosRef m_remote_rados;
126
127   librados::IoCtx m_local_io_ctx;
128   librados::IoCtx m_remote_io_ctx;
129
130   PoolWatcherListener m_local_pool_watcher_listener;
131   std::unique_ptr<PoolWatcher<> > m_local_pool_watcher;
132
133   PoolWatcherListener m_remote_pool_watcher_listener;
134   std::unique_ptr<PoolWatcher<> > m_remote_pool_watcher;
135
136   std::unique_ptr<InstanceReplayer<librbd::ImageCtx>> m_instance_replayer;
137
138   std::string m_asok_hook_name;
139   AdminSocketHook *m_asok_hook = nullptr;
140
141   service_daemon::CalloutId m_callout_id = service_daemon::CALLOUT_ID_NONE;
142
143   class PoolReplayerThread : public Thread {
144     PoolReplayer *m_pool_replayer;
145   public:
146     PoolReplayerThread(PoolReplayer *pool_replayer)
147       : m_pool_replayer(pool_replayer) {
148     }
149     void *entry() override {
150       m_pool_replayer->run();
151       return 0;
152     }
153   } m_pool_replayer_thread;
154
155   class LeaderListener : public LeaderWatcher<>::Listener {
156   public:
157     LeaderListener(PoolReplayer *pool_replayer)
158       : m_pool_replayer(pool_replayer) {
159     }
160
161   protected:
162     void post_acquire_handler(Context *on_finish) override {
163       m_pool_replayer->handle_post_acquire_leader(on_finish);
164     }
165
166     void pre_release_handler(Context *on_finish) override {
167       m_pool_replayer->handle_pre_release_leader(on_finish);
168     }
169
170     void update_leader_handler(
171       const std::string &leader_instance_id) override {
172       m_pool_replayer->handle_update_leader(leader_instance_id);
173     }
174
175   private:
176     PoolReplayer *m_pool_replayer;
177   } m_leader_listener;
178
179   std::unique_ptr<LeaderWatcher<> > m_leader_watcher;
180   std::unique_ptr<InstanceWatcher<librbd::ImageCtx> > m_instance_watcher;
181   AsyncOpTracker m_update_op_tracker;
182 };
183
184 } // namespace mirror
185 } // namespace rbd
186
187 #endif // CEPH_RBD_MIRROR_POOL_REPLAYER_H