Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / journal / JournalMetadata.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_JOURNAL_JOURNAL_METADATA_H
5 #define CEPH_JOURNAL_JOURNAL_METADATA_H
6
7 #include "include/int_types.h"
8 #include "include/Context.h"
9 #include "include/rados/librados.hpp"
10 #include "common/AsyncOpTracker.h"
11 #include "common/Cond.h"
12 #include "common/Mutex.h"
13 #include "common/RefCountedObj.h"
14 #include "common/WorkQueue.h"
15 #include "cls/journal/cls_journal_types.h"
16 #include "journal/JournalMetadataListener.h"
17 #include "journal/Settings.h"
18 #include <boost/intrusive_ptr.hpp>
19 #include <boost/noncopyable.hpp>
20 #include <boost/optional.hpp>
21 #include <functional>
22 #include <list>
23 #include <map>
24 #include <string>
25 #include "include/assert.h"
26
27 class SafeTimer;
28
29 namespace journal {
30
31 class JournalMetadata;
32 typedef boost::intrusive_ptr<JournalMetadata> JournalMetadataPtr;
33
34 class JournalMetadata : public RefCountedObject, boost::noncopyable {
35 public:
36   typedef std::function<Context*()> CreateContext;
37   typedef cls::journal::ObjectPosition ObjectPosition;
38   typedef cls::journal::ObjectPositions ObjectPositions;
39   typedef cls::journal::ObjectSetPosition ObjectSetPosition;
40   typedef cls::journal::Client Client;
41   typedef cls::journal::Tag Tag;
42
43   typedef std::set<Client> RegisteredClients;
44   typedef std::list<Tag> Tags;
45
46   JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
47                   librados::IoCtx &ioctx, const std::string &oid,
48                   const std::string &client_id, const Settings &settings);
49   ~JournalMetadata() override;
50
51   void init(Context *on_init);
52   void shut_down(Context *on_finish);
53
54   bool is_initialized() const { return m_initialized; }
55
56   void get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
57                               int64_t *pool_id, Context *on_finish);
58
59   void get_mutable_metadata(uint64_t *minimum_set, uint64_t *active_set,
60                             RegisteredClients *clients, Context *on_finish);
61
62   void add_listener(JournalMetadataListener *listener);
63   void remove_listener(JournalMetadataListener *listener);
64
65   void register_client(const bufferlist &data, Context *on_finish);
66   void update_client(const bufferlist &data, Context *on_finish);
67   void unregister_client(Context *on_finish);
68   void get_client(const std::string &client_id, cls::journal::Client *client,
69                   Context *on_finish);
70
71   void allocate_tag(uint64_t tag_class, const bufferlist &data,
72                     Tag *tag, Context *on_finish);
73   void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish);
74   void get_tags(uint64_t start_after_tag_tid,
75                 const boost::optional<uint64_t> &tag_class, Tags *tags,
76                 Context *on_finish);
77
78   inline const Settings &get_settings() const {
79     return m_settings;
80   }
81   inline const std::string &get_client_id() const {
82     return m_client_id;
83   }
84   inline uint8_t get_order() const {
85     return m_order;
86   }
87   inline uint64_t get_object_size() const {
88     return 1 << m_order;
89   }
90   inline uint8_t get_splay_width() const {
91     return m_splay_width;
92   }
93   inline int64_t get_pool_id() const {
94     return m_pool_id;
95   }
96
97   inline void queue(Context *on_finish, int r) {
98     m_work_queue->queue(on_finish, r);
99   }
100
101   inline ContextWQ *get_work_queue() {
102     return m_work_queue;
103   }
104
105   inline SafeTimer &get_timer() {
106     return *m_timer;
107   }
108   inline Mutex &get_timer_lock() {
109     return *m_timer_lock;
110   }
111
112   void set_minimum_set(uint64_t object_set);
113   inline uint64_t get_minimum_set() const {
114     Mutex::Locker locker(m_lock);
115     return m_minimum_set;
116   }
117
118   int set_active_set(uint64_t object_set);
119   void set_active_set(uint64_t object_set, Context *on_finish);
120   inline uint64_t get_active_set() const {
121     Mutex::Locker locker(m_lock);
122     return m_active_set;
123   }
124
125   void assert_active_tag(uint64_t tag_tid, Context *on_finish);
126
127   void flush_commit_position();
128   void flush_commit_position(Context *on_safe);
129   void get_commit_position(ObjectSetPosition *commit_position) const {
130     Mutex::Locker locker(m_lock);
131     *commit_position = m_client.commit_position;
132   }
133
134   void get_registered_clients(RegisteredClients *registered_clients) {
135     Mutex::Locker locker(m_lock);
136     *registered_clients = m_registered_clients;
137   }
138
139   inline uint64_t allocate_entry_tid(uint64_t tag_tid) {
140     Mutex::Locker locker(m_lock);
141     return m_allocated_entry_tids[tag_tid]++;
142   }
143   void reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid);
144   bool get_last_allocated_entry_tid(uint64_t tag_tid, uint64_t *entry_tid) const;
145
146   uint64_t allocate_commit_tid(uint64_t object_num, uint64_t tag_tid,
147                                uint64_t entry_tid);
148   void overflow_commit_tid(uint64_t commit_tid, uint64_t object_num);
149   void get_commit_entry(uint64_t commit_tid, uint64_t *object_num,
150                         uint64_t *tag_tid, uint64_t *entry_tid);
151   void committed(uint64_t commit_tid, const CreateContext &create_context);
152
153   void notify_update();
154   void async_notify_update(Context *on_safe);
155
156   void wait_for_ops();
157
158 private:
159   typedef std::map<uint64_t, uint64_t> AllocatedEntryTids;
160   typedef std::list<JournalMetadataListener*> Listeners;
161
162   struct CommitEntry {
163     uint64_t object_num;
164     uint64_t tag_tid;
165     uint64_t entry_tid;
166     bool committed;
167
168     CommitEntry() : object_num(0), tag_tid(0), entry_tid(0), committed(false) {
169     }
170     CommitEntry(uint64_t _object_num, uint64_t _tag_tid, uint64_t _entry_tid)
171       : object_num(_object_num), tag_tid(_tag_tid), entry_tid(_entry_tid),
172         committed(false) {
173     }
174   };
175   typedef std::map<uint64_t, CommitEntry> CommitTids;
176
177   struct C_WatchCtx : public librados::WatchCtx2 {
178     JournalMetadata *journal_metadata;
179
180     C_WatchCtx(JournalMetadata *_journal_metadata)
181       : journal_metadata(_journal_metadata) {}
182
183     void handle_notify(uint64_t notify_id, uint64_t cookie,
184                                uint64_t notifier_id, bufferlist& bl) override {
185       journal_metadata->handle_watch_notify(notify_id, cookie);
186     }
187     void handle_error(uint64_t cookie, int err) override {
188       journal_metadata->handle_watch_error(err);
189     }
190   };
191
192   struct C_WatchReset : public Context {
193     JournalMetadata *journal_metadata;
194
195     C_WatchReset(JournalMetadata *_journal_metadata)
196       : journal_metadata(_journal_metadata) {
197       journal_metadata->m_async_op_tracker.start_op();
198     }
199     ~C_WatchReset() override {
200       journal_metadata->m_async_op_tracker.finish_op();
201     }
202     void finish(int r) override {
203       journal_metadata->handle_watch_reset();
204     }
205   };
206
207   struct C_CommitPositionTask : public Context {
208     JournalMetadata *journal_metadata;
209
210     C_CommitPositionTask(JournalMetadata *_journal_metadata)
211       : journal_metadata(_journal_metadata) {
212       journal_metadata->m_async_op_tracker.start_op();
213     }
214     ~C_CommitPositionTask() override {
215       journal_metadata->m_async_op_tracker.finish_op();
216     }
217     void finish(int r) override {
218       Mutex::Locker locker(journal_metadata->m_lock);
219       journal_metadata->handle_commit_position_task();
220     };
221   };
222
223   struct C_AioNotify : public Context {
224     JournalMetadata* journal_metadata;
225     Context *on_safe;
226
227     C_AioNotify(JournalMetadata *_journal_metadata, Context *_on_safe)
228       : journal_metadata(_journal_metadata), on_safe(_on_safe) {
229       journal_metadata->m_async_op_tracker.start_op();
230     }
231     ~C_AioNotify() override {
232       journal_metadata->m_async_op_tracker.finish_op();
233     }
234     void finish(int r) override {
235       journal_metadata->handle_notified(r);
236       if (on_safe != nullptr) {
237         on_safe->complete(0);
238       }
239     }
240   };
241
242   struct C_NotifyUpdate : public Context {
243     JournalMetadata* journal_metadata;
244     Context *on_safe;
245
246     C_NotifyUpdate(JournalMetadata *_journal_metadata, Context *_on_safe = NULL)
247       : journal_metadata(_journal_metadata), on_safe(_on_safe) {
248       journal_metadata->m_async_op_tracker.start_op();
249     }
250     ~C_NotifyUpdate() override {
251       journal_metadata->m_async_op_tracker.finish_op();
252     }
253     void finish(int r) override {
254       if (r == 0) {
255         journal_metadata->async_notify_update(on_safe);
256         return;
257       }
258       if (on_safe != NULL) {
259         on_safe->complete(r);
260       }
261     }
262   };
263
264   struct C_ImmutableMetadata : public Context {
265     JournalMetadata* journal_metadata;
266     Context *on_finish;
267
268     C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish)
269       : journal_metadata(_journal_metadata), on_finish(_on_finish) {
270       Mutex::Locker locker(journal_metadata->m_lock);
271       journal_metadata->m_async_op_tracker.start_op();
272     }
273     ~C_ImmutableMetadata() override {
274       journal_metadata->m_async_op_tracker.finish_op();
275     }
276     void finish(int r) override {
277       journal_metadata->handle_immutable_metadata(r, on_finish);
278     }
279   };
280
281   struct C_Refresh : public Context {
282     JournalMetadata* journal_metadata;
283     uint64_t minimum_set;
284     uint64_t active_set;
285     RegisteredClients registered_clients;
286     Context *on_finish;
287
288     C_Refresh(JournalMetadata *_journal_metadata, Context *_on_finish)
289       : journal_metadata(_journal_metadata), minimum_set(0), active_set(0),
290         on_finish(_on_finish) {
291       Mutex::Locker locker(journal_metadata->m_lock);
292       journal_metadata->m_async_op_tracker.start_op();
293     }
294     ~C_Refresh() override {
295       journal_metadata->m_async_op_tracker.finish_op();
296     }
297     void finish(int r) override {
298       journal_metadata->handle_refresh_complete(this, r);
299     }
300   };
301
302   librados::IoCtx m_ioctx;
303   CephContext *m_cct;
304   std::string m_oid;
305   std::string m_client_id;
306   Settings m_settings;
307
308   uint8_t m_order;
309   uint8_t m_splay_width;
310   int64_t m_pool_id;
311   bool m_initialized;
312
313   ContextWQ *m_work_queue;
314   SafeTimer *m_timer;
315   Mutex *m_timer_lock;
316
317   mutable Mutex m_lock;
318
319   uint64_t m_commit_tid;
320   CommitTids m_pending_commit_tids;
321
322   Listeners m_listeners;
323
324   C_WatchCtx m_watch_ctx;
325   uint64_t m_watch_handle;
326
327   uint64_t m_minimum_set;
328   uint64_t m_active_set;
329   RegisteredClients m_registered_clients;
330   Client m_client;
331
332   AllocatedEntryTids m_allocated_entry_tids;
333
334   size_t m_update_notifications;
335   Cond m_update_cond;
336
337   uint64_t m_commit_position_tid = 0;
338   ObjectSetPosition m_commit_position;
339   Context *m_commit_position_ctx;
340   Context *m_commit_position_task_ctx;
341
342   AsyncOpTracker m_async_op_tracker;
343
344   void handle_immutable_metadata(int r, Context *on_init);
345
346   void refresh(Context *on_finish);
347   void handle_refresh_complete(C_Refresh *refresh, int r);
348
349   void cancel_commit_task();
350   void schedule_commit_task();
351   void handle_commit_position_task();
352
353   void schedule_watch_reset();
354   void handle_watch_reset();
355   void handle_watch_notify(uint64_t notify_id, uint64_t cookie);
356   void handle_watch_error(int err);
357   void handle_notified(int r);
358
359   Context *schedule_laggy_clients_disconnect(Context *on_finish);
360
361   friend std::ostream &operator<<(std::ostream &os,
362                                   const JournalMetadata &journal_metadata);
363 };
364
365 std::ostream &operator<<(std::ostream &os,
366                          const JournalMetadata::RegisteredClients &clients);
367
368 std::ostream &operator<<(std::ostream &os,
369                          const JournalMetadata &journal_metadata);
370
371 } // namespace journal
372
373 #endif // CEPH_JOURNAL_JOURNAL_METADATA_H