1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_JOURNAL_OBJECT_PLAYER_H
5 #define CEPH_JOURNAL_OBJECT_PLAYER_H
7 #include "include/Context.h"
8 #include "include/interval_set.h"
9 #include "include/rados/librados.hpp"
10 #include "common/Cond.h"
11 #include "common/Mutex.h"
12 #include "common/RefCountedObj.h"
13 #include "journal/Entry.h"
16 #include <boost/intrusive_ptr.hpp>
17 #include <boost/noncopyable.hpp>
18 #include <boost/unordered_map.hpp>
19 #include "include/assert.h"
// Alias for a boost::intrusive_ptr to ObjectPlayer. The ObjectPlayer class
// below derives from RefCountedObject.
// NOTE(review): this alias names ObjectPlayer before the class definition, so
// a forward declaration is presumably present on a line not visible here.
26 typedef boost::intrusive_ptr<ObjectPlayer> ObjectPlayerPtr;
// ObjectPlayer: a RefCountedObject that declares fetch()/watch() operations on
// an object and exposes accessors for a list of Entry values and a set of
// invalid ranges, guarded by m_lock in the visible inline accessors.
// NOTE(review): this excerpt is missing lines (access specifiers, the enum
// header, several inline bodies/closing braces and some member declarations);
// the comments below describe only what the visible lines establish.
28 class ObjectPlayer : public RefCountedObject {
// Entries: list of journal Entry values. InvalidRanges: interval set keyed by
// 64-bit unsigned values.
30 typedef std::list<Entry> Entries;
31 typedef interval_set<uint64_t> InvalidRanges;
// Enumerators of the refetch-state enum (used as RefetchState below).
// REFETCH_STATE_NONE is referenced by refetch_required() but its declaration
// and the enum header are not visible in this excerpt.
35 REFETCH_STATE_REQUIRED,
36 REFETCH_STATE_IMMEDIATE
// Constructor: takes the IoCtx, an object-oid prefix, the object number, a
// timer with its lock, an `order` value and a maximum fetch size in bytes.
// NOTE(review): how prefix/object_num are combined into the oid and what
// `order` encodes are defined in the implementation file — confirm there.
39 ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
40 uint64_t object_num, SafeTimer &timer, Mutex &timer_lock,
41 uint8_t order, uint64_t max_fetch_bytes);
42 ~ObjectPlayer() override;
// Returns a const reference to the object's oid string (body not visible).
44 inline const std::string &get_oid() const {
// Returns the object number (body not visible; presumably m_object_num).
47 inline uint64_t get_object_number() const {
// fetch(): starts a fetch; on_finish is presumably completed when the fetch
// ends — confirm against the implementation.
// watch(): registers on_fetch with a polling interval; the unit of `interval`
// is not stated here (presumably seconds — TODO confirm).
51 void fetch(Context *on_finish);
52 void watch(Context *on_fetch, double interval);
// front(): writes an entry through the out-parameter; presumably the first
// element of m_entries — confirm against the implementation.
55 void front(Entry *entry) const;
// empty(): takes m_lock (mutable, so usable from this const method) and
// reports whether m_entries is empty. m_entries' declaration is not visible
// in this excerpt.
57 inline bool empty() const {
58 Mutex::Locker locker(m_lock);
59 return m_entries.empty();
// get_entries(): takes m_lock; the statement that fills *entries is not
// visible here (presumably copies m_entries).
62 inline void get_entries(Entries *entries) {
63 Mutex::Locker locker(m_lock);
// get_invalid_ranges(): copies m_invalid_ranges into *invalid_ranges while
// holding m_lock.
66 inline void get_invalid_ranges(InvalidRanges *invalid_ranges) {
67 Mutex::Locker locker(m_lock);
68 *invalid_ranges = m_invalid_ranges;
// refetch_required(): true whenever the current refetch state is anything
// other than REFETCH_STATE_NONE.
71 inline bool refetch_required() const {
72 return (get_refetch_state() != REFETCH_STATE_NONE);
// get/set_refetch_state(): plain read/write of m_refetch_state; the visible
// statements do not acquire m_lock.
// NOTE(review): confirm callers provide any synchronization that is needed.
74 inline RefetchState get_refetch_state() const {
75 return m_refetch_state;
77 inline void set_refetch_state(RefetchState refetch_state) {
78 m_refetch_state = refetch_state;
// EntryKey: pair of two 64-bit values; EntryKeys maps an EntryKey to an
// iterator into an Entries list.
// NOTE(review): the meaning of the two key components is not shown here.
82 typedef std::pair<uint64_t, uint64_t> EntryKey;
83 typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
// C_Fetch: Context that stores an ObjectPlayerPtr to the player plus the
// caller's on_finish context (the on_finish member declaration is not visible
// in this excerpt). finish() is defined out of line.
85 struct C_Fetch : public Context {
86 ObjectPlayerPtr object_player;
89 C_Fetch(ObjectPlayer *o, Context *ctx) : object_player(o), on_finish(ctx) {
91 void finish(int r) override;
// C_WatchFetch: Context that stores an ObjectPlayerPtr to the player;
// finish() is defined out of line.
93 struct C_WatchFetch : public Context {
94 ObjectPlayerPtr object_player;
95 C_WatchFetch(ObjectPlayer *o) : object_player(o) {
97 void finish(int r) override;
// Data members. Those without a default member initializer here must be set
// by the constructor (not visible in this header excerpt).
100 librados::IoCtx m_ioctx;
101 uint64_t m_object_num;
109 uint64_t m_max_fetch_bytes;
// Watch bookkeeping: interval passed to watch() and the pending watch task,
// presumably — confirm against the implementation.
111 double m_watch_interval;
112 Context *m_watch_task;
// m_lock is mutable so const accessors such as empty() can lock it.
114 mutable Mutex m_lock;
115 bool m_fetch_in_progress;
// Read buffer and two 32-bit offsets, both starting at 0.
116 bufferlist m_read_bl;
117 uint32_t m_read_off = 0;
118 uint32_t m_read_bl_off = 0;
121 EntryKeys m_entry_keys;
122 InvalidRanges m_invalid_ranges;
124 Context *m_watch_ctx = nullptr;
126 bool m_unwatched = false;
// A newly constructed player starts in REFETCH_STATE_IMMEDIATE.
127 RefetchState m_refetch_state = REFETCH_STATE_IMMEDIATE;
// Internal helpers, defined out of line. handle_fetch_complete() receives a
// result code and the fetched buffer and reports through *refetch; exact
// semantics of the return value and *refetch are in the implementation.
129 int handle_fetch_complete(int r, const bufferlist &bl, bool *refetch);
// clear_invalid_range(): takes a 32-bit offset and length; presumably removes
// that span from m_invalid_ranges — confirm against the implementation.
131 void clear_invalid_range(uint32_t off, uint32_t len);
133 void schedule_watch();
135 void handle_watch_task();
136 void handle_watch_fetched(int r);
139 } // namespace journal
141 #endif // CEPH_JOURNAL_OBJECT_PLAYER_H