1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
5 #define CEPH_JOURNAL_OBJECT_RECORDER_H
7 #include "include/Context.h"
8 #include "include/rados/librados.hpp"
9 #include "common/Cond.h"
10 #include "common/Mutex.h"
11 #include "common/RefCountedObj.h"
12 #include "common/WorkQueue.h"
13 #include "journal/FutureImpl.h"
17 #include <boost/intrusive_ptr.hpp>
18 #include <boost/noncopyable.hpp>
19 #include "include/assert.h"
26 typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr;
28 typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer;
29 typedef std::list<AppendBuffer> AppendBuffers;
31 class ObjectRecorder : public RefCountedObject, boost::noncopyable {
36 virtual void closed(ObjectRecorder *object_recorder) = 0;
37 virtual void overflow(ObjectRecorder *object_recorder) = 0;
40 ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
41 uint64_t object_number, std::shared_ptr<Mutex> lock,
42 ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
43 Handler *handler, uint8_t order, uint32_t flush_interval,
44 uint64_t flush_bytes, double flush_age);
45 ~ObjectRecorder() override;
47 inline uint64_t get_object_number() const {
48 return m_object_number;
50 inline const std::string &get_oid() const {
54 bool append_unlock(AppendBuffers &&append_buffers);
55 void flush(Context *on_safe);
56 void flush(const FutureImplPtr &future);
58 void claim_append_buffers(AppendBuffers *append_buffers);
60 bool is_closed() const {
61 assert(m_lock->is_locked());
62 return (m_object_closed && m_in_flight_appends.empty());
66 inline CephContext *cct() const {
70 inline size_t get_pending_appends() const {
71 Mutex::Locker locker(*m_lock);
72 return m_append_buffers.size();
76 typedef std::set<uint64_t> InFlightTids;
77 typedef std::map<uint64_t, AppendBuffers> InFlightAppends;
79 struct FlushHandler : public FutureImpl::FlushHandler {
80 ObjectRecorder *object_recorder;
81 FlushHandler(ObjectRecorder *o) : object_recorder(o) {}
83 object_recorder->get();
86 object_recorder->put();
88 void flush(const FutureImplPtr &future) override {
89 Mutex::Locker locker(*(object_recorder->m_lock));
90 object_recorder->flush(future);
93 struct C_AppendFlush : public Context {
94 ObjectRecorder *object_recorder;
96 C_AppendFlush(ObjectRecorder *o, uint64_t _tid)
97 : object_recorder(o), tid(_tid) {
98 object_recorder->get();
100 void finish(int r) override {
101 object_recorder->handle_append_flushed(tid, r);
102 object_recorder->put();
106 librados::IoCtx m_ioctx;
108 uint64_t m_object_number;
111 ContextWQ *m_op_work_queue;
119 uint64_t m_soft_max_size;
121 uint32_t m_flush_interval;
122 uint64_t m_flush_bytes;
125 FlushHandler m_flush_handler;
127 Context *m_append_task = nullptr;
129 mutable std::shared_ptr<Mutex> m_lock;
130 AppendBuffers m_append_buffers;
131 uint64_t m_append_tid;
132 uint32_t m_pending_bytes;
134 InFlightTids m_in_flight_tids;
135 InFlightAppends m_in_flight_appends;
138 bool m_object_closed;
140 bufferlist m_prefetch_bl;
142 bool m_in_flight_flushes;
143 Cond m_in_flight_flushes_cond;
145 AppendBuffers m_pending_buffers;
146 bool m_aio_scheduled;
148 void handle_append_task();
149 void cancel_append_task();
150 void schedule_append_task();
152 bool append(const AppendBuffer &append_buffer, bool *schedule_append);
153 bool flush_appends(bool force);
154 void handle_append_flushed(uint64_t tid, int r);
155 void append_overflowed();
156 void send_appends(AppendBuffers *append_buffers);
157 void send_appends_aio();
159 void notify_handler_unlock();
162 } // namespace journal
164 #endif // CEPH_JOURNAL_OBJECT_RECORDER_H