X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fjournal%2FJournaler.cc;fp=src%2Fceph%2Fsrc%2Fjournal%2FJournaler.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=a138c7c08103f8f8ed7f51956861f5051993a3a2;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/journal/Journaler.cc b/src/ceph/src/journal/Journaler.cc deleted file mode 100644 index a138c7c..0000000 --- a/src/ceph/src/journal/Journaler.cc +++ /dev/null @@ -1,456 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "journal/Journaler.h" -#include "include/stringify.h" -#include "common/errno.h" -#include "common/Timer.h" -#include "common/WorkQueue.h" -#include "journal/Entry.h" -#include "journal/FutureImpl.h" -#include "journal/JournalMetadata.h" -#include "journal/JournalPlayer.h" -#include "journal/JournalRecorder.h" -#include "journal/JournalTrimmer.h" -#include "journal/ReplayEntry.h" -#include "journal/ReplayHandler.h" -#include "cls/journal/cls_journal_client.h" -#include "cls/journal/cls_journal_types.h" -#include "Utils.h" - -#define dout_subsys ceph_subsys_journaler -#undef dout_prefix -#define dout_prefix *_dout << "Journaler: " << this << " " - -namespace journal { - -namespace { - -static const std::string JOURNAL_HEADER_PREFIX = "journal."; -static const std::string JOURNAL_OBJECT_PREFIX = "journal_data."; - -} // anonymous namespace - -using namespace cls::journal; -using utils::rados_ctx_callback; - -std::string Journaler::header_oid(const std::string &journal_id) { - return JOURNAL_HEADER_PREFIX + journal_id; -} - -std::string Journaler::object_oid_prefix(int pool_id, - const std::string &journal_id) { - return JOURNAL_OBJECT_PREFIX + stringify(pool_id) + "." + journal_id + "."; -} - -Journaler::Threads::Threads(CephContext *cct) - : timer_lock("Journaler::timer_lock") { - thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1); - thread_pool->start(); - - work_queue = new ContextWQ("Journaler::work_queue", 60, thread_pool); - - timer = new SafeTimer(cct, timer_lock, true); - timer->init(); -} - -Journaler::Threads::~Threads() { - { - Mutex::Locker timer_locker(timer_lock); - timer->shutdown(); - } - delete timer; - - work_queue->drain(); - delete work_queue; - - thread_pool->stop(); - delete thread_pool; -} - -Journaler::Journaler(librados::IoCtx &header_ioctx, - const std::string &journal_id, - const std::string &client_id, const Settings &settings) - : m_threads(new Threads(reinterpret_cast(header_ioctx.cct()))), - m_client_id(client_id) { - set_up(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock, - header_ioctx, journal_id, settings); -} - -Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer, - Mutex *timer_lock, librados::IoCtx &header_ioctx, - const std::string &journal_id, - const std::string &client_id, const Settings &settings) - : m_client_id(client_id) { - set_up(work_queue, timer, timer_lock, header_ioctx, journal_id, - settings); -} - -void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer, - Mutex *timer_lock, librados::IoCtx &header_ioctx, - const std::string &journal_id, - const Settings &settings) { - m_header_ioctx.dup(header_ioctx); - m_cct = reinterpret_cast(m_header_ioctx.cct()); - - m_header_oid = header_oid(journal_id); - m_object_oid_prefix = object_oid_prefix(m_header_ioctx.get_id(), journal_id); - - m_metadata = new JournalMetadata(work_queue, timer, timer_lock, - m_header_ioctx, m_header_oid, m_client_id, - settings); - m_metadata->get(); -} - -Journaler::~Journaler() { - if (m_metadata != nullptr) { - assert(!m_metadata->is_initialized()); - if (!m_initialized) { - // never initialized -- ensure any in-flight ops are complete - // since we wouldn't expect shut_down to be invoked - m_metadata->wait_for_ops(); - } - m_metadata->put(); - m_metadata = nullptr; - } - assert(m_trimmer == nullptr); - assert(m_player == nullptr); - assert(m_recorder == nullptr); - - delete m_threads; -} - -void Journaler::exists(Context *on_finish) const { - librados::ObjectReadOperation op; - op.stat(NULL, NULL, NULL); - - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback); - int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op, NULL); - assert(r == 0); - comp->release(); -} - -void Journaler::init(Context *on_init) { - m_initialized = true; - m_metadata->init(new C_InitJournaler(this, on_init)); -} - -int Journaler::init_complete() { - int64_t pool_id = m_metadata->get_pool_id(); - - if (pool_id < 0 || pool_id == m_header_ioctx.get_id()) { - ldout(m_cct, 20) << "using image pool for journal data" << dendl; - m_data_ioctx.dup(m_header_ioctx); - } else { - ldout(m_cct, 20) << "using pool id=" << pool_id << " for journal data" - << dendl; - librados::Rados rados(m_header_ioctx); - int r = rados.ioctx_create2(pool_id, m_data_ioctx); - if (r < 0) { - if (r == -ENOENT) { - ldout(m_cct, 1) << "pool id=" << pool_id << " no longer exists" - << dendl; - } - return r; - } - } - m_trimmer = new JournalTrimmer(m_data_ioctx, m_object_oid_prefix, - m_metadata); - return 0; -} - -void Journaler::shut_down() { - C_SaferCond ctx; - shut_down(&ctx); - ctx.wait(); -} - -void Journaler::shut_down(Context *on_finish) { - assert(m_player == nullptr); - assert(m_recorder == nullptr); - - JournalMetadata *metadata = nullptr; - std::swap(metadata, m_metadata); - assert(metadata != nullptr); - - on_finish = new FunctionContext([metadata, on_finish](int r) { - metadata->put(); - on_finish->complete(0); - }); - - JournalTrimmer *trimmer = nullptr; - std::swap(trimmer, m_trimmer); - if (trimmer == nullptr) { - metadata->shut_down(on_finish); - return; - } - - on_finish = new FunctionContext([trimmer, metadata, on_finish](int r) { - delete trimmer; - metadata->shut_down(on_finish); - }); - trimmer->shut_down(on_finish); -} - -bool Journaler::is_initialized() const { - return m_metadata->is_initialized(); -} - -void Journaler::get_immutable_metadata(uint8_t *order, uint8_t *splay_width, - int64_t *pool_id, Context *on_finish) { - m_metadata->get_immutable_metadata(order, splay_width, pool_id, on_finish); -} - -void Journaler::get_mutable_metadata(uint64_t *minimum_set, - uint64_t *active_set, - RegisteredClients *clients, - Context *on_finish) { - m_metadata->get_mutable_metadata(minimum_set, active_set, clients, on_finish); -} - -void Journaler::create(uint8_t order, uint8_t splay_width, - int64_t pool_id, Context *on_finish) { - if (order > 64 || order < 12) { - lderr(m_cct) << "order must be in the range [12, 64]" << dendl; - on_finish->complete(-EDOM); - return; - } - if (splay_width == 0) { - on_finish->complete(-EINVAL); - return; - } - - ldout(m_cct, 5) << "creating new journal: " << m_header_oid << dendl; - - librados::ObjectWriteOperation op; - client::create(&op, order, splay_width, pool_id); - - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback); - int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op); - assert(r == 0); - comp->release(); -} - -void Journaler::remove(bool force, Context *on_finish) { - // chain journal removal (reverse order) - on_finish = new FunctionContext([this, on_finish](int r) { - librados::AioCompletion *comp = librados::Rados::aio_create_completion( - on_finish, nullptr, utils::rados_ctx_callback); - r = m_header_ioctx.aio_remove(m_header_oid, comp); - assert(r == 0); - comp->release(); - }); - - on_finish = new FunctionContext([this, force, on_finish](int r) { - m_trimmer->remove_objects(force, on_finish); - }); - - m_metadata->shut_down(on_finish); -} - -void Journaler::flush_commit_position(Context *on_safe) { - m_metadata->flush_commit_position(on_safe); -} - -void Journaler::add_listener(JournalMetadataListener *listener) { - m_metadata->add_listener(listener); -} - -void Journaler::remove_listener(JournalMetadataListener *listener) { - m_metadata->remove_listener(listener); -} - -int Journaler::register_client(const bufferlist &data) { - C_SaferCond cond; - register_client(data, &cond); - return cond.wait(); -} - -int Journaler::unregister_client() { - C_SaferCond cond; - unregister_client(&cond); - return cond.wait(); -} - -void Journaler::register_client(const bufferlist &data, Context *on_finish) { - return m_metadata->register_client(data, on_finish); -} - -void Journaler::update_client(const bufferlist &data, Context *on_finish) { - return m_metadata->update_client(data, on_finish); -} - -void Journaler::unregister_client(Context *on_finish) { - return m_metadata->unregister_client(on_finish); -} - -void Journaler::get_client(const std::string &client_id, - cls::journal::Client *client, - Context *on_finish) { - m_metadata->get_client(client_id, client, on_finish); -} - -int Journaler::get_cached_client(const std::string &client_id, - cls::journal::Client *client) { - RegisteredClients clients; - m_metadata->get_registered_clients(&clients); - - auto it = clients.find({client_id, {}}); - if (it == clients.end()) { - return -ENOENT; - } - - *client = *it; - return 0; -} - -void Journaler::allocate_tag(const bufferlist &data, cls::journal::Tag *tag, - Context *on_finish) { - m_metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, data, tag, - on_finish); -} - -void Journaler::allocate_tag(uint64_t tag_class, const bufferlist &data, - cls::journal::Tag *tag, Context *on_finish) { - m_metadata->allocate_tag(tag_class, data, tag, on_finish); -} - -void Journaler::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) { - m_metadata->get_tag(tag_tid, tag, on_finish); -} - -void Journaler::get_tags(uint64_t tag_class, Tags *tags, Context *on_finish) { - m_metadata->get_tags(0, tag_class, tags, on_finish); -} - -void Journaler::get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, - Tags *tags, Context *on_finish) { - m_metadata->get_tags(start_after_tag_tid, tag_class, tags, on_finish); -} - -void Journaler::start_replay(ReplayHandler *replay_handler) { - create_player(replay_handler); - m_player->prefetch(); -} - -void Journaler::start_live_replay(ReplayHandler *replay_handler, - double interval) { - create_player(replay_handler); - m_player->prefetch_and_watch(interval); -} - -bool Journaler::try_pop_front(ReplayEntry *replay_entry, - uint64_t *tag_tid) { - assert(m_player != NULL); - - Entry entry; - uint64_t commit_tid; - if (!m_player->try_pop_front(&entry, &commit_tid)) { - return false; - } - - *replay_entry = ReplayEntry(entry.get_data(), commit_tid); - if (tag_tid != nullptr) { - *tag_tid = entry.get_tag_tid(); - } - return true; -} - -void Journaler::stop_replay() { - C_SaferCond ctx; - stop_replay(&ctx); - ctx.wait(); -} - -void Journaler::stop_replay(Context *on_finish) { - JournalPlayer *player = nullptr; - std::swap(player, m_player); - assert(player != nullptr); - - on_finish = new FunctionContext([player, on_finish](int r) { - delete player; - on_finish->complete(r); - }); - player->shut_down(on_finish); -} - -void Journaler::committed(const ReplayEntry &replay_entry) { - m_trimmer->committed(replay_entry.get_commit_tid()); -} - -void Journaler::committed(const Future &future) { - FutureImplPtr future_impl = future.get_future_impl(); - m_trimmer->committed(future_impl->get_commit_tid()); -} - -void Journaler::start_append(int flush_interval, uint64_t flush_bytes, - double flush_age) { - assert(m_recorder == NULL); - - // TODO verify active object set >= current replay object set - - m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix, - m_metadata, flush_interval, flush_bytes, - flush_age); -} - -void Journaler::stop_append(Context *on_safe) { - JournalRecorder *recorder = nullptr; - std::swap(recorder, m_recorder); - assert(recorder != nullptr); - - on_safe = new FunctionContext([recorder, on_safe](int r) { - delete recorder; - on_safe->complete(r); - }); - recorder->flush(on_safe); -} - -uint64_t Journaler::get_max_append_size() const { - uint64_t max_payload_size = m_metadata->get_object_size() - - Entry::get_fixed_size(); - if (m_metadata->get_settings().max_payload_bytes > 0) { - max_payload_size = MIN(max_payload_size, - m_metadata->get_settings().max_payload_bytes); - } - return max_payload_size; -} - -Future Journaler::append(uint64_t tag_tid, const bufferlist &payload_bl) { - return m_recorder->append(tag_tid, payload_bl); -} - -void Journaler::flush_append(Context *on_safe) { - m_recorder->flush(on_safe); -} - -void Journaler::create_player(ReplayHandler *replay_handler) { - assert(m_player == NULL); - m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata, - replay_handler); -} - -void Journaler::get_metadata(uint8_t *order, uint8_t *splay_width, - int64_t *pool_id) { - assert(m_metadata != NULL); - - *order = m_metadata->get_order(); - *splay_width = m_metadata->get_splay_width(); - *pool_id = m_metadata->get_pool_id(); -} - -std::ostream &operator<<(std::ostream &os, - const Journaler &journaler) { - os << "[metadata="; - if (journaler.m_metadata != NULL) { - os << *journaler.m_metadata; - } else { - os << "NULL"; - } - os << "]"; - return os; -} - -} // namespace journal