--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/program_options/option.hpp>
+#include <boost/program_options/options_description.hpp>
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/cmdline.hpp>
+#include <boost/program_options/parsers.hpp>
+#include <iostream>
+#include <set>
+#include <sstream>
+#include <stdlib.h>
+#include <fstream>
+
+#include "common/Formatter.h"
+
+#include "bencher.h"
+#include "rados_backend.h"
+#include "detailed_stat_collector.h"
+#include "distribution.h"
+#include "global/global_init.h"
+#include "common/WorkQueue.h"
+#include "common/Semaphore.h"
+#include "common/Finisher.h"
+
+namespace po = boost::program_options;
+using namespace std;
+class Queueable {
+public:
+ virtual void queue(unsigned *) = 0;
+ virtual void start() = 0;
+ virtual void stop() = 0;
+ virtual ~Queueable() {};
+};
+class Base : public Queueable {
+ DetailedStatCollector *col;
+ Semaphore *sem;
+public:
+ Base(DetailedStatCollector *col,
+ Semaphore *sem) : col(col), sem(sem) {}
+ void queue(unsigned *item) override {
+ col->read_complete(*item);
+ sem->Put();
+ delete item;
+ }
+ void start() override {}
+ void stop() override {}
+};
+class WQWrapper : public Queueable {
+ boost::scoped_ptr<ThreadPool::WorkQueue<unsigned> > wq;
+ boost::scoped_ptr<ThreadPool> tp;
+public:
+ WQWrapper(ThreadPool::WorkQueue<unsigned> *wq, ThreadPool *tp):
+ wq(wq), tp(tp) {}
+ void queue(unsigned *item) override { wq->queue(item); }
+ void start() override { tp->start(); }
+ void stop() override { tp->stop(); }
+};
+class FinisherWrapper : public Queueable {
+ class CB : public Context {
+ Queueable *next;
+ unsigned *item;
+ public:
+ CB(Queueable *next, unsigned *item) : next(next), item(item) {}
+ void finish(int) override {
+ next->queue(item);
+ }
+ };
+ Finisher f;
+ Queueable *next;
+public:
+ FinisherWrapper(CephContext *cct, Queueable *next) :
+ f(cct), next(next) {}
+ void queue(unsigned *item) override {
+ f.queue(new CB(next, item));
+ }
+ void start() override { f.start(); }
+ void stop() override { f.stop(); }
+};
+class PassAlong : public ThreadPool::WorkQueue<unsigned> {
+ Queueable *next;
+ list<unsigned*> q;
+ bool _enqueue(unsigned *item) override {
+ q.push_back(item);
+ return true;
+ }
+ void _dequeue(unsigned *item) override { ceph_abort(); }
+ unsigned *_dequeue() override {
+ if (q.empty())
+ return 0;
+ unsigned *val = q.front();
+ q.pop_front();
+ return val;
+ }
+ void _process(unsigned *item, ThreadPool::TPHandle &) override {
+ next->queue(item);
+ }
+ void _clear() override { q.clear(); }
+ bool _empty() override { return q.empty(); }
+public:
+ PassAlong(ThreadPool *tp, Queueable *_next) :
+ ThreadPool::WorkQueue<unsigned>("TestQueue", 100, 100, tp), next(_next) {}
+};
+
+int main(int argc, char **argv)
+{
+ po::options_description desc("Allowed options");
+ desc.add_options()
+ ("help", "produce help message")
+ ("num-threads", po::value<unsigned>()->default_value(10),
+ "set number of threads")
+ ("queue-size", po::value<unsigned>()->default_value(30),
+ "queue size")
+ ("num-items", po::value<unsigned>()->default_value(3000000),
+ "num items")
+ ("layers", po::value<string>()->default_value(""),
+ "layer desc")
+ ;
+
+ vector<string> ceph_option_strings;
+ po::variables_map vm;
+ try {
+ po::parsed_options parsed =
+ po::command_line_parser(argc, argv).options(desc).allow_unregistered().run();
+ po::store(
+ parsed,
+ vm);
+ po::notify(vm);
+
+ ceph_option_strings = po::collect_unrecognized(parsed.options,
+ po::include_positional);
+ } catch(po::error &e) {
+ std::cerr << e.what() << std::endl;
+ return 1;
+ }
+ vector<const char *> ceph_options, def_args;
+ ceph_options.reserve(ceph_option_strings.size());
+ for (vector<string>::iterator i = ceph_option_strings.begin();
+ i != ceph_option_strings.end();
+ ++i) {
+ ceph_options.push_back(i->c_str());
+ }
+
+ auto cct = global_init(
+ &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ common_init_finish(g_ceph_context);
+ g_ceph_context->_conf->apply_changes(NULL);
+
+ if (vm.count("help")) {
+ cout << desc << std::endl;
+ return 1;
+ }
+
+ DetailedStatCollector col(1, new JSONFormatter, 0, &cout);
+ Semaphore sem;
+ for (unsigned i = 0; i < vm["queue-size"].as<unsigned>(); ++i)
+ sem.Put();
+
+ typedef list<Queueable*> QQ;
+ QQ wqs;
+ wqs.push_back(
+ new Base(&col, &sem));
+ string layers(vm["layers"].as<string>());
+ unsigned num = 0;
+ for (string::reverse_iterator i = layers.rbegin();
+ i != layers.rend(); ++i) {
+ stringstream ss;
+ ss << "Test " << num;
+ if (*i == 'q') {
+ ThreadPool *tp =
+ new ThreadPool(
+ g_ceph_context, ss.str(), "tp_test", vm["num-threads"].as<unsigned>(), 0);
+ wqs.push_back(
+ new WQWrapper(
+ new PassAlong(tp, wqs.back()),
+ tp
+ ));
+ } else if (*i == 'f') {
+ wqs.push_back(
+ new FinisherWrapper(
+ g_ceph_context, wqs.back()));
+ }
+ ++num;
+ }
+
+ for (QQ::iterator i = wqs.begin();
+ i != wqs.end();
+ ++i) {
+ (*i)->start();
+ }
+
+ for (uint64_t i = 0; i < vm["num-items"].as<unsigned>(); ++i) {
+ sem.Get();
+ unsigned *item = new unsigned(col.next_seq());
+ col.start_read(*item, 1);
+ wqs.back()->queue(item);
+ }
+
+ for (QQ::iterator i = wqs.begin();
+ i != wqs.end();
+ ++i) {
+ (*i)->stop();
+ }
+ for (QQ::iterator i = wqs.begin(); i != wqs.end(); wqs.erase(i++)) {
+ delete *i;
+ }
+ return 0;
+}