X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftest%2Fbench%2Ftp_bench.cc;fp=src%2Fceph%2Fsrc%2Ftest%2Fbench%2Ftp_bench.cc;h=599e03614d70b4bd0870e81cb3f66ea69d28ea75;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/test/bench/tp_bench.cc b/src/ceph/src/test/bench/tp_bench.cc new file mode 100644 index 0000000..599e036 --- /dev/null +++ b/src/ceph/src/test/bench/tp_bench.cc @@ -0,0 +1,211 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/Formatter.h" + +#include "bencher.h" +#include "rados_backend.h" +#include "detailed_stat_collector.h" +#include "distribution.h" +#include "global/global_init.h" +#include "common/WorkQueue.h" +#include "common/Semaphore.h" +#include "common/Finisher.h" + +namespace po = boost::program_options; +using namespace std; +class Queueable { +public: + virtual void queue(unsigned *) = 0; + virtual void start() = 0; + virtual void stop() = 0; + virtual ~Queueable() {}; +}; +class Base : public Queueable { + DetailedStatCollector *col; + Semaphore *sem; +public: + Base(DetailedStatCollector *col, + Semaphore *sem) : col(col), sem(sem) {} + void queue(unsigned *item) override { + col->read_complete(*item); + sem->Put(); + delete item; + } + void start() override {} + void stop() override {} +}; +class WQWrapper : public Queueable { + boost::scoped_ptr > wq; + boost::scoped_ptr tp; +public: + WQWrapper(ThreadPool::WorkQueue *wq, ThreadPool *tp): + wq(wq), tp(tp) {} + void queue(unsigned *item) override { wq->queue(item); } + void start() override { tp->start(); } + void stop() override { tp->stop(); } +}; +class FinisherWrapper : public Queueable { + class CB : public Context { + Queueable *next; + unsigned *item; + public: + CB(Queueable *next, unsigned *item) : next(next), item(item) {} + void finish(int) override { + next->queue(item); + } + }; + Finisher f; + Queueable *next; +public: + FinisherWrapper(CephContext *cct, Queueable *next) : + f(cct), next(next) {} + void queue(unsigned *item) override { + f.queue(new CB(next, item)); + } + void start() override { f.start(); } + void stop() override { f.stop(); } +}; +class PassAlong : public ThreadPool::WorkQueue { + Queueable *next; + list q; + bool _enqueue(unsigned *item) override { + q.push_back(item); + return true; + } + void _dequeue(unsigned *item) override { ceph_abort(); } + unsigned *_dequeue() override { + if (q.empty()) + return 0; + unsigned *val = q.front(); + q.pop_front(); + return val; + } + void _process(unsigned *item, ThreadPool::TPHandle &) override { + next->queue(item); + } + void _clear() override { q.clear(); } + bool _empty() override { return q.empty(); } +public: + PassAlong(ThreadPool *tp, Queueable *_next) : + ThreadPool::WorkQueue("TestQueue", 100, 100, tp), next(_next) {} +}; + +int main(int argc, char **argv) +{ + po::options_description desc("Allowed options"); + desc.add_options() + ("help", "produce help message") + ("num-threads", po::value()->default_value(10), + "set number of threads") + ("queue-size", po::value()->default_value(30), + "queue size") + ("num-items", po::value()->default_value(3000000), + "num items") + ("layers", po::value()->default_value(""), + "layer desc") + ; + + vector ceph_option_strings; + po::variables_map vm; + try { + po::parsed_options parsed = + po::command_line_parser(argc, argv).options(desc).allow_unregistered().run(); + po::store( + parsed, + vm); + po::notify(vm); + + ceph_option_strings = po::collect_unrecognized(parsed.options, + po::include_positional); + } catch(po::error &e) { + std::cerr << e.what() << std::endl; + return 1; + } + vector ceph_options, def_args; + ceph_options.reserve(ceph_option_strings.size()); + for (vector::iterator i = ceph_option_strings.begin(); + i != ceph_option_strings.end(); + ++i) { + ceph_options.push_back(i->c_str()); + } + + auto cct = global_init( + &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(g_ceph_context); + g_ceph_context->_conf->apply_changes(NULL); + + if (vm.count("help")) { + cout << desc << std::endl; + return 1; + } + + DetailedStatCollector col(1, new JSONFormatter, 0, &cout); + Semaphore sem; + for (unsigned i = 0; i < vm["queue-size"].as(); ++i) + sem.Put(); + + typedef list QQ; + QQ wqs; + wqs.push_back( + new Base(&col, &sem)); + string layers(vm["layers"].as()); + unsigned num = 0; + for (string::reverse_iterator i = layers.rbegin(); + i != layers.rend(); ++i) { + stringstream ss; + ss << "Test " << num; + if (*i == 'q') { + ThreadPool *tp = + new ThreadPool( + g_ceph_context, ss.str(), "tp_test", vm["num-threads"].as(), 0); + wqs.push_back( + new WQWrapper( + new PassAlong(tp, wqs.back()), + tp + )); + } else if (*i == 'f') { + wqs.push_back( + new FinisherWrapper( + g_ceph_context, wqs.back())); + } + ++num; + } + + for (QQ::iterator i = wqs.begin(); + i != wqs.end(); + ++i) { + (*i)->start(); + } + + for (uint64_t i = 0; i < vm["num-items"].as(); ++i) { + sem.Get(); + unsigned *item = new unsigned(col.next_seq()); + col.start_read(*item, 1); + wqs.back()->queue(item); + } + + for (QQ::iterator i = wqs.begin(); + i != wqs.end(); + ++i) { + (*i)->stop(); + } + for (QQ::iterator i = wqs.begin(); i != wqs.end(); wqs.erase(i++)) { + delete *i; + } + return 0; +}