// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef CEPH_FINISHER_H #define CEPH_FINISHER_H #include "common/Mutex.h" #include "common/Cond.h" #include "common/perf_counters.h" class CephContext; /// Finisher queue length performance counter ID. enum { l_finisher_first = 997082, l_finisher_queue_len, l_finisher_complete_lat, l_finisher_last }; /** @brief Asynchronous cleanup class. * Finisher asynchronously completes Contexts, which are simple classes * representing callbacks, in a dedicated worker thread. Enqueuing * contexts to complete is thread-safe. */ class Finisher { CephContext *cct; Mutex finisher_lock; ///< Protects access to queues and finisher_running. Cond finisher_cond; ///< Signaled when there is something to process. Cond finisher_empty_cond; ///< Signaled when the finisher has nothing more to process. bool finisher_stop; ///< Set when the finisher should stop. bool finisher_running; ///< True when the finisher is currently executing contexts. bool finisher_empty_wait; ///< True mean someone wait finisher empty. /// Queue for contexts for which complete(0) will be called. /// NULLs in this queue indicate that an item from finisher_queue_rval /// should be completed in that place instead. vector finisher_queue; string thread_name; /// Queue for contexts for which the complete function will be called /// with a parameter other than 0. list > finisher_queue_rval; /// Performance counter for the finisher's queue length. /// Only active for named finishers. PerfCounters *logger; void *finisher_thread_entry(); struct FinisherThread : public Thread { Finisher *fin; explicit FinisherThread(Finisher *f) : fin(f) {} void* entry() override { return (void*)fin->finisher_thread_entry(); } } finisher_thread; public: /// Add a context to complete, optionally specifying a parameter for the complete function. void queue(Context *c, int r = 0) { finisher_lock.Lock(); if (finisher_queue.empty()) { finisher_cond.Signal(); } if (r) { finisher_queue_rval.push_back(pair(c, r)); finisher_queue.push_back(NULL); } else finisher_queue.push_back(c); if (logger) logger->inc(l_finisher_queue_len); finisher_lock.Unlock(); } void queue(vector& ls) { finisher_lock.Lock(); if (finisher_queue.empty()) { finisher_cond.Signal(); } finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end()); if (logger) logger->inc(l_finisher_queue_len, ls.size()); finisher_lock.Unlock(); ls.clear(); } void queue(deque& ls) { finisher_lock.Lock(); if (finisher_queue.empty()) { finisher_cond.Signal(); } finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end()); if (logger) logger->inc(l_finisher_queue_len, ls.size()); finisher_lock.Unlock(); ls.clear(); } void queue(list& ls) { finisher_lock.Lock(); if (finisher_queue.empty()) { finisher_cond.Signal(); } finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end()); if (logger) logger->inc(l_finisher_queue_len, ls.size()); finisher_lock.Unlock(); ls.clear(); } /// Start the worker thread. void start(); /** @brief Stop the worker thread. * * Does not wait until all outstanding contexts are completed. * To ensure that everything finishes, you should first shut down * all sources that can add contexts to this finisher and call * wait_for_empty() before calling stop(). */ void stop(); /** @brief Blocks until the finisher has nothing left to process. * This function will also return when a concurrent call to stop() * finishes, but this class should never be used in this way. */ void wait_for_empty(); /// Construct an anonymous Finisher. /// Anonymous finishers do not log their queue length. explicit Finisher(CephContext *cct_) : cct(cct_), finisher_lock("Finisher::finisher_lock"), finisher_stop(false), finisher_running(false), finisher_empty_wait(false), thread_name("fn_anonymous"), logger(0), finisher_thread(this) {} /// Construct a named Finisher that logs its queue length. Finisher(CephContext *cct_, string name, string tn) : cct(cct_), finisher_lock("Finisher::" + name), finisher_stop(false), finisher_running(false), finisher_empty_wait(false), thread_name(tn), logger(0), finisher_thread(this) { PerfCountersBuilder b(cct, string("finisher-") + name, l_finisher_first, l_finisher_last); b.add_u64(l_finisher_queue_len, "queue_len"); b.add_time_avg(l_finisher_complete_lat, "complete_latency"); logger = b.create_perf_counters(); cct->get_perfcounters_collection()->add(logger); logger->set(l_finisher_queue_len, 0); logger->set(l_finisher_complete_lat, 0); } ~Finisher() { if (logger && cct) { cct->get_perfcounters_collection()->remove(logger); delete logger; } } }; /// Context that is completed asynchronously on the supplied finisher. class C_OnFinisher : public Context { Context *con; Finisher *fin; public: C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) { assert(fin != NULL); assert(con != NULL); } ~C_OnFinisher() override { if (con != nullptr) { delete con; con = nullptr; } } void finish(int r) override { fin->queue(con, r); con = nullptr; } }; #endif