X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fjournal%2FObjectPlayer.h;fp=src%2Fceph%2Fsrc%2Fjournal%2FObjectPlayer.h;h=a3cbe807332f32182bfca354de051e18e00db344;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/journal/ObjectPlayer.h b/src/ceph/src/journal/ObjectPlayer.h new file mode 100644 index 0000000..a3cbe80 --- /dev/null +++ b/src/ceph/src/journal/ObjectPlayer.h @@ -0,0 +1,141 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_OBJECT_PLAYER_H +#define CEPH_JOURNAL_OBJECT_PLAYER_H + +#include "include/Context.h" +#include "include/interval_set.h" +#include "include/rados/librados.hpp" +#include "common/Cond.h" +#include "common/Mutex.h" +#include "common/RefCountedObj.h" +#include "journal/Entry.h" +#include +#include +#include +#include +#include +#include "include/assert.h" + +class SafeTimer; + +namespace journal { + +class ObjectPlayer; +typedef boost::intrusive_ptr ObjectPlayerPtr; + +class ObjectPlayer : public RefCountedObject { +public: + typedef std::list Entries; + typedef interval_set InvalidRanges; + + enum RefetchState { + REFETCH_STATE_NONE, + REFETCH_STATE_REQUIRED, + REFETCH_STATE_IMMEDIATE + }; + + ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, + uint64_t object_num, SafeTimer &timer, Mutex &timer_lock, + uint8_t order, uint64_t max_fetch_bytes); + ~ObjectPlayer() override; + + inline const std::string &get_oid() const { + return m_oid; + } + inline uint64_t get_object_number() const { + return m_object_num; + } + + void fetch(Context *on_finish); + void watch(Context *on_fetch, double interval); + void unwatch(); + + void front(Entry *entry) const; + void pop_front(); + inline bool empty() const { + Mutex::Locker locker(m_lock); + return m_entries.empty(); + } + + inline void get_entries(Entries *entries) { + Mutex::Locker locker(m_lock); + *entries = m_entries; + } + inline void get_invalid_ranges(InvalidRanges *invalid_ranges) { + Mutex::Locker locker(m_lock); + *invalid_ranges = m_invalid_ranges; + } + + inline bool refetch_required() const { + return (get_refetch_state() != REFETCH_STATE_NONE); + } + inline RefetchState get_refetch_state() const { + return m_refetch_state; + } + inline void set_refetch_state(RefetchState refetch_state) { + m_refetch_state = refetch_state; + } + +private: + typedef std::pair EntryKey; + typedef boost::unordered_map EntryKeys; + + struct C_Fetch : public Context { + ObjectPlayerPtr object_player; + Context *on_finish; + bufferlist read_bl; + C_Fetch(ObjectPlayer *o, Context *ctx) : object_player(o), on_finish(ctx) { + } + void finish(int r) override; + }; + struct C_WatchFetch : public Context { + ObjectPlayerPtr object_player; + C_WatchFetch(ObjectPlayer *o) : object_player(o) { + } + void finish(int r) override; + }; + + librados::IoCtx m_ioctx; + uint64_t m_object_num; + std::string m_oid; + CephContext *m_cct; + + SafeTimer &m_timer; + Mutex &m_timer_lock; + + uint8_t m_order; + uint64_t m_max_fetch_bytes; + + double m_watch_interval; + Context *m_watch_task; + + mutable Mutex m_lock; + bool m_fetch_in_progress; + bufferlist m_read_bl; + uint32_t m_read_off = 0; + uint32_t m_read_bl_off = 0; + + Entries m_entries; + EntryKeys m_entry_keys; + InvalidRanges m_invalid_ranges; + + Context *m_watch_ctx = nullptr; + + bool m_unwatched = false; + RefetchState m_refetch_state = REFETCH_STATE_IMMEDIATE; + + int handle_fetch_complete(int r, const bufferlist &bl, bool *refetch); + + void clear_invalid_range(uint32_t off, uint32_t len); + + void schedule_watch(); + bool cancel_watch(); + void handle_watch_task(); + void handle_watch_fetched(int r); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_OBJECT_PLAYER_H