X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftest%2Fbench%2Ftp_bench.cc;fp=src%2Fceph%2Fsrc%2Ftest%2Fbench%2Ftp_bench.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=599e03614d70b4bd0870e81cb3f66ea69d28ea75;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/test/bench/tp_bench.cc b/src/ceph/src/test/bench/tp_bench.cc deleted file mode 100644 index 599e036..0000000 --- a/src/ceph/src/test/bench/tp_bench.cc +++ /dev/null @@ -1,211 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "common/Formatter.h" - -#include "bencher.h" -#include "rados_backend.h" -#include "detailed_stat_collector.h" -#include "distribution.h" -#include "global/global_init.h" -#include "common/WorkQueue.h" -#include "common/Semaphore.h" -#include "common/Finisher.h" - -namespace po = boost::program_options; -using namespace std; -class Queueable { -public: - virtual void queue(unsigned *) = 0; - virtual void start() = 0; - virtual void stop() = 0; - virtual ~Queueable() {}; -}; -class Base : public Queueable { - DetailedStatCollector *col; - Semaphore *sem; -public: - Base(DetailedStatCollector *col, - Semaphore *sem) : col(col), sem(sem) {} - void queue(unsigned *item) override { - col->read_complete(*item); - sem->Put(); - delete item; - } - void start() override {} - void stop() override {} -}; -class WQWrapper : public Queueable { - boost::scoped_ptr > wq; - boost::scoped_ptr tp; -public: - WQWrapper(ThreadPool::WorkQueue *wq, ThreadPool *tp): - wq(wq), tp(tp) {} - void queue(unsigned *item) override { wq->queue(item); } - void start() override { tp->start(); } - void stop() override { tp->stop(); } -}; -class FinisherWrapper : public Queueable { - class CB : public Context { - Queueable *next; - unsigned *item; - public: - CB(Queueable *next, unsigned *item) : next(next), item(item) {} - void finish(int) override { - next->queue(item); - } - }; - Finisher f; - Queueable *next; -public: - FinisherWrapper(CephContext *cct, Queueable *next) : - f(cct), next(next) {} - void queue(unsigned *item) override { - f.queue(new CB(next, item)); - } - void start() override { f.start(); } - void stop() override { f.stop(); } -}; -class PassAlong : public ThreadPool::WorkQueue { - Queueable *next; - list q; - bool _enqueue(unsigned *item) override { - q.push_back(item); - return true; - } - void _dequeue(unsigned *item) override { ceph_abort(); } - unsigned *_dequeue() override { - if (q.empty()) - return 0; - unsigned *val = q.front(); - q.pop_front(); - return val; - } - void _process(unsigned *item, ThreadPool::TPHandle &) override { - next->queue(item); - } - void _clear() override { q.clear(); } - bool _empty() override { return q.empty(); } -public: - PassAlong(ThreadPool *tp, Queueable *_next) : - ThreadPool::WorkQueue("TestQueue", 100, 100, tp), next(_next) {} -}; - -int main(int argc, char **argv) -{ - po::options_description desc("Allowed options"); - desc.add_options() - ("help", "produce help message") - ("num-threads", po::value()->default_value(10), - "set number of threads") - ("queue-size", po::value()->default_value(30), - "queue size") - ("num-items", po::value()->default_value(3000000), - "num items") - ("layers", po::value()->default_value(""), - "layer desc") - ; - - vector ceph_option_strings; - po::variables_map vm; - try { - po::parsed_options parsed = - po::command_line_parser(argc, argv).options(desc).allow_unregistered().run(); - po::store( - parsed, - vm); - po::notify(vm); - - ceph_option_strings = po::collect_unrecognized(parsed.options, - po::include_positional); - } catch(po::error &e) { - std::cerr << e.what() << std::endl; - return 1; - } - vector ceph_options, def_args; - ceph_options.reserve(ceph_option_strings.size()); - for (vector::iterator i = ceph_option_strings.begin(); - i != ceph_option_strings.end(); - ++i) { - ceph_options.push_back(i->c_str()); - } - - auto cct = global_init( - &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT, - CODE_ENVIRONMENT_UTILITY, - CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); - common_init_finish(g_ceph_context); - g_ceph_context->_conf->apply_changes(NULL); - - if (vm.count("help")) { - cout << desc << std::endl; - return 1; - } - - DetailedStatCollector col(1, new JSONFormatter, 0, &cout); - Semaphore sem; - for (unsigned i = 0; i < vm["queue-size"].as(); ++i) - sem.Put(); - - typedef list QQ; - QQ wqs; - wqs.push_back( - new Base(&col, &sem)); - string layers(vm["layers"].as()); - unsigned num = 0; - for (string::reverse_iterator i = layers.rbegin(); - i != layers.rend(); ++i) { - stringstream ss; - ss << "Test " << num; - if (*i == 'q') { - ThreadPool *tp = - new ThreadPool( - g_ceph_context, ss.str(), "tp_test", vm["num-threads"].as(), 0); - wqs.push_back( - new WQWrapper( - new PassAlong(tp, wqs.back()), - tp - )); - } else if (*i == 'f') { - wqs.push_back( - new FinisherWrapper( - g_ceph_context, wqs.back())); - } - ++num; - } - - for (QQ::iterator i = wqs.begin(); - i != wqs.end(); - ++i) { - (*i)->start(); - } - - for (uint64_t i = 0; i < vm["num-items"].as(); ++i) { - sem.Get(); - unsigned *item = new unsigned(col.next_seq()); - col.start_read(*item, 1); - wqs.back()->queue(item); - } - - for (QQ::iterator i = wqs.begin(); - i != wqs.end(); - ++i) { - (*i)->stop(); - } - for (QQ::iterator i = wqs.begin(); i != wqs.end(); wqs.erase(i++)) { - delete *i; - } - return 0; -}