1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 #include <boost/scoped_ptr.hpp>
4 #include <boost/lexical_cast.hpp>
5 #include <boost/program_options/option.hpp>
6 #include <boost/program_options/options_description.hpp>
7 #include <boost/program_options/variables_map.hpp>
8 #include <boost/program_options/cmdline.hpp>
9 #include <boost/program_options/parsers.hpp>
16 #include "common/Formatter.h"
19 #include "rados_backend.h"
20 #include "detailed_stat_collector.h"
21 #include "distribution.h"
22 #include "global/global_init.h"
23 #include "common/WorkQueue.h"
24 #include "common/Semaphore.h"
25 #include "common/Finisher.h"
27 namespace po = boost::program_options;
31 virtual void queue(unsigned *) = 0;
32 virtual void start() = 0;
33 virtual void stop() = 0;
34 virtual ~Queueable() {};
36 class Base : public Queueable {
37 DetailedStatCollector *col;
40 Base(DetailedStatCollector *col,
41 Semaphore *sem) : col(col), sem(sem) {}
42 void queue(unsigned *item) override {
43 col->read_complete(*item);
47 void start() override {}
48 void stop() override {}
50 class WQWrapper : public Queueable {
51 boost::scoped_ptr<ThreadPool::WorkQueue<unsigned> > wq;
52 boost::scoped_ptr<ThreadPool> tp;
54 WQWrapper(ThreadPool::WorkQueue<unsigned> *wq, ThreadPool *tp):
56 void queue(unsigned *item) override { wq->queue(item); }
57 void start() override { tp->start(); }
58 void stop() override { tp->stop(); }
60 class FinisherWrapper : public Queueable {
61 class CB : public Context {
65 CB(Queueable *next, unsigned *item) : next(next), item(item) {}
66 void finish(int) override {
73 FinisherWrapper(CephContext *cct, Queueable *next) :
75 void queue(unsigned *item) override {
76 f.queue(new CB(next, item));
78 void start() override { f.start(); }
79 void stop() override { f.stop(); }
81 class PassAlong : public ThreadPool::WorkQueue<unsigned> {
84 bool _enqueue(unsigned *item) override {
88 void _dequeue(unsigned *item) override { ceph_abort(); }
89 unsigned *_dequeue() override {
92 unsigned *val = q.front();
96 void _process(unsigned *item, ThreadPool::TPHandle &) override {
99 void _clear() override { q.clear(); }
100 bool _empty() override { return q.empty(); }
102 PassAlong(ThreadPool *tp, Queueable *_next) :
103 ThreadPool::WorkQueue<unsigned>("TestQueue", 100, 100, tp), next(_next) {}
106 int main(int argc, char **argv)
108 po::options_description desc("Allowed options");
110 ("help", "produce help message")
111 ("num-threads", po::value<unsigned>()->default_value(10),
112 "set number of threads")
113 ("queue-size", po::value<unsigned>()->default_value(30),
115 ("num-items", po::value<unsigned>()->default_value(3000000),
117 ("layers", po::value<string>()->default_value(""),
121 vector<string> ceph_option_strings;
122 po::variables_map vm;
124 po::parsed_options parsed =
125 po::command_line_parser(argc, argv).options(desc).allow_unregistered().run();
131 ceph_option_strings = po::collect_unrecognized(parsed.options,
132 po::include_positional);
133 } catch(po::error &e) {
134 std::cerr << e.what() << std::endl;
137 vector<const char *> ceph_options, def_args;
138 ceph_options.reserve(ceph_option_strings.size());
139 for (vector<string>::iterator i = ceph_option_strings.begin();
140 i != ceph_option_strings.end();
142 ceph_options.push_back(i->c_str());
145 auto cct = global_init(
146 &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT,
147 CODE_ENVIRONMENT_UTILITY,
148 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
149 common_init_finish(g_ceph_context);
150 g_ceph_context->_conf->apply_changes(NULL);
152 if (vm.count("help")) {
153 cout << desc << std::endl;
157 DetailedStatCollector col(1, new JSONFormatter, 0, &cout);
159 for (unsigned i = 0; i < vm["queue-size"].as<unsigned>(); ++i)
162 typedef list<Queueable*> QQ;
165 new Base(&col, &sem));
166 string layers(vm["layers"].as<string>());
168 for (string::reverse_iterator i = layers.rbegin();
169 i != layers.rend(); ++i) {
171 ss << "Test " << num;
175 g_ceph_context, ss.str(), "tp_test", vm["num-threads"].as<unsigned>(), 0);
178 new PassAlong(tp, wqs.back()),
181 } else if (*i == 'f') {
184 g_ceph_context, wqs.back()));
189 for (QQ::iterator i = wqs.begin();
195 for (uint64_t i = 0; i < vm["num-items"].as<unsigned>(); ++i) {
197 unsigned *item = new unsigned(col.next_seq());
198 col.start_read(*item, 1);
199 wqs.back()->queue(item);
202 for (QQ::iterator i = wqs.begin();
207 for (QQ::iterator i = wqs.begin(); i != wqs.end(); wqs.erase(i++)) {