1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
5 #define CEPH_JOURNAL_JOURNAL_RECORDER_H
7 #include "include/int_types.h"
8 #include "include/Context.h"
9 #include "include/rados/librados.hpp"
10 #include "common/Mutex.h"
11 #include "journal/Future.h"
12 #include "journal/FutureImpl.h"
13 #include "journal/JournalMetadata.h"
14 #include "journal/ObjectRecorder.h"
// JournalRecorder: the declarations visible in this excerpt are a
// constructor plus append(), flush() and get_object().
// NOTE(review): the embedded listing numbers skip values (e.g. 22 -> 24,
// 26 -> 30), so access specifiers and some source lines are not shown here.
22 class JournalRecorder {
// Constructor. The visible parameters are a librados I/O context, the
// object-name prefix, the journal metadata handle, and the flush interval /
// flush byte settings (units are not visible here).
// NOTE(review): the parameter list is not terminated in this excerpt; any
// remaining parameters are not shown.
24 JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
25 const JournalMetadataPtr &journal_metadata,
26 uint32_t flush_interval, uint64_t flush_bytes,
// Takes a tag id and a payload buffer and returns a Future.
// presumably the Future tracks completion of the append -- TODO confirm
30 Future append(uint64_t tag_tid, const bufferlist &bl);
// Flush entry point taking a completion Context.
// presumably on_safe fires once the flushed data is safe -- TODO confirm
31 void flush(Context *on_safe);
// Returns the ObjectRecorderPtr associated with the given splay offset.
33 ObjectRecorderPtr get_object(uint8_t splay_offset);
// Ordered map from a uint8_t key to an ObjectRecorderPtr; this is the type
// of the m_object_ptrs member.
// presumably keyed by splay offset (cf. get_object) -- TODO confirm
36 typedef std::map<uint8_t, ObjectRecorderPtr> ObjectRecorderPtrs;
// JournalMetadataListener adapter: forwards metadata update notifications
// to the owning JournalRecorder.
// NOTE(review): closing braces for this struct are not present in this
// listing.
38 struct Listener : public JournalMetadataListener {
// Non-owning back-pointer to the recorder that receives the callbacks.
39 JournalRecorder *journal_recorder;
// Stores the recorder pointer; nothing else is done at construction.
41 Listener(JournalRecorder *_journal_recorder)
42 : journal_recorder(_journal_recorder) {}
// The JournalMetadata argument is ignored; the call is relayed to
// JournalRecorder::handle_update().
44 void handle_update(JournalMetadata *) override {
45 journal_recorder->handle_update();
// ObjectRecorder::Handler adapter: relays per-object closed/overflow
// callbacks to the owning JournalRecorder.
// NOTE(review): closing braces for this struct are not present in this
// listing.
49 struct ObjectHandler : public ObjectRecorder::Handler {
// Non-owning back-pointer to the recorder that receives the callbacks.
50 JournalRecorder *journal_recorder;
// Stores the recorder pointer.
52 ObjectHandler(JournalRecorder *_journal_recorder)
53 : journal_recorder(_journal_recorder) {
// Relays the closed notification to JournalRecorder::handle_closed().
56 void closed(ObjectRecorder *object_recorder) override {
57 journal_recorder->handle_closed(object_recorder);
// Relays the overflow notification to JournalRecorder::handle_overflow().
59 void overflow(ObjectRecorder *object_recorder) override {
60 journal_recorder->handle_overflow(object_recorder);
// Completion Context whose finish() hands its result code to
// JournalRecorder::handle_advance_object_set().
// NOTE(review): closing braces for this struct are not present in this
// listing.
64 struct C_AdvanceObjectSet : public Context {
// Non-owning back-pointer to the recorder that is notified on completion.
65 JournalRecorder *journal_recorder;
// Stores the recorder pointer.
67 C_AdvanceObjectSet(JournalRecorder *_journal_recorder)
68 : journal_recorder(_journal_recorder) {
// Passes the completion code r through unchanged.
70 void finish(int r) override {
71 journal_recorder->handle_advance_object_set(r);
// Data members of JournalRecorder.
// NOTE(review): the embedded listing numbers skip values here (e.g. 82 -> 86
// -> 90), so additional members may exist that are not shown.

// librados I/O context, held by value.
75 librados::IoCtx m_ioctx;
// presumably the prefix used to build journal object names -- TODO confirm
77 std::string m_object_oid_prefix;
// Handle to the journal metadata.
79 JournalMetadataPtr m_journal_metadata;
// Flush settings; presumably copied from the constructor arguments of the
// same names -- TODO confirm (constructor body is not visible here).
81 uint32_t m_flush_interval;
82 uint64_t m_flush_bytes;
// Handler instance of the ObjectHandler type declared in this class.
86 ObjectHandler m_object_handler;
// Counters, both default-initialized to 0.
// presumably the number of outstanding advance / close operations -- TODO confirm
90 uint32_t m_in_flight_advance_sets = 0;
91 uint32_t m_in_flight_object_closes = 0;
// No in-class initializer; must be set elsewhere (not visible here).
92 uint64_t m_current_set;
// Object recorders, stored in a map with a uint8_t key (see the
// ObjectRecorderPtrs typedef).
93 ObjectRecorderPtrs m_object_ptrs;
// Shared-ownership Mutex handles; iterated by lock_object_recorders() and
// unlock_object_recorders().
94 std::vector<std::shared_ptr<Mutex>> m_object_locks;
// presumably the future of the most recent append -- TODO confirm
96 FutureImplPtr m_prev_future;
// Internal helper declarations; their definitions are not part of this
// header excerpt, so the notes below describe signatures only.

// Object-set lifecycle helpers. close_object_set() takes a set number and
// returns a bool (meaning of the result is not visible here).
98 void open_object_set();
99 bool close_object_set(uint64_t active_set);
// handle_advance_object_set() receives the result code r; it is invoked from
// C_AdvanceObjectSet::finish().
101 void advance_object_set();
102 void handle_advance_object_set(int r);
104 void close_and_advance_object_set(uint64_t object_set);
// Builds an ObjectRecorderPtr for the given object number, taking a shared
// Mutex handle by value.
106 ObjectRecorderPtr create_object_recorder(uint64_t object_number,
107 std::shared_ptr<Mutex> lock);
// NOTE(review): the "_unlock" suffix suggests a lock is released by this
// call -- confirm against the implementation.
108 void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder);
// Invoked from Listener::handle_update().
110 void handle_update();
// Invoked from ObjectHandler::closed() and ObjectHandler::overflow()
// respectively, with the originating ObjectRecorder.
112 void handle_closed(ObjectRecorder *object_recorder);
113 void handle_overflow(ObjectRecorder *object_recorder);
// Iterates over every entry of m_object_locks by reference.
// NOTE(review): the loop body and closing braces are not present in this
// listing; presumably each Mutex is locked -- TODO confirm.
115 void lock_object_recorders() {
116 for (auto& lock : m_object_locks) {
// Iterates over every entry of m_object_locks by reference.
// NOTE(review): the loop body and closing braces are not present in this
// listing; presumably each Mutex is unlocked -- TODO confirm.
121 void unlock_object_recorders() {
122 for (auto& lock : m_object_locks) {
128 } // namespace journal
130 #endif // CEPH_JOURNAL_JOURNAL_RECORDER_H