X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fjournal%2FJournaler.h;fp=src%2Fceph%2Fsrc%2Fjournal%2FJournaler.h;h=bd3529d06a7559d90fb98fe2ebf24a771a4b4f42;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/journal/Journaler.h b/src/ceph/src/journal/Journaler.h new file mode 100644 index 0000000..bd3529d --- /dev/null +++ b/src/ceph/src/journal/Journaler.h @@ -0,0 +1,166 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_JOURNALER_H +#define CEPH_JOURNAL_JOURNALER_H + +#include "include/int_types.h" +#include "include/buffer_fwd.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "journal/Future.h" +#include "journal/JournalMetadataListener.h" +#include "cls/journal/cls_journal_types.h" +#include +#include +#include +#include "include/assert.h" + +class ContextWQ; +class SafeTimer; +class ThreadPool; + +namespace journal { + +class JournalMetadata; +class JournalPlayer; +class JournalRecorder; +class JournalTrimmer; +class ReplayEntry; +class ReplayHandler; +class Settings; + +class Journaler { +public: + struct Threads { + Threads(CephContext *cct); + ~Threads(); + + ThreadPool *thread_pool = nullptr; + ContextWQ *work_queue = nullptr; + + SafeTimer *timer = nullptr; + Mutex timer_lock; + }; + + typedef cls::journal::Tag Tag; + typedef std::list Tags; + typedef std::set RegisteredClients; + + static std::string header_oid(const std::string &journal_id); + static std::string object_oid_prefix(int pool_id, + const std::string &journal_id); + + Journaler(librados::IoCtx &header_ioctx, const std::string &journal_id, + const std::string &client_id, const Settings &settings); + Journaler(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock, + librados::IoCtx &header_ioctx, const std::string &journal_id, + const std::string &client_id, const Settings &settings); + ~Journaler(); + + void exists(Context *on_finish) const; + void create(uint8_t order, uint8_t splay_width, int64_t pool_id, Context *ctx); + void remove(bool force, Context *on_finish); + + void init(Context *on_init); + void shut_down(); + void shut_down(Context *on_finish); + + bool is_initialized() const; + + void get_immutable_metadata(uint8_t *order, uint8_t *splay_width, + int64_t *pool_id, Context *on_finish); + void get_mutable_metadata(uint64_t *minimum_set, uint64_t *active_set, + RegisteredClients *clients, Context *on_finish); + + void add_listener(JournalMetadataListener *listener); + void remove_listener(JournalMetadataListener *listener); + + int register_client(const bufferlist &data); + void register_client(const bufferlist &data, Context *on_finish); + + int unregister_client(); + void unregister_client(Context *on_finish); + + void update_client(const bufferlist &data, Context *on_finish); + void get_client(const std::string &client_id, cls::journal::Client *client, + Context *on_finish); + int get_cached_client(const std::string &client_id, + cls::journal::Client *client); + + void flush_commit_position(Context *on_safe); + + void allocate_tag(const bufferlist &data, cls::journal::Tag *tag, + Context *on_finish); + void allocate_tag(uint64_t tag_class, const bufferlist &data, + cls::journal::Tag *tag, Context *on_finish); + void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish); + void get_tags(uint64_t tag_class, Tags *tags, Context *on_finish); + void get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, Tags *tags, + Context *on_finish); + + void start_replay(ReplayHandler *replay_handler); + void start_live_replay(ReplayHandler *replay_handler, double interval); + bool try_pop_front(ReplayEntry *replay_entry, uint64_t *tag_tid = nullptr); + void stop_replay(); + void stop_replay(Context *on_finish); + + uint64_t get_max_append_size() const; + void start_append(int flush_interval, uint64_t flush_bytes, double flush_age); + Future append(uint64_t tag_tid, const bufferlist &bl); + void flush_append(Context *on_safe); + void stop_append(Context *on_safe); + + void committed(const ReplayEntry &replay_entry); + void committed(const Future &future); + + void get_metadata(uint8_t *order, uint8_t *splay_width, int64_t *pool_id); + +private: + struct C_InitJournaler : public Context { + Journaler *journaler; + Context *on_safe; + C_InitJournaler(Journaler *_journaler, Context *_on_safe) + : journaler(_journaler), on_safe(_on_safe) { + } + void finish(int r) override { + if (r == 0) { + r = journaler->init_complete(); + } + on_safe->complete(r); + } + }; + + Threads *m_threads = nullptr; + + mutable librados::IoCtx m_header_ioctx; + librados::IoCtx m_data_ioctx; + CephContext *m_cct; + std::string m_client_id; + + std::string m_header_oid; + std::string m_object_oid_prefix; + + bool m_initialized = false; + JournalMetadata *m_metadata = nullptr; + JournalPlayer *m_player = nullptr; + JournalRecorder *m_recorder = nullptr; + JournalTrimmer *m_trimmer = nullptr; + + void set_up(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock, + librados::IoCtx &header_ioctx, const std::string &journal_id, + const Settings &settings); + + int init_complete(); + void create_player(ReplayHandler *replay_handler); + + friend std::ostream &operator<<(std::ostream &os, + const Journaler &journaler); +}; + +std::ostream &operator<<(std::ostream &os, + const Journaler &journaler); + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNALER_H