Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / journal / ObjectRecorder.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
5 #define CEPH_JOURNAL_OBJECT_RECORDER_H
6
7 #include "include/Context.h"
8 #include "include/rados/librados.hpp"
9 #include "common/Cond.h"
10 #include "common/Mutex.h"
11 #include "common/RefCountedObj.h"
12 #include "common/WorkQueue.h"
13 #include "journal/FutureImpl.h"
14 #include <list>
15 #include <map>
16 #include <set>
17 #include <boost/intrusive_ptr.hpp>
18 #include <boost/noncopyable.hpp>
19 #include "include/assert.h"
20
21 class SafeTimer;
22
23 namespace journal {
24
25 class ObjectRecorder;
26 typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr;
27
28 typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer;
29 typedef std::list<AppendBuffer> AppendBuffers;
30
31 class ObjectRecorder : public RefCountedObject, boost::noncopyable {
32 public:
33   struct Handler {
34     virtual ~Handler() {
35     }
36     virtual void closed(ObjectRecorder *object_recorder) = 0;
37     virtual void overflow(ObjectRecorder *object_recorder) = 0;
38   };
39
40   ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
41                  uint64_t object_number, std::shared_ptr<Mutex> lock,
42                  ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
43                  Handler *handler, uint8_t order, uint32_t flush_interval,
44                  uint64_t flush_bytes, double flush_age);
45   ~ObjectRecorder() override;
46
47   inline uint64_t get_object_number() const {
48     return m_object_number;
49   }
50   inline const std::string &get_oid() const {
51     return m_oid;
52   }
53
54   bool append_unlock(AppendBuffers &&append_buffers);
55   void flush(Context *on_safe);
56   void flush(const FutureImplPtr &future);
57
58   void claim_append_buffers(AppendBuffers *append_buffers);
59
60   bool is_closed() const {
61     assert(m_lock->is_locked());
62     return (m_object_closed && m_in_flight_appends.empty());
63   }
64   bool close();
65
66   inline CephContext *cct() const {
67     return m_cct;
68   }
69
70   inline size_t get_pending_appends() const {
71     Mutex::Locker locker(*m_lock);
72     return m_append_buffers.size();
73   }
74
75 private:
76   typedef std::set<uint64_t> InFlightTids;
77   typedef std::map<uint64_t, AppendBuffers> InFlightAppends;
78
79   struct FlushHandler : public FutureImpl::FlushHandler {
80     ObjectRecorder *object_recorder;
81     FlushHandler(ObjectRecorder *o) : object_recorder(o) {}
82     void get() override {
83       object_recorder->get();
84     }
85     void put() override {
86       object_recorder->put();
87     }
88     void flush(const FutureImplPtr &future) override {
89       Mutex::Locker locker(*(object_recorder->m_lock));
90       object_recorder->flush(future);
91     }
92   };
93   struct C_AppendFlush : public Context {
94     ObjectRecorder *object_recorder;
95     uint64_t tid;
96     C_AppendFlush(ObjectRecorder *o, uint64_t _tid)
97         : object_recorder(o), tid(_tid) {
98       object_recorder->get();
99     }
100     void finish(int r) override {
101       object_recorder->handle_append_flushed(tid, r);
102       object_recorder->put();
103     }
104   };
105
106   librados::IoCtx m_ioctx;
107   std::string m_oid;
108   uint64_t m_object_number;
109   CephContext *m_cct;
110
111   ContextWQ *m_op_work_queue;
112
113   SafeTimer &m_timer;
114   Mutex &m_timer_lock;
115
116   Handler *m_handler;
117
118   uint8_t m_order;
119   uint64_t m_soft_max_size;
120
121   uint32_t m_flush_interval;
122   uint64_t m_flush_bytes;
123   double m_flush_age;
124
125   FlushHandler m_flush_handler;
126
127   Context *m_append_task = nullptr;
128
129   mutable std::shared_ptr<Mutex> m_lock;
130   AppendBuffers m_append_buffers;
131   uint64_t m_append_tid;
132   uint32_t m_pending_bytes;
133
134   InFlightTids m_in_flight_tids;
135   InFlightAppends m_in_flight_appends;
136   uint64_t m_size;
137   bool m_overflowed;
138   bool m_object_closed;
139
140   bufferlist m_prefetch_bl;
141
142   bool m_in_flight_flushes;
143   Cond m_in_flight_flushes_cond;
144
145   AppendBuffers m_pending_buffers;
146   bool m_aio_scheduled;
147
148   void handle_append_task();
149   void cancel_append_task();
150   void schedule_append_task();
151
152   bool append(const AppendBuffer &append_buffer, bool *schedule_append);
153   bool flush_appends(bool force);
154   void handle_append_flushed(uint64_t tid, int r);
155   void append_overflowed();
156   void send_appends(AppendBuffers *append_buffers);
157   void send_appends_aio();
158
159   void notify_handler_unlock();
160 };
161
162 } // namespace journal
163
164 #endif // CEPH_JOURNAL_OBJECT_RECORDER_H