X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmds%2FPurgeQueue.cc;fp=src%2Fceph%2Fsrc%2Fmds%2FPurgeQueue.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=49e48b04cc6d12700a4ff97ab149177a8a2f5c2b;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git

diff --git a/src/ceph/src/mds/PurgeQueue.cc b/src/ceph/src/mds/PurgeQueue.cc
deleted file mode 100644
index 49e48b0..0000000
--- a/src/ceph/src/mds/PurgeQueue.cc
+++ /dev/null
@@ -1,631 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2015 Red Hat
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "common/debug.h"
-#include "mds/mdstypes.h"
-#include "mds/CInode.h"
-#include "mds/MDCache.h"
-
-#include "PurgeQueue.h"
-
-
-#define dout_context cct
-#define dout_subsys ceph_subsys_mds
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, rank) << __func__ << ": "
-static ostream& _prefix(std::ostream *_dout, mds_rank_t rank) {
-  return *_dout << "mds." << rank << ".purge_queue ";
-}
-
-void PurgeItem::encode(bufferlist &bl) const
-{
-  ENCODE_START(1, 1, bl);
-  ::encode((uint8_t)action, bl);
-  ::encode(ino, bl);
-  ::encode(size, bl);
-  ::encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2);
-  ::encode(old_pools, bl);
-  ::encode(snapc, bl);
-  ::encode(fragtree, bl);
-  ENCODE_FINISH(bl);
-}
-
-void PurgeItem::decode(bufferlist::iterator &p)
-{
-  DECODE_START(1, p);
-  ::decode((uint8_t&)action, p);
-  ::decode(ino, p);
-  ::decode(size, p);
-  ::decode(layout, p);
-  ::decode(old_pools, p);
-  ::decode(snapc, p);
-  ::decode(fragtree, p);
-  DECODE_FINISH(p);
-}
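
The pair above gives PurgeItem a versioned wire format: ENCODE_START(1, 1, bl) stamps the payload as version 1 (compat 1), and DECODE_FINISH verifies the envelope on the way back in. A minimal round-trip sketch, assuming the PurgeItem declaration and its WRITE_CLASS_ENCODER registration from PurgeQueue.h; the field values are invented for illustration:

    PurgeItem in;
    in.action = PurgeItem::PURGE_FILE;    // action enum declared in PurgeQueue.h
    in.ino = inodeno_t(0x10000000001);    // illustrative inode number
    in.size = 4 * 1024 * 1024;
    bufferlist bl;
    ::encode(in, bl);                     // writes the v1 envelope
    PurgeItem out;
    bufferlist::iterator p = bl.begin();
    ::decode(out, p);                     // throws buffer::error on a bad envelope
    assert(out.ino == in.ino && out.size == in.size);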
-
-// TODO: if Objecter has any slow requests, take that as a hint and
-// slow down our rate of purging (keep accepting pushes though)
-PurgeQueue::PurgeQueue(
-      CephContext *cct_,
-      mds_rank_t rank_,
-      const int64_t metadata_pool_,
-      Objecter *objecter_,
-      Context *on_error_)
-  :
-    cct(cct_),
-    rank(rank_),
-    lock("PurgeQueue"),
-    metadata_pool(metadata_pool_),
-    finisher(cct, "PurgeQueue", "PQ_Finisher"),
-    timer(cct, lock),
-    filer(objecter_, &finisher),
-    objecter(objecter_),
-    journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
-      CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0,
-      &finisher),
-    on_error(on_error_),
-    ops_in_flight(0),
-    max_purge_ops(0),
-    drain_initial(0),
-    draining(false),
-    delayed_flush(nullptr),
-    recovered(false)
-{
-  assert(cct != nullptr);
-  assert(on_error != nullptr);
-  assert(objecter != nullptr);
-  journaler.set_write_error_handler(on_error);
-}
-
-PurgeQueue::~PurgeQueue()
-{
-  if (logger) {
-    g_ceph_context->get_perfcounters_collection()->remove(logger.get());
-  }
-}
-
-void PurgeQueue::create_logger()
-{
-  PerfCountersBuilder pcb(g_ceph_context,
-          "purge_queue", l_pq_first, l_pq_last);
-  pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
-  pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
-  pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed", "purg",
-                      PerfCountersBuilder::PRIO_INTERESTING);
-
-  logger.reset(pcb.create_perf_counters());
-  g_ceph_context->get_perfcounters_collection()->add(logger.get());
-}
-
-void PurgeQueue::init()
-{
-  Mutex::Locker l(lock);
-
-  assert(logger != nullptr);
-
-  finisher.start();
-  timer.init();
-}
-
-void PurgeQueue::activate()
-{
-  Mutex::Locker l(lock);
-  if (journaler.get_read_pos() == journaler.get_write_pos())
-    return;
-
-  if (in_flight.empty()) {
-    dout(4) << "start work (by drain)" << dendl;
-    finisher.queue(new FunctionContext([this](int r) {
-      Mutex::Locker l(lock);
-      _consume();
-    }));
-  }
-}
-
-void PurgeQueue::shutdown()
-{
-  Mutex::Locker l(lock);
-
-  journaler.shutdown();
-  timer.shutdown();
-  finisher.stop();
-}
-
-void PurgeQueue::open(Context *completion)
-{
-  dout(4) << "opening" << dendl;
-
-  Mutex::Locker l(lock);
-
-  if (completion)
-    waiting_for_recovery.push_back(completion);
-
-  journaler.recover(new FunctionContext([this](int r){
-    if (r == -ENOENT) {
-      dout(1) << "Purge Queue not found, assuming this is an upgrade and "
-                 "creating it." << dendl;
-      create(NULL);
-    } else if (r == 0) {
-      Mutex::Locker l(lock);
-      dout(4) << "open complete" << dendl;
-
-      // Journaler only guarantees entries before head write_pos have been
-      // fully flushed. Before appending new entries, we need to find and
-      // drop any partially written entry.
-      if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
-        dout(4) << "recovering write_pos" << dendl;
-        journaler.set_read_pos(journaler.last_committed.write_pos);
-        _recover();
-        return;
-      }
-
-      journaler.set_writeable();
-      recovered = true;
-      finish_contexts(g_ceph_context, waiting_for_recovery);
-    } else {
-      derr << "Error " << r << " loading Journaler" << dendl;
-      on_error->complete(r);
-    }
-  }));
-}
-
-void PurgeQueue::wait_for_recovery(Context* c)
-{
-  Mutex::Locker l(lock);
-  if (recovered)
-    c->complete(0);
-  else
-    waiting_for_recovery.push_back(c);
-}
-
-void PurgeQueue::_recover()
-{
-  assert(lock.is_locked_by_me());
-
-  // Journaler::is_readable() adjusts write_pos if a partial entry is encountered
-  while (1) {
-    if (!journaler.is_readable() &&
-        !journaler.get_error() &&
-        journaler.get_read_pos() < journaler.get_write_pos()) {
-      journaler.wait_for_readable(new FunctionContext([this](int r) {
-        Mutex::Locker l(lock);
-        _recover();
-      }));
-      return;
-    }
-
-    if (journaler.get_error()) {
-      int r = journaler.get_error();
-      derr << "Error " << r << " recovering write_pos" << dendl;
-      on_error->complete(r);
-      return;
-    }
-
-    if (journaler.get_read_pos() == journaler.get_write_pos()) {
-      dout(4) << "write_pos recovered" << dendl;
-      // restore original read_pos
-      journaler.set_read_pos(journaler.last_committed.expire_pos);
-      journaler.set_writeable();
-      recovered = true;
-      finish_contexts(g_ceph_context, waiting_for_recovery);
-      return;
-    }
-
-    bufferlist bl;
-    bool readable = journaler.try_read_entry(bl);
-    assert(readable);  // we checked earlier
-  }
-}
-
-void PurgeQueue::create(Context *fin)
-{
-  dout(4) << "creating" << dendl;
-  Mutex::Locker l(lock);
-
-  if (fin)
-    waiting_for_recovery.push_back(fin);
-
-  file_layout_t layout = file_layout_t::get_default();
-  layout.pool_id = metadata_pool;
-  journaler.set_writeable();
-  journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
-  journaler.write_head(new FunctionContext([this](int r) {
-    Mutex::Locker l(lock);
-    recovered = true;
-    finish_contexts(g_ceph_context, waiting_for_recovery);
-  }));
-}
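
How a caller is expected to sequence the lifecycle calls above, as a hedged sketch: this is not from the deleted file, and it assumes Ceph's C_SaferCond waiter from common/Cond.h; `metadata_pool` and `objecter` stand in for values the MDS already has in scope.

    PurgeQueue pq(g_ceph_context, mds_rank_t(0), metadata_pool, objecter,
                  new FunctionContext([](int r) { /* fatal write-error path */ }));
    pq.create_logger();           // init() asserts that the logger exists
    pq.init();
    C_SaferCond on_recovery;
    pq.open(&on_recovery);        // recovers the journal; creates it on -ENOENT
    int r = on_recovery.wait();   // returns once `recovered` has been set
    assert(r == 0);
    pq.activate();                // resume any entries left over from last run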
-
-/**
- * The `completion` context will always be called back via a Finisher
- */
-void PurgeQueue::push(const PurgeItem &pi, Context *completion)
-{
-  dout(4) << "pushing inode 0x" << std::hex << pi.ino << std::dec << dendl;
-  Mutex::Locker l(lock);
-
-  // Callers should have waited for open() before using us
-  assert(!journaler.is_readonly());
-
-  bufferlist bl;
-
-  ::encode(pi, bl);
-  journaler.append_entry(bl);
-  journaler.wait_for_flush(completion);
-
-  // Maybe go ahead and do something with it right away
-  bool could_consume = _consume();
-  if (!could_consume) {
-    // Usually, it is not necessary to explicitly flush here, because the
-    // reader will get flushes generated inside Journaler::is_readable.
-    // However, if we remain in a can_consume()==false state for a long
-    // period then we should flush in order to allow MDCache to drop its
-    // strays rather than having them wait for the PurgeQueue to progress.
-    if (!delayed_flush) {
-      delayed_flush = new FunctionContext([this](int r){
-        delayed_flush = nullptr;
-        journaler.flush();
-      });
-
-      timer.add_event_after(
-          g_conf->mds_purge_queue_busy_flush_period,
-          delayed_flush);
-    }
-  }
-}
-
-uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
-{
-  uint32_t ops_required = 0;
-  if (item.action == PurgeItem::PURGE_DIR) {
-    // Directory, count dirfrags to be deleted
-    std::list<frag_t> ls;
-    if (!item.fragtree.is_leaf(frag_t())) {
-      item.fragtree.get_leaves(ls);
-    }
-    // One for the root, plus any leaves
-    ops_required = 1 + ls.size();
-  } else {
-    // File, work out concurrent Filer::purge deletes
-    const uint64_t num = (item.size > 0) ?
-      Striper::get_num_objects(item.layout, item.size) : 1;
-
-    ops_required = MIN(num, g_conf->filer_max_purge_ops);
-
-    // Account for removing (or zeroing) backtrace
-    ops_required += 1;
-
-    // Account for deletions for old pools
-    if (item.action != PurgeItem::TRUNCATE_FILE) {
-      ops_required += item.old_pools.size();
-    }
-  }
-
-  return ops_required;
-}
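
To make the accounting above concrete, a worked example with assumed numbers (the configuration values here are illustrative, not quoted from ceph.conf):

    // PURGE_FILE: 100 MiB file, 4 MiB objects (stripe_count = 1),
    // two old data pools, filer_max_purge_ops = 10:
    //   num          = 100 MiB / 4 MiB = 25 stripe objects
    //   ops_required = MIN(25, 10) + 1 (backtrace) + 2 (old pools) = 13
    // PURGE_DIR: fragtree with four leaf dirfrags:
    //   ops_required = 1 (root frag) + 4 (leaves) = 5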
-
-bool PurgeQueue::can_consume()
-{
-  dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
-           << in_flight.size() << "/" << g_conf->mds_max_purge_files
-           << " files" << dendl;
-
-  if (in_flight.size() == 0 && cct->_conf->mds_max_purge_files > 0) {
-    // Always permit consumption if nothing is in flight, so that the ops
-    // limit can never be so low as to forbid all progress (unless
-    // administrator has deliberately paused purging by setting max
-    // purge files to zero).
-    return true;
-  }
-
-  if (ops_in_flight >= max_purge_ops) {
-    dout(20) << "Throttling on op limit " << ops_in_flight << "/"
-             << max_purge_ops << dendl;
-    return false;
-  }
-
-  if (in_flight.size() >= cct->_conf->mds_max_purge_files) {
-    dout(20) << "Throttling on item limit " << in_flight.size()
-             << "/" << cct->_conf->mds_max_purge_files << dendl;
-    return false;
-  } else {
-    return true;
-  }
-}
-
-bool PurgeQueue::_consume()
-{
-  assert(lock.is_locked_by_me());
-
-  bool could_consume = false;
-  while (can_consume()) {
-    could_consume = true;
-
-    if (delayed_flush) {
-      // We are now going to read from the journal, so any proactive
-      // flush is no longer necessary. This is not functionally necessary
-      // but it can avoid generating extra fragmented flush IOs.
-      timer.cancel_event(delayed_flush);
-      delayed_flush = nullptr;
-    }
-
-    if (!journaler.is_readable()) {
-      dout(10) << " not readable right now" << dendl;
-      // Because we are the writer and the reader of the journal
-      // via the same Journaler instance, we never need to reread_head
-      if (!journaler.have_waiter()) {
-        journaler.wait_for_readable(new FunctionContext([this](int r) {
-          Mutex::Locker l(lock);
-          if (r == 0) {
-            _consume();
-          }
-        }));
-      }
-
-      return could_consume;
-    }
-
-    // The journaler is readable: consume an entry
-    bufferlist bl;
-    bool readable = journaler.try_read_entry(bl);
-    assert(readable);  // we checked earlier
-
-    dout(20) << " decoding entry" << dendl;
-    PurgeItem item;
-    bufferlist::iterator q = bl.begin();
-    try {
-      ::decode(item, q);
-    } catch (const buffer::error &err) {
-      derr << "Decode error at read_pos=0x" << std::hex
-           << journaler.get_read_pos() << dendl;
-      on_error->complete(0);
-    }
-    dout(20) << " executing item (0x" << std::hex << item.ino
-             << std::dec << ")" << dendl;
-    _execute_item(item, journaler.get_read_pos());
-  }
-
-  dout(10) << " cannot consume right now" << dendl;
-
-  return could_consume;
-}
-
-void PurgeQueue::_execute_item(
-    const PurgeItem &item,
-    uint64_t expire_to)
-{
-  assert(lock.is_locked_by_me());
-
-  in_flight[expire_to] = item;
-  logger->set(l_pq_executing, in_flight.size());
-  ops_in_flight += _calculate_ops(item);
-  logger->set(l_pq_executing_ops, ops_in_flight);
-
-  SnapContext nullsnapc;
-
-  C_GatherBuilder gather(cct);
-  if (item.action == PurgeItem::PURGE_FILE) {
-    if (item.size > 0) {
-      uint64_t num = Striper::get_num_objects(item.layout, item.size);
-      dout(10) << " 0~" << item.size << " objects 0~" << num
-               << " snapc " << item.snapc << " on " << item.ino << dendl;
-      filer.purge_range(item.ino, &item.layout, item.snapc,
-                        0, num, ceph::real_clock::now(), 0,
-                        gather.new_sub());
-    }
-
-    // remove the backtrace object if it was not purged
-    object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
-    if (!gather.has_subs() || !item.layout.pool_ns.empty()) {
-      object_locator_t oloc(item.layout.pool_id);
-      dout(10) << " remove backtrace object " << oid
-               << " pool " << oloc.pool << " snapc " << item.snapc << dendl;
-      objecter->remove(oid, oloc, item.snapc,
-                       ceph::real_clock::now(), 0,
-                       gather.new_sub());
-    }
-
-    // remove old backtrace objects
-    for (const auto &p : item.old_pools) {
-      object_locator_t oloc(p);
-      dout(10) << " remove backtrace object " << oid
-               << " old pool " << p << " snapc " << item.snapc << dendl;
-      objecter->remove(oid, oloc, item.snapc,
-                       ceph::real_clock::now(), 0,
-                       gather.new_sub());
-    }
-  } else if (item.action == PurgeItem::PURGE_DIR) {
-    object_locator_t oloc(metadata_pool);
-    std::list<frag_t> frags;
-    if (!item.fragtree.is_leaf(frag_t()))
-      item.fragtree.get_leaves(frags);
-    frags.push_back(frag_t());
-    for (const auto &frag : frags) {
-      object_t oid = CInode::get_object_name(item.ino, frag, "");
-      dout(10) << " remove dirfrag " << oid << dendl;
-      objecter->remove(oid, oloc, nullsnapc,
-                       ceph::real_clock::now(),
-                       0, gather.new_sub());
-    }
-  } else if (item.action == PurgeItem::TRUNCATE_FILE) {
-    const uint64_t num = Striper::get_num_objects(item.layout, item.size);
-    dout(10) << " 0~" << item.size << " objects 0~" << num
-             << " snapc " << item.snapc << " on " << item.ino << dendl;
-
-    // keep backtrace object
-    if (num > 1) {
-      filer.purge_range(item.ino, &item.layout, item.snapc,
-                        1, num - 1, ceph::real_clock::now(),
-                        0, gather.new_sub());
-    }
-    filer.zero(item.ino, &item.layout, item.snapc,
-               0, item.layout.object_size,
-               ceph::real_clock::now(),
-               0, true, gather.new_sub());
-  } else {
-    derr << "Invalid item (action=" << item.action << ") in purge queue, "
-            "dropping it" << dendl;
-    in_flight.erase(expire_to);
-    logger->set(l_pq_executing, in_flight.size());
-    return;
-  }
-  assert(gather.has_subs());
-
-  gather.set_finisher(new C_OnFinisher(
-      new FunctionContext([this, expire_to](int r){
-        Mutex::Locker l(lock);
-        _execute_item_complete(expire_to);
-
-        _consume();
-
-        // Have we gone idle? If so, do an extra write_head now instead of
-        // waiting for next flush after journaler_write_head_interval.
-        // Also do this periodically even if not idle, so that the persisted
-        // expire_pos doesn't fall too far behind our progress when consuming
-        // a very long queue.
-        if (in_flight.empty() || journaler.write_head_needed()) {
-          journaler.write_head(new FunctionContext([this](int r){
-            journaler.trim();
-          }));
-        }
-      }), &finisher));
-
-  gather.activate();
-}
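
The C_GatherBuilder/C_OnFinisher combination above is the MDS's usual fan-in idiom: every RADOS op gets a sub-context, and the finisher context fires exactly once, on the Finisher thread, after all subs complete. A stripped-down sketch of the same idiom; `oid_a`, `oid_b`, `oloc`, `snapc`, and `finisher` are assumed to be in scope and are not names from this file:

    C_GatherBuilder gather(cct);
    objecter->remove(oid_a, oloc, snapc, ceph::real_clock::now(), 0,
                     gather.new_sub());
    objecter->remove(oid_b, oloc, snapc, ceph::real_clock::now(), 0,
                     gather.new_sub());
    gather.set_finisher(new C_OnFinisher(
        new FunctionContext([](int r) {
          // runs once, after both removes have completed
        }), &finisher));
    gather.activate();   // arms the gather; safe to call after adding subs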
-
-void PurgeQueue::_execute_item_complete(
-    uint64_t expire_to)
-{
-  assert(lock.is_locked_by_me());
-  dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
-  assert(in_flight.count(expire_to) == 1);
-
-  auto iter = in_flight.find(expire_to);
-  assert(iter != in_flight.end());
-  if (iter == in_flight.begin()) {
-    // This was the lowest journal position in flight, so we can now
-    // safely expire the journal up to here.
-    dout(10) << "expiring to 0x" << std::hex << expire_to << std::dec << dendl;
-    journaler.set_expire_pos(expire_to);
-  } else {
-    // This is completely fine, we're not supposed to purge files in
-    // order when doing them in parallel.
-    dout(10) << "non-sequential completion, not expiring anything" << dendl;
-  }
-
-  ops_in_flight -= _calculate_ops(iter->second);
-  logger->set(l_pq_executing_ops, ops_in_flight);
-
-  dout(10) << "completed item for ino 0x" << std::hex << iter->second.ino
-           << std::dec << dendl;
-
-  in_flight.erase(iter);
-  logger->set(l_pq_executing, in_flight.size());
-  dout(10) << "in_flight.size() now " << in_flight.size() << dendl;
-
-  logger->inc(l_pq_executed);
-}
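
The expiry logic above relies on in_flight being an ordered map keyed by journal offset (its type comes from PurgeQueue.h): expire_pos only advances when the lowest outstanding offset completes. Tracing an out-of-order completion sequence (offsets invented for illustration):

    // in_flight keys: {0x1000, 0x2000, 0x3000}
    // 0x2000 completes first: not begin(), expire_pos unchanged, erase 0x2000
    // 0x1000 completes next:  begin(),     expire_pos := 0x1000, erase 0x1000
    // 0x3000 completes last:  begin(),     expire_pos := 0x3000, erase 0x3000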
-
-void PurgeQueue::update_op_limit(const MDSMap &mds_map)
-{
-  Mutex::Locker l(lock);
-
-  uint64_t pg_count = 0;
-  objecter->with_osdmap([&](const OSDMap& o) {
-    // Number of PGs across all data pools
-    const std::vector<int64_t> &data_pools = mds_map.get_data_pools();
-    for (const auto dp : data_pools) {
-      if (o.get_pg_pool(dp) == NULL) {
-        // It is possible that we have an older OSDMap than MDSMap,
-        // because we don't start watching every OSDMap until after
-        // MDSRank is initialized
-        dout(4) << " data pool " << dp << " not found in OSDMap" << dendl;
-        continue;
-      }
-      pg_count += o.get_pg_num(dp);
-    }
-  });
-
-  // Work out a limit based on n_pgs / n_mdss, multiplied by the user's
-  // preference for how many ops per PG
-  max_purge_ops = uint64_t(((double)pg_count / (double)mds_map.get_max_mds()) *
-                           cct->_conf->mds_max_purge_ops_per_pg);
-
-  // User may also specify a hard limit, apply this if so.
-  if (cct->_conf->mds_max_purge_ops) {
-    max_purge_ops = MIN(max_purge_ops, cct->_conf->mds_max_purge_ops);
-  }
-}
-
-void PurgeQueue::handle_conf_change(const struct md_config_t *conf,
-                                    const std::set<std::string> &changed,
-                                    const MDSMap &mds_map)
-{
-  if (changed.count("mds_max_purge_ops")
-      || changed.count("mds_max_purge_ops_per_pg")) {
-    update_op_limit(mds_map);
-  } else if (changed.count("mds_max_purge_files")) {
-    Mutex::Locker l(lock);
-
-    if (in_flight.empty()) {
-      // We might have gone from zero to a finite limit, so
-      // might need to kick off consume.
-      dout(4) << "maybe start work again (max_purge_files="
-              << conf->mds_max_purge_files << ")" << dendl;
-      finisher.queue(new FunctionContext([this](int r){
-        Mutex::Locker l(lock);
-        _consume();
-      }));
-    }
-  }
-}
-
-bool PurgeQueue::drain(
-    uint64_t *progress,
-    uint64_t *progress_total,
-    size_t *in_flight_count)
-{
-  assert(progress != nullptr);
-  assert(progress_total != nullptr);
-  assert(in_flight_count != nullptr);
-
-  const bool done = in_flight.empty() && (
-      journaler.get_read_pos() == journaler.get_write_pos());
-  if (done) {
-    return true;
-  }
-
-  const uint64_t bytes_remaining = journaler.get_write_pos()
-                                   - journaler.get_read_pos();
-
-  if (!draining) {
-    // Start of draining: remember how much there was outstanding at
-    // this point so that we can give a progress percentage later
-    draining = true;
-
-    // Lift the op throttle as this daemon now has nothing to do but
-    // drain the purge queue, so do it as fast as we can.
-    max_purge_ops = 0xffff;
-  }
-
-  drain_initial = max(bytes_remaining, drain_initial);
-
-  *progress = drain_initial - bytes_remaining;
-  *progress_total = drain_initial;
-  *in_flight_count = in_flight.size();
-
-  return false;
-}
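
Two of the calculations above benefit from worked numbers (all figures assumed for illustration). The op limit scales with PG count and is shared across active ranks:

    // update_op_limit: 1024 PGs across the data pools, max_mds = 2,
    // mds_max_purge_ops_per_pg = 0.5:
    //   max_purge_ops = (1024 / 2) * 0.5 = 256
    // If mds_max_purge_ops is set to 128, the MIN() clamps the result to 128.

For drain(), progress is reported in journal bytes: if the queue held 10 MiB when draining began (drain_initial) and write_pos - read_pos has since fallen to 2.5 MiB, the call reports *progress = 7.5 MiB out of *progress_total = 10 MiB, i.e. 75% drained.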