X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fjournal%2FJournalTrimmer.cc;fp=src%2Fceph%2Fsrc%2Fjournal%2FJournalTrimmer.cc;h=0e60a58437c9a7a51e1f8d62951649abe2630f02;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/journal/JournalTrimmer.cc b/src/ceph/src/journal/JournalTrimmer.cc new file mode 100644 index 0000000..0e60a58 --- /dev/null +++ b/src/ceph/src/journal/JournalTrimmer.cc @@ -0,0 +1,247 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalTrimmer.h" +#include "journal/Utils.h" +#include "common/Cond.h" +#include "common/errno.h" +#include + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "JournalTrimmer: " << this << " " + +namespace journal { + +struct JournalTrimmer::C_RemoveSet : public Context { + JournalTrimmer *journal_trimmer; + uint64_t object_set; + Mutex lock; + uint32_t refs; + int return_value; + + C_RemoveSet(JournalTrimmer *_journal_trimmer, uint64_t _object_set, + uint8_t _splay_width); + void complete(int r) override; + void finish(int r) override { + journal_trimmer->handle_set_removed(r, object_set); + journal_trimmer->m_async_op_tracker.finish_op(); + } +}; + +JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx, + const std::string &object_oid_prefix, + const JournalMetadataPtr &journal_metadata) + : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), + m_journal_metadata(journal_metadata), m_metadata_listener(this), + m_lock("JournalTrimmer::m_lock"), m_remove_set_pending(false), + m_remove_set(0), m_remove_set_ctx(NULL) { + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast(m_ioctx.cct()); + + m_journal_metadata->add_listener(&m_metadata_listener); +} + +JournalTrimmer::~JournalTrimmer() { + assert(m_shutdown); +} + +void JournalTrimmer::shut_down(Context *on_finish) { + ldout(m_cct, 20) << __func__ << dendl; + { + Mutex::Locker locker(m_lock); + assert(!m_shutdown); + m_shutdown = true; + } + + m_journal_metadata->remove_listener(&m_metadata_listener); + + // chain the shut down sequence (reverse order) + on_finish = new FunctionContext([this, on_finish](int r) { + m_async_op_tracker.wait_for_ops(on_finish); + }); + m_journal_metadata->flush_commit_position(on_finish); +} + +void JournalTrimmer::remove_objects(bool force, Context *on_finish) { + ldout(m_cct, 20) << __func__ << dendl; + + on_finish = new FunctionContext([this, force, on_finish](int r) { + Mutex::Locker locker(m_lock); + + if (m_remove_set_pending) { + on_finish->complete(-EBUSY); + } + + if (!force) { + JournalMetadata::RegisteredClients registered_clients; + m_journal_metadata->get_registered_clients(®istered_clients); + + if (registered_clients.size() == 0) { + on_finish->complete(-EINVAL); + return; + } else if (registered_clients.size() > 1) { + on_finish->complete(-EBUSY); + return; + } + } + + m_remove_set = std::numeric_limits::max(); + m_remove_set_pending = true; + m_remove_set_ctx = on_finish; + + remove_set(m_journal_metadata->get_minimum_set()); + }); + + m_async_op_tracker.wait_for_ops(on_finish); +} + +void JournalTrimmer::committed(uint64_t commit_tid) { + ldout(m_cct, 20) << __func__ << ": commit_tid=" << commit_tid << dendl; + m_journal_metadata->committed(commit_tid, + m_create_commit_position_safe_context); +} + +void JournalTrimmer::trim_objects(uint64_t minimum_set) { + assert(m_lock.is_locked()); + + ldout(m_cct, 20) << __func__ << ": min_set=" << minimum_set << dendl; + if (minimum_set <= m_journal_metadata->get_minimum_set()) { + return; + } + + if (m_remove_set_pending) { + m_remove_set = MAX(m_remove_set, minimum_set); + return; + } + + m_remove_set = minimum_set; + m_remove_set_pending = true; + remove_set(m_journal_metadata->get_minimum_set()); +} + +void JournalTrimmer::remove_set(uint64_t object_set) { + assert(m_lock.is_locked()); + + m_async_op_tracker.start_op(); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + C_RemoveSet *ctx = new C_RemoveSet(this, object_set, splay_width); + + ldout(m_cct, 20) << __func__ << ": removing object set " << object_set + << dendl; + for (uint64_t object_number = object_set * splay_width; + object_number < (object_set + 1) * splay_width; + ++object_number) { + std::string oid = utils::get_object_name(m_object_oid_prefix, + object_number); + + ldout(m_cct, 20) << "removing journal object " << oid << dendl; + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, NULL, + utils::rados_ctx_callback); + int r = m_ioctx.aio_remove(oid, comp); + assert(r == 0); + comp->release(); + } +} + +void JournalTrimmer::handle_metadata_updated() { + ldout(m_cct, 20) << __func__ << dendl; + + Mutex::Locker locker(m_lock); + + JournalMetadata::RegisteredClients registered_clients; + m_journal_metadata->get_registered_clients(®istered_clients); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint64_t minimum_set = m_journal_metadata->get_minimum_set(); + uint64_t active_set = m_journal_metadata->get_active_set(); + uint64_t minimum_commit_set = active_set; + std::string minimum_client_id; + + for (auto &client : registered_clients) { + if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) { + continue; + } + + if (client.commit_position.object_positions.empty()) { + // client hasn't recorded any commits + minimum_commit_set = minimum_set; + minimum_client_id = client.id; + break; + } + + for (auto &position : client.commit_position.object_positions) { + uint64_t object_set = position.object_number / splay_width; + if (object_set < minimum_commit_set) { + minimum_client_id = client.id; + minimum_commit_set = object_set; + } + } + } + + if (minimum_commit_set > minimum_set) { + trim_objects(minimum_commit_set); + } else { + ldout(m_cct, 20) << "object set " << minimum_commit_set << " still " + << "in-use by client " << minimum_client_id << dendl; + } +} + +void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) { + ldout(m_cct, 20) << __func__ << ": r=" << r << ", set=" << object_set << ", " + << "trim=" << m_remove_set << dendl; + + Mutex::Locker locker(m_lock); + m_remove_set_pending = false; + + if (r == -ENOENT) { + // no objects within the set existed + r = 0; + } + if (r == 0) { + // advance the minimum set to the next set + m_journal_metadata->set_minimum_set(object_set + 1); + uint64_t active_set = m_journal_metadata->get_active_set(); + uint64_t minimum_set = m_journal_metadata->get_minimum_set(); + + if (m_remove_set > minimum_set && minimum_set <= active_set) { + m_remove_set_pending = true; + remove_set(minimum_set); + } + } + + if (m_remove_set_ctx != nullptr && !m_remove_set_pending) { + ldout(m_cct, 20) << "completing remove set context" << dendl; + m_remove_set_ctx->complete(r); + m_remove_set_ctx = nullptr; + } +} + +JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer, + uint64_t _object_set, + uint8_t _splay_width) + : journal_trimmer(_journal_trimmer), object_set(_object_set), + lock(utils::unique_lock_name("C_RemoveSet::lock", this)), + refs(_splay_width), return_value(-ENOENT) { +} + +void JournalTrimmer::C_RemoveSet::complete(int r) { + lock.Lock(); + if (r < 0 && r != -ENOENT && + (return_value == -ENOENT || return_value == 0)) { + return_value = r; + } else if (r == 0 && return_value == -ENOENT) { + return_value = 0; + } + + if (--refs == 0) { + finish(return_value); + lock.Unlock(); + delete this; + } else { + lock.Unlock(); + } +} + +} // namespace journal