Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / journal / JournalRecorder.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
5 #define CEPH_JOURNAL_JOURNAL_RECORDER_H
6
7 #include "include/int_types.h"
8 #include "include/Context.h"
9 #include "include/rados/librados.hpp"
10 #include "common/Mutex.h"
11 #include "journal/Future.h"
12 #include "journal/FutureImpl.h"
13 #include "journal/JournalMetadata.h"
14 #include "journal/ObjectRecorder.h"
15 #include <map>
16 #include <string>
17
18 class SafeTimer;
19
20 namespace journal {
21
22 class JournalRecorder {
23 public:
24   JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
25                   const JournalMetadataPtr &journal_metadata,
26                   uint32_t flush_interval, uint64_t flush_bytes,
27                   double flush_age);
28   ~JournalRecorder();
29
30   Future append(uint64_t tag_tid, const bufferlist &bl);
31   void flush(Context *on_safe);
32
33   ObjectRecorderPtr get_object(uint8_t splay_offset);
34
35 private:
36   typedef std::map<uint8_t, ObjectRecorderPtr> ObjectRecorderPtrs;
37
38   struct Listener : public JournalMetadataListener {
39     JournalRecorder *journal_recorder;
40
41     Listener(JournalRecorder *_journal_recorder)
42       : journal_recorder(_journal_recorder) {}
43
44     void handle_update(JournalMetadata *) override {
45       journal_recorder->handle_update();
46     }
47   };
48
49   struct ObjectHandler : public ObjectRecorder::Handler {
50     JournalRecorder *journal_recorder;
51
52     ObjectHandler(JournalRecorder *_journal_recorder)
53       : journal_recorder(_journal_recorder) {
54     }
55
56     void closed(ObjectRecorder *object_recorder) override {
57       journal_recorder->handle_closed(object_recorder);
58     }
59     void overflow(ObjectRecorder *object_recorder) override {
60       journal_recorder->handle_overflow(object_recorder);
61     }
62   };
63
64   struct C_AdvanceObjectSet : public Context {
65     JournalRecorder *journal_recorder;
66
67     C_AdvanceObjectSet(JournalRecorder *_journal_recorder)
68       : journal_recorder(_journal_recorder) {
69     }
70     void finish(int r) override {
71       journal_recorder->handle_advance_object_set(r);
72     }
73   };
74
75   librados::IoCtx m_ioctx;
76   CephContext *m_cct;
77   std::string m_object_oid_prefix;
78
79   JournalMetadataPtr m_journal_metadata;
80
81   uint32_t m_flush_interval;
82   uint64_t m_flush_bytes;
83   double m_flush_age;
84
85   Listener m_listener;
86   ObjectHandler m_object_handler;
87
88   Mutex m_lock;
89
90   uint32_t m_in_flight_advance_sets = 0;
91   uint32_t m_in_flight_object_closes = 0;
92   uint64_t m_current_set;
93   ObjectRecorderPtrs m_object_ptrs;
94   std::vector<std::shared_ptr<Mutex>> m_object_locks;
95
96   FutureImplPtr m_prev_future;
97
98   void open_object_set();
99   bool close_object_set(uint64_t active_set);
100
101   void advance_object_set();
102   void handle_advance_object_set(int r);
103
104   void close_and_advance_object_set(uint64_t object_set);
105
106   ObjectRecorderPtr create_object_recorder(uint64_t object_number,
107                                            std::shared_ptr<Mutex> lock);
108   void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder);
109
110   void handle_update();
111
112   void handle_closed(ObjectRecorder *object_recorder);
113   void handle_overflow(ObjectRecorder *object_recorder);
114
115   void lock_object_recorders() {
116     for (auto& lock : m_object_locks) {
117       lock->Lock();
118     }
119   }
120
121   void unlock_object_recorders() {
122     for (auto& lock : m_object_locks) {
123       lock->Unlock();
124     }
125   }
126 };
127
128 } // namespace journal
129
130 #endif // CEPH_JOURNAL_JOURNAL_RECORDER_H