X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fjournal%2FJournalMetadata.cc;fp=src%2Fceph%2Fsrc%2Fjournal%2FJournalMetadata.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=4073216bcdfb2eb561cf03fe77f0d58d62a38dc8;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/journal/JournalMetadata.cc b/src/ceph/src/journal/JournalMetadata.cc deleted file mode 100644 index 4073216..0000000 --- a/src/ceph/src/journal/JournalMetadata.cc +++ /dev/null @@ -1,1128 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "journal/JournalMetadata.h" -#include "journal/Utils.h" -#include "common/errno.h" -#include "common/Timer.h" -#include "cls/journal/cls_journal_client.h" -#include -#include - -#define dout_subsys ceph_subsys_journaler -#undef dout_prefix -#define dout_prefix *_dout << "JournalMetadata: " << this << " " - -namespace journal { - -using namespace cls::journal; - -namespace { - -struct C_GetClient : public Context { - CephContext *cct; - librados::IoCtx &ioctx; - const std::string &oid; - AsyncOpTracker &async_op_tracker; - std::string client_id; - cls::journal::Client *client; - Context *on_finish; - - bufferlist out_bl; - - C_GetClient(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid, - AsyncOpTracker &async_op_tracker, const std::string &client_id, - cls::journal::Client *client, Context *on_finish) - : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), - client_id(client_id), client(client), on_finish(on_finish) { - async_op_tracker.start_op(); - } - ~C_GetClient() override { - async_op_tracker.finish_op(); - } - - virtual void send() { - send_get_client(); - } - - void send_get_client() { - ldout(cct, 20) << "C_GetClient: " << __func__ << dendl; - - librados::ObjectReadOperation op; - client::get_client_start(&op, client_id); - - librados::AioCompletion *comp = librados::Rados::aio_create_completion( - this, nullptr, &utils::rados_state_callback< - C_GetClient, &C_GetClient::handle_get_client>); - - int r = ioctx.aio_operate(oid, comp, &op, &out_bl); - assert(r == 0); - comp->release(); - } - - void handle_get_client(int r) { - ldout(cct, 20) << "C_GetClient: " << __func__ << ": r=" << r << dendl; - - if (r == 0) { - bufferlist::iterator it = out_bl.begin(); - r = client::get_client_finish(&it, client); - } - complete(r); - } - - void finish(int r) override { - on_finish->complete(r); - } -}; - -struct C_AllocateTag : public Context { - CephContext *cct; - librados::IoCtx &ioctx; - const std::string &oid; - AsyncOpTracker &async_op_tracker; - uint64_t tag_class; - Tag *tag; - Context *on_finish; - - bufferlist out_bl; - - C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx, - const std::string &oid, AsyncOpTracker &async_op_tracker, - uint64_t tag_class, const bufferlist &data, Tag *tag, - Context *on_finish) - : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), - tag_class(tag_class), tag(tag), on_finish(on_finish) { - async_op_tracker.start_op(); - tag->data = data; - } - ~C_AllocateTag() override { - async_op_tracker.finish_op(); - } - - void send() { - send_get_next_tag_tid(); - } - - void send_get_next_tag_tid() { - ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; - - librados::ObjectReadOperation op; - client::get_next_tag_tid_start(&op); - - librados::AioCompletion *comp = librados::Rados::aio_create_completion( - this, nullptr, &utils::rados_state_callback< - C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>); - - out_bl.clear(); - int r = ioctx.aio_operate(oid, comp, &op, &out_bl); - assert(r == 0); - comp->release(); - } - - void handle_get_next_tag_tid(int r) { - ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; - - if (r == 0) { - bufferlist::iterator iter = out_bl.begin(); - r = client::get_next_tag_tid_finish(&iter, &tag->tid); - } - if (r < 0) { - complete(r); - return; - } - send_tag_create(); - } - - void send_tag_create() { - ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; - - librados::ObjectWriteOperation op; - client::tag_create(&op, tag->tid, tag_class, tag->data); - - librados::AioCompletion *comp = librados::Rados::aio_create_completion( - this, nullptr, &utils::rados_state_callback< - C_AllocateTag, &C_AllocateTag::handle_tag_create>); - - int r = ioctx.aio_operate(oid, comp, &op); - assert(r == 0); - comp->release(); - } - - void handle_tag_create(int r) { - ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; - - if (r == -ESTALE) { - send_get_next_tag_tid(); - return; - } else if (r < 0) { - complete(r); - return; - } - - send_get_tag(); - } - - void send_get_tag() { - ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; - - librados::ObjectReadOperation op; - client::get_tag_start(&op, tag->tid); - - librados::AioCompletion *comp = librados::Rados::aio_create_completion( - this, nullptr, &utils::rados_state_callback< - C_AllocateTag, &C_AllocateTag::handle_get_tag>); - - out_bl.clear(); - int r = ioctx.aio_operate(oid, comp, &op, &out_bl); - assert(r == 0); - comp->release(); - } - - void handle_get_tag(int r) { - ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; - - if (r == 0) { - bufferlist::iterator iter = out_bl.begin(); - - cls::journal::Tag journal_tag; - r = client::get_tag_finish(&iter, &journal_tag); - if (r == 0) { - *tag = journal_tag; - } - } - complete(r); - } - - void finish(int r) override { - on_finish->complete(r); - } -}; - -struct C_GetTag : public Context { - CephContext *cct; - librados::IoCtx &ioctx; - const std::string &oid; - AsyncOpTracker &async_op_tracker; - uint64_t tag_tid; - JournalMetadata::Tag *tag; - Context *on_finish; - - bufferlist out_bl; - - C_GetTag(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid, - AsyncOpTracker &async_op_tracker, uint64_t tag_tid, - JournalMetadata::Tag *tag, Context *on_finish) - : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), - tag_tid(tag_tid), tag(tag), on_finish(on_finish) { - async_op_tracker.start_op(); - } - ~C_GetTag() override { - async_op_tracker.finish_op(); - } - - void send() { - send_get_tag(); - } - - void send_get_tag() { - librados::ObjectReadOperation op; - client::get_tag_start(&op, tag_tid); - - librados::AioCompletion *comp = librados::Rados::aio_create_completion( - this, nullptr, &utils::rados_state_callback< - C_GetTag, &C_GetTag::handle_get_tag>); - - int r = ioctx.aio_operate(oid, comp, &op, &out_bl); - assert(r == 0); - comp->release(); - } - - void handle_get_tag(int r) { - if (r == 0) { - bufferlist::iterator iter = out_bl.begin(); - r = client::get_tag_finish(&iter, tag); - } - complete(r); - } - - void finish(int r) override { - on_finish->complete(r); - } -}; - -struct C_GetTags : public Context { - CephContext *cct; - librados::IoCtx &ioctx; - const std::string &oid; - const std::string &client_id; - AsyncOpTracker &async_op_tracker; - uint64_t start_after_tag_tid; - boost::optional tag_class; - JournalMetadata::Tags *tags; - Context *on_finish; - - const uint64_t MAX_RETURN = 64; - bufferlist out_bl; - - C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid, - const std::string &client_id, AsyncOpTracker &async_op_tracker, - uint64_t start_after_tag_tid, - const boost::optional &tag_class, - JournalMetadata::Tags *tags, Context *on_finish) - : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id), - async_op_tracker(async_op_tracker), - start_after_tag_tid(start_after_tag_tid), tag_class(tag_class), - tags(tags), on_finish(on_finish) { - async_op_tracker.start_op(); - } - ~C_GetTags() override { - async_op_tracker.finish_op(); - } - - void send() { - send_tag_list(); - } - - void send_tag_list() { - librados::ObjectReadOperation op; - client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id, - tag_class); - - librados::AioCompletion *comp = librados::Rados::aio_create_completion( - this, nullptr, &utils::rados_state_callback< - C_GetTags, &C_GetTags::handle_tag_list>); - - out_bl.clear(); - int r = ioctx.aio_operate(oid, comp, &op, &out_bl); - assert(r == 0); - comp->release(); - } - - void handle_tag_list(int r) { - if (r == 0) { - std::set journal_tags; - bufferlist::iterator iter = out_bl.begin(); - r = client::tag_list_finish(&iter, &journal_tags); - if (r == 0) { - for (auto &journal_tag : journal_tags) { - tags->push_back(journal_tag); - start_after_tag_tid = journal_tag.tid; - } - - if (journal_tags.size() == MAX_RETURN) { - send_tag_list(); - return; - } - } - } - complete(r); - } - - void finish(int r) override { - on_finish->complete(r); - } -}; - -struct C_FlushCommitPosition : public Context { - Context *commit_position_ctx; - Context *on_finish; - - C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish) - : commit_position_ctx(commit_position_ctx), on_finish(on_finish) { - } - void finish(int r) override { - if (commit_position_ctx != nullptr) { - commit_position_ctx->complete(r); - } - on_finish->complete(r); - } -}; - -struct C_AssertActiveTag : public Context { - CephContext *cct; - librados::IoCtx &ioctx; - const std::string &oid; - AsyncOpTracker &async_op_tracker; - std::string client_id; - uint64_t tag_tid; - Context *on_finish; - - bufferlist out_bl; - - C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx, - const std::string &oid, AsyncOpTracker &async_op_tracker, - const std::string &client_id, uint64_t tag_tid, - Context *on_finish) - : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), - client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) { - async_op_tracker.start_op(); - } - ~C_AssertActiveTag() override { - async_op_tracker.finish_op(); - } - - void send() { - ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl; - - librados::ObjectReadOperation op; - client::tag_list_start(&op, tag_tid, 2, client_id, boost::none); - - librados::AioCompletion *comp = librados::Rados::aio_create_completion( - this, nullptr, &utils::rados_state_callback< - C_AssertActiveTag, &C_AssertActiveTag::handle_send>); - - int r = ioctx.aio_operate(oid, comp, &op, &out_bl); - assert(r == 0); - comp->release(); - } - - void handle_send(int r) { - ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl; - - std::set tags; - if (r == 0) { - bufferlist::iterator it = out_bl.begin(); - r = client::tag_list_finish(&it, &tags); - } - - // NOTE: since 0 is treated as an uninitialized list filter, we need to - // load to entries and look at the last tid - if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) { - r = -ESTALE; - } - complete(r); - } - - void finish(int r) override { - on_finish->complete(r); - } -}; - -} // anonymous namespace - -JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, - Mutex *timer_lock, librados::IoCtx &ioctx, - const std::string &oid, - const std::string &client_id, - const Settings &settings) - : RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid), - m_client_id(client_id), m_settings(settings), m_order(0), - m_splay_width(0), m_pool_id(-1), m_initialized(false), - m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock), - m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this), - m_watch_handle(0), m_minimum_set(0), m_active_set(0), - m_update_notifications(0), m_commit_position_ctx(NULL), - m_commit_position_task_ctx(NULL) { - m_ioctx.dup(ioctx); - m_cct = reinterpret_cast(m_ioctx.cct()); -} - -JournalMetadata::~JournalMetadata() { - Mutex::Locker locker(m_lock); - assert(!m_initialized); -} - -void JournalMetadata::init(Context *on_finish) { - { - Mutex::Locker locker(m_lock); - assert(!m_initialized); - m_initialized = true; - } - - // chain the init sequence (reverse order) - on_finish = utils::create_async_context_callback( - this, on_finish); - on_finish = new C_ImmutableMetadata(this, on_finish); - on_finish = new FunctionContext([this, on_finish](int r) { - if (r < 0) { - lderr(m_cct) << __func__ << ": failed to watch journal" - << cpp_strerror(r) << dendl; - Mutex::Locker locker(m_lock); - m_watch_handle = 0; - on_finish->complete(r); - return; - } - - get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish); - }); - - librados::AioCompletion *comp = librados::Rados::aio_create_completion( - on_finish, nullptr, utils::rados_ctx_callback); - int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx); - assert(r == 0); - comp->release(); -} - -void JournalMetadata::shut_down(Context *on_finish) { - - ldout(m_cct, 20) << __func__ << dendl; - - uint64_t watch_handle = 0; - { - Mutex::Locker locker(m_lock); - m_initialized = false; - std::swap(watch_handle, m_watch_handle); - } - - // chain the shut down sequence (reverse order) - on_finish = utils::create_async_context_callback( - this, on_finish); - on_finish = new FunctionContext([this, on_finish](int r) { - ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl; - m_async_op_tracker.wait_for_ops(on_finish); - }); - on_finish = new FunctionContext([this, on_finish](int r) { - ldout(m_cct, 20) << "shut_down: flushing watch" << dendl; - librados::Rados rados(m_ioctx); - librados::AioCompletion *comp = librados::Rados::aio_create_completion( - on_finish, nullptr, utils::rados_ctx_callback); - r = rados.aio_watch_flush(comp); - assert(r == 0); - comp->release(); - }); - on_finish = new FunctionContext([this, on_finish](int r) { - flush_commit_position(on_finish); - }); - if (watch_handle != 0) { - librados::AioCompletion *comp = librados::Rados::aio_create_completion( - on_finish, nullptr, utils::rados_ctx_callback); - int r = m_ioctx.aio_unwatch(watch_handle, comp); - assert(r == 0); - comp->release(); - } else { - on_finish->complete(0); - } -} - -void JournalMetadata::get_immutable_metadata(uint8_t *order, - uint8_t *splay_width, - int64_t *pool_id, - Context *on_finish) { - client::get_immutable_metadata(m_ioctx, m_oid, order, splay_width, pool_id, - on_finish); -} - -void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set, - uint64_t *active_set, - RegisteredClients *clients, - Context *on_finish) { - client::get_mutable_metadata(m_ioctx, m_oid, minimum_set, active_set, clients, - on_finish); -} - -void JournalMetadata::register_client(const bufferlist &data, - Context *on_finish) { - ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; - librados::ObjectWriteOperation op; - client::client_register(&op, m_client_id, data); - - C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); - - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(ctx, NULL, - utils::rados_ctx_callback); - int r = m_ioctx.aio_operate(m_oid, comp, &op); - assert(r == 0); - comp->release(); -} - -void JournalMetadata::update_client(const bufferlist &data, - Context *on_finish) { - ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; - librados::ObjectWriteOperation op; - client::client_update_data(&op, m_client_id, data); - - C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); - - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(ctx, NULL, - utils::rados_ctx_callback); - int r = m_ioctx.aio_operate(m_oid, comp, &op); - assert(r == 0); - comp->release(); -} - -void JournalMetadata::unregister_client(Context *on_finish) { - assert(!m_client_id.empty()); - - ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; - librados::ObjectWriteOperation op; - client::client_unregister(&op, m_client_id); - - C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); - - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(ctx, NULL, - utils::rados_ctx_callback); - int r = m_ioctx.aio_operate(m_oid, comp, &op); - assert(r == 0); - comp->release(); -} - -void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data, - Tag *tag, Context *on_finish) { - on_finish = new C_NotifyUpdate(this, on_finish); - C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid, - m_async_op_tracker, tag_class, - data, tag, on_finish); - ctx->send(); -} - -void JournalMetadata::get_client(const std::string &client_id, - cls::journal::Client *client, - Context *on_finish) { - C_GetClient *ctx = new C_GetClient(m_cct, m_ioctx, m_oid, m_async_op_tracker, - client_id, client, on_finish); - ctx->send(); -} - -void JournalMetadata::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) { - C_GetTag *ctx = new C_GetTag(m_cct, m_ioctx, m_oid, m_async_op_tracker, - tag_tid, tag, on_finish); - ctx->send(); -} - -void JournalMetadata::get_tags(uint64_t start_after_tag_tid, - const boost::optional &tag_class, - Tags *tags, Context *on_finish) { - C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id, - m_async_op_tracker, start_after_tag_tid, - tag_class, tags, on_finish); - ctx->send(); -} - -void JournalMetadata::add_listener(JournalMetadataListener *listener) { - Mutex::Locker locker(m_lock); - while (m_update_notifications > 0) { - m_update_cond.Wait(m_lock); - } - m_listeners.push_back(listener); -} - -void JournalMetadata::remove_listener(JournalMetadataListener *listener) { - Mutex::Locker locker(m_lock); - while (m_update_notifications > 0) { - m_update_cond.Wait(m_lock); - } - m_listeners.remove(listener); -} - -void JournalMetadata::set_minimum_set(uint64_t object_set) { - Mutex::Locker locker(m_lock); - - ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set - << ", new=" << object_set << dendl; - if (m_minimum_set >= object_set) { - return; - } - - librados::ObjectWriteOperation op; - client::set_minimum_set(&op, object_set); - - C_NotifyUpdate *ctx = new C_NotifyUpdate(this); - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(ctx, NULL, - utils::rados_ctx_callback); - int r = m_ioctx.aio_operate(m_oid, comp, &op); - assert(r == 0); - comp->release(); - - m_minimum_set = object_set; -} - -int JournalMetadata::set_active_set(uint64_t object_set) { - C_SaferCond ctx; - set_active_set(object_set, &ctx); - return ctx.wait(); -} - -void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) { - Mutex::Locker locker(m_lock); - - ldout(m_cct, 20) << __func__ << ": current=" << m_active_set - << ", new=" << object_set << dendl; - if (m_active_set >= object_set) { - m_work_queue->queue(on_finish, 0); - return; - } - - librados::ObjectWriteOperation op; - client::set_active_set(&op, object_set); - - C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(ctx, NULL, - utils::rados_ctx_callback); - int r = m_ioctx.aio_operate(m_oid, comp, &op); - assert(r == 0); - comp->release(); - - m_active_set = object_set; -} - -void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) { - Mutex::Locker locker(m_lock); - - C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid, - m_async_op_tracker, - m_client_id, tag_tid, - on_finish); - ctx->send(); -} - -void JournalMetadata::flush_commit_position() { - ldout(m_cct, 20) << __func__ << dendl; - - Mutex::Locker timer_locker(*m_timer_lock); - Mutex::Locker locker(m_lock); - if (m_commit_position_ctx == nullptr) { - return; - } - - cancel_commit_task(); - handle_commit_position_task(); -} - -void JournalMetadata::flush_commit_position(Context *on_safe) { - ldout(m_cct, 20) << __func__ << dendl; - - Mutex::Locker timer_locker(*m_timer_lock); - Mutex::Locker locker(m_lock); - if (m_commit_position_ctx == nullptr) { - // nothing to flush - if (on_safe != nullptr) { - m_work_queue->queue(on_safe, 0); - } - return; - } - - if (on_safe != nullptr) { - m_commit_position_ctx = new C_FlushCommitPosition( - m_commit_position_ctx, on_safe); - } - cancel_commit_task(); - handle_commit_position_task(); -} - -void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) { - Mutex::Locker locker(m_lock); - uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid]; - if (allocated_entry_tid <= entry_tid) { - allocated_entry_tid = entry_tid + 1; - } -} - -bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid, - uint64_t *entry_tid) const { - Mutex::Locker locker(m_lock); - - AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid); - if (it == m_allocated_entry_tids.end()) { - return false; - } - - assert(it->second > 0); - *entry_tid = it->second - 1; - return true; -} - -void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) { - if (r < 0) { - lderr(m_cct) << "failed to initialize immutable metadata: " - << cpp_strerror(r) << dendl; - on_init->complete(r); - return; - } - - ldout(m_cct, 10) << "initialized immutable metadata" << dendl; - refresh(on_init); -} - -void JournalMetadata::refresh(Context *on_complete) { - ldout(m_cct, 10) << "refreshing mutable metadata" << dendl; - C_Refresh *refresh = new C_Refresh(this, on_complete); - get_mutable_metadata(&refresh->minimum_set, &refresh->active_set, - &refresh->registered_clients, refresh); -} - -void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { - ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl; - if (r == 0) { - Mutex::Locker locker(m_lock); - - Client client(m_client_id, bufferlist()); - RegisteredClients::iterator it = refresh->registered_clients.find(client); - if (it != refresh->registered_clients.end()) { - if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) { - ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id - << dendl; - } - m_minimum_set = MAX(m_minimum_set, refresh->minimum_set); - m_active_set = MAX(m_active_set, refresh->active_set); - m_registered_clients = refresh->registered_clients; - m_client = *it; - - ++m_update_notifications; - m_lock.Unlock(); - for (Listeners::iterator it = m_listeners.begin(); - it != m_listeners.end(); ++it) { - (*it)->handle_update(this); - } - m_lock.Lock(); - if (--m_update_notifications == 0) { - m_update_cond.Signal(); - } - } else { - lderr(m_cct) << "failed to locate client: " << m_client_id << dendl; - r = -ENOENT; - } - } - - if (refresh->on_finish != NULL) { - refresh->on_finish->complete(r); - } -} - -void JournalMetadata::cancel_commit_task() { - ldout(m_cct, 20) << __func__ << dendl; - - assert(m_timer_lock->is_locked()); - assert(m_lock.is_locked()); - assert(m_commit_position_ctx != nullptr); - assert(m_commit_position_task_ctx != nullptr); - - m_timer->cancel_event(m_commit_position_task_ctx); - m_commit_position_task_ctx = NULL; -} - -void JournalMetadata::schedule_commit_task() { - ldout(m_cct, 20) << __func__ << dendl; - - assert(m_timer_lock->is_locked()); - assert(m_lock.is_locked()); - assert(m_commit_position_ctx != nullptr); - if (m_commit_position_task_ctx == NULL) { - m_commit_position_task_ctx = - m_timer->add_event_after(m_settings.commit_interval, - new C_CommitPositionTask(this)); - } -} - -void JournalMetadata::handle_commit_position_task() { - assert(m_timer_lock->is_locked()); - assert(m_lock.is_locked()); - ldout(m_cct, 20) << __func__ << ": " - << "client_id=" << m_client_id << ", " - << "commit_position=" << m_commit_position << dendl; - - librados::ObjectWriteOperation op; - client::client_commit(&op, m_client_id, m_commit_position); - - Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx); - m_commit_position_ctx = NULL; - - ctx = schedule_laggy_clients_disconnect(ctx); - - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(ctx, NULL, - utils::rados_ctx_callback); - int r = m_ioctx.aio_operate(m_oid, comp, &op); - assert(r == 0); - comp->release(); - - m_commit_position_task_ctx = NULL; -} - -void JournalMetadata::schedule_watch_reset() { - assert(m_timer_lock->is_locked()); - m_timer->add_event_after(1, new C_WatchReset(this)); -} - -void JournalMetadata::handle_watch_reset() { - assert(m_timer_lock->is_locked()); - if (!m_initialized) { - return; - } - - int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx); - if (r < 0) { - if (r == -ENOENT) { - ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl; - } else if (r == -EBLACKLISTED) { - ldout(m_cct, 5) << __func__ << ": client blacklisted" << dendl; - } else { - lderr(m_cct) << __func__ << ": failed to watch journal: " - << cpp_strerror(r) << dendl; - } - schedule_watch_reset(); - } else { - ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl; - refresh(NULL); - } -} - -void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) { - ldout(m_cct, 10) << "journal header updated" << dendl; - - bufferlist bl; - m_ioctx.notify_ack(m_oid, notify_id, cookie, bl); - - refresh(NULL); -} - -void JournalMetadata::handle_watch_error(int err) { - if (err == -ENOTCONN) { - ldout(m_cct, 5) << "journal watch error: header removed" << dendl; - } else if (err == -EBLACKLISTED) { - lderr(m_cct) << "journal watch error: client blacklisted" << dendl; - } else { - lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl; - } - - Mutex::Locker timer_locker(*m_timer_lock); - Mutex::Locker locker(m_lock); - - // release old watch on error - if (m_watch_handle != 0) { - m_ioctx.unwatch2(m_watch_handle); - m_watch_handle = 0; - } - - if (m_initialized && err != -ENOENT) { - schedule_watch_reset(); - } -} - -uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num, - uint64_t tag_tid, - uint64_t entry_tid) { - Mutex::Locker locker(m_lock); - uint64_t commit_tid = ++m_commit_tid; - m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid, - entry_tid); - - ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " [" - << "object_num=" << object_num << ", " - << "tag_tid=" << tag_tid << ", " - << "entry_tid=" << entry_tid << "]" - << dendl; - return commit_tid; -} - -void JournalMetadata::overflow_commit_tid(uint64_t commit_tid, - uint64_t object_num) { - Mutex::Locker locker(m_lock); - - auto it = m_pending_commit_tids.find(commit_tid); - assert(it != m_pending_commit_tids.end()); - assert(it->second.object_num < object_num); - - ldout(m_cct, 20) << __func__ << ": " - << "commit_tid=" << commit_tid << ", " - << "old_object_num=" << it->second.object_num << ", " - << "new_object_num=" << object_num << dendl; - it->second.object_num = object_num; -} - -void JournalMetadata::get_commit_entry(uint64_t commit_tid, - uint64_t *object_num, - uint64_t *tag_tid, uint64_t *entry_tid) { - Mutex::Locker locker(m_lock); - - auto it = m_pending_commit_tids.find(commit_tid); - assert(it != m_pending_commit_tids.end()); - - *object_num = it->second.object_num; - *tag_tid = it->second.tag_tid; - *entry_tid = it->second.entry_tid; -} - -void JournalMetadata::committed(uint64_t commit_tid, - const CreateContext &create_context) { - ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl; - - ObjectSetPosition commit_position; - Context *stale_ctx = nullptr; - { - Mutex::Locker timer_locker(*m_timer_lock); - Mutex::Locker locker(m_lock); - assert(commit_tid > m_commit_position_tid); - - if (!m_commit_position.object_positions.empty()) { - // in-flight commit position update - commit_position = m_commit_position; - } else { - // safe commit position - commit_position = m_client.commit_position; - } - - CommitTids::iterator it = m_pending_commit_tids.find(commit_tid); - assert(it != m_pending_commit_tids.end()); - - CommitEntry &commit_entry = it->second; - commit_entry.committed = true; - - bool update_commit_position = false; - while (!m_pending_commit_tids.empty()) { - CommitTids::iterator it = m_pending_commit_tids.begin(); - CommitEntry &commit_entry = it->second; - if (!commit_entry.committed) { - break; - } - - commit_position.object_positions.emplace_front( - commit_entry.object_num, commit_entry.tag_tid, - commit_entry.entry_tid); - m_pending_commit_tids.erase(it); - update_commit_position = true; - } - - if (!update_commit_position) { - return; - } - - // prune the position to have one position per splay offset - std::set in_use_splay_offsets; - ObjectPositions::iterator ob_it = commit_position.object_positions.begin(); - while (ob_it != commit_position.object_positions.end()) { - uint8_t splay_offset = ob_it->object_number % m_splay_width; - if (!in_use_splay_offsets.insert(splay_offset).second) { - ob_it = commit_position.object_positions.erase(ob_it); - } else { - ++ob_it; - } - } - - stale_ctx = m_commit_position_ctx; - m_commit_position_ctx = create_context(); - m_commit_position = commit_position; - m_commit_position_tid = commit_tid; - - ldout(m_cct, 20) << "updated commit position: " << commit_position << ", " - << "on_safe=" << m_commit_position_ctx << dendl; - schedule_commit_task(); - } - - - if (stale_ctx != nullptr) { - ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx - << dendl; - stale_ctx->complete(-ESTALE); - } -} - -void JournalMetadata::notify_update() { - ldout(m_cct, 10) << "notifying journal header update" << dendl; - - bufferlist bl; - m_ioctx.notify2(m_oid, bl, 5000, NULL); -} - -void JournalMetadata::async_notify_update(Context *on_safe) { - ldout(m_cct, 10) << "async notifying journal header update" << dendl; - - C_AioNotify *ctx = new C_AioNotify(this, on_safe); - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(ctx, NULL, - utils::rados_ctx_callback); - - bufferlist bl; - int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL); - assert(r == 0); - - comp->release(); -} - -void JournalMetadata::wait_for_ops() { - C_SaferCond ctx; - m_async_op_tracker.wait_for_ops(&ctx); - ctx.wait(); -} - -void JournalMetadata::handle_notified(int r) { - ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl; -} - -Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) { - assert(m_lock.is_locked()); - - ldout(m_cct, 20) << __func__ << dendl; - - if (m_settings.max_concurrent_object_sets <= 0) { - return on_finish; - } - - Context *ctx = on_finish; - - for (auto &c : m_registered_clients) { - if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED || - c.id == m_client_id || - m_settings.whitelisted_laggy_clients.count(c.id) > 0) { - continue; - } - const std::string &client_id = c.id; - uint64_t object_set = 0; - if (!c.commit_position.object_positions.empty()) { - auto &position = *(c.commit_position.object_positions.begin()); - object_set = position.object_number / m_splay_width; - } - - if (m_active_set > object_set + m_settings.max_concurrent_object_sets) { - ldout(m_cct, 1) << __func__ << ": " << client_id - << ": scheduling disconnect" << dendl; - - ctx = new FunctionContext([this, client_id, ctx](int r1) { - ldout(m_cct, 10) << __func__ << ": " << client_id - << ": flagging disconnected" << dendl; - - librados::ObjectWriteOperation op; - client::client_update_state(&op, client_id, - cls::journal::CLIENT_STATE_DISCONNECTED); - - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(ctx, nullptr, - utils::rados_ctx_callback); - int r = m_ioctx.aio_operate(m_oid, comp, &op); - assert(r == 0); - comp->release(); - }); - } - } - - if (ctx == on_finish) { - ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl; - } - - return ctx; -} - -std::ostream &operator<<(std::ostream &os, - const JournalMetadata::RegisteredClients &clients) { - os << "["; - for (JournalMetadata::RegisteredClients::const_iterator c = clients.begin(); - c != clients.end(); ++c) { - os << (c == clients.begin() ? "" : ", " ) << *c; - } - os << "]"; - return os; -} - -std::ostream &operator<<(std::ostream &os, - const JournalMetadata &jm) { - Mutex::Locker locker(jm.m_lock); - os << "[oid=" << jm.m_oid << ", " - << "initialized=" << jm.m_initialized << ", " - << "order=" << (int)jm.m_order << ", " - << "splay_width=" << (int)jm.m_splay_width << ", " - << "pool_id=" << jm.m_pool_id << ", " - << "minimum_set=" << jm.m_minimum_set << ", " - << "active_set=" << jm.m_active_set << ", " - << "client_id=" << jm.m_client_id << ", " - << "commit_tid=" << jm.m_commit_tid << ", " - << "commit_interval=" << jm.m_settings.commit_interval << ", " - << "commit_position=" << jm.m_commit_position << ", " - << "registered_clients=" << jm.m_registered_clients << "]"; - return os; -} - -} // namespace journal