// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- #include #include #include #include #include #include #include #include #include #include #include #include #include "common/Formatter.h" #include "bencher.h" #include "rados_backend.h" #include "detailed_stat_collector.h" #include "distribution.h" #include "global/global_init.h" #include "common/WorkQueue.h" #include "common/Semaphore.h" #include "common/Finisher.h" namespace po = boost::program_options; using namespace std; class Queueable { public: virtual void queue(unsigned *) = 0; virtual void start() = 0; virtual void stop() = 0; virtual ~Queueable() {}; }; class Base : public Queueable { DetailedStatCollector *col; Semaphore *sem; public: Base(DetailedStatCollector *col, Semaphore *sem) : col(col), sem(sem) {} void queue(unsigned *item) override { col->read_complete(*item); sem->Put(); delete item; } void start() override {} void stop() override {} }; class WQWrapper : public Queueable { boost::scoped_ptr > wq; boost::scoped_ptr tp; public: WQWrapper(ThreadPool::WorkQueue *wq, ThreadPool *tp): wq(wq), tp(tp) {} void queue(unsigned *item) override { wq->queue(item); } void start() override { tp->start(); } void stop() override { tp->stop(); } }; class FinisherWrapper : public Queueable { class CB : public Context { Queueable *next; unsigned *item; public: CB(Queueable *next, unsigned *item) : next(next), item(item) {} void finish(int) override { next->queue(item); } }; Finisher f; Queueable *next; public: FinisherWrapper(CephContext *cct, Queueable *next) : f(cct), next(next) {} void queue(unsigned *item) override { f.queue(new CB(next, item)); } void start() override { f.start(); } void stop() override { f.stop(); } }; class PassAlong : public ThreadPool::WorkQueue { Queueable *next; list q; bool _enqueue(unsigned *item) override { q.push_back(item); return true; } void _dequeue(unsigned *item) override { ceph_abort(); } unsigned *_dequeue() override { if (q.empty()) return 0; unsigned *val = q.front(); q.pop_front(); return val; } void _process(unsigned *item, ThreadPool::TPHandle &) override { next->queue(item); } void _clear() override { q.clear(); } bool _empty() override { return q.empty(); } public: PassAlong(ThreadPool *tp, Queueable *_next) : ThreadPool::WorkQueue("TestQueue", 100, 100, tp), next(_next) {} }; int main(int argc, char **argv) { po::options_description desc("Allowed options"); desc.add_options() ("help", "produce help message") ("num-threads", po::value()->default_value(10), "set number of threads") ("queue-size", po::value()->default_value(30), "queue size") ("num-items", po::value()->default_value(3000000), "num items") ("layers", po::value()->default_value(""), "layer desc") ; vector ceph_option_strings; po::variables_map vm; try { po::parsed_options parsed = po::command_line_parser(argc, argv).options(desc).allow_unregistered().run(); po::store( parsed, vm); po::notify(vm); ceph_option_strings = po::collect_unrecognized(parsed.options, po::include_positional); } catch(po::error &e) { std::cerr << e.what() << std::endl; return 1; } vector ceph_options, def_args; ceph_options.reserve(ceph_option_strings.size()); for (vector::iterator i = ceph_option_strings.begin(); i != ceph_option_strings.end(); ++i) { ceph_options.push_back(i->c_str()); } auto cct = global_init( &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); common_init_finish(g_ceph_context); g_ceph_context->_conf->apply_changes(NULL); if (vm.count("help")) { cout << desc << std::endl; return 1; } DetailedStatCollector col(1, new JSONFormatter, 0, &cout); Semaphore sem; for (unsigned i = 0; i < vm["queue-size"].as(); ++i) sem.Put(); typedef list QQ; QQ wqs; wqs.push_back( new Base(&col, &sem)); string layers(vm["layers"].as()); unsigned num = 0; for (string::reverse_iterator i = layers.rbegin(); i != layers.rend(); ++i) { stringstream ss; ss << "Test " << num; if (*i == 'q') { ThreadPool *tp = new ThreadPool( g_ceph_context, ss.str(), "tp_test", vm["num-threads"].as(), 0); wqs.push_back( new WQWrapper( new PassAlong(tp, wqs.back()), tp )); } else if (*i == 'f') { wqs.push_back( new FinisherWrapper( g_ceph_context, wqs.back())); } ++num; } for (QQ::iterator i = wqs.begin(); i != wqs.end(); ++i) { (*i)->start(); } for (uint64_t i = 0; i < vm["num-items"].as(); ++i) { sem.Get(); unsigned *item = new unsigned(col.next_seq()); col.start_read(*item, 1); wqs.back()->queue(item); } for (QQ::iterator i = wqs.begin(); i != wqs.end(); ++i) { (*i)->stop(); } for (QQ::iterator i = wqs.begin(); i != wqs.end(); wqs.erase(i++)) { delete *i; } return 0; }