Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / bench / tp_bench.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2
3 #include <boost/scoped_ptr.hpp>
4 #include <boost/lexical_cast.hpp>
5 #include <boost/program_options/option.hpp>
6 #include <boost/program_options/options_description.hpp>
7 #include <boost/program_options/variables_map.hpp>
8 #include <boost/program_options/cmdline.hpp>
9 #include <boost/program_options/parsers.hpp>
10 #include <iostream>
11 #include <set>
12 #include <sstream>
13 #include <stdlib.h>
14 #include <fstream>
15
16 #include "common/Formatter.h"
17
18 #include "bencher.h"
19 #include "rados_backend.h"
20 #include "detailed_stat_collector.h"
21 #include "distribution.h"
22 #include "global/global_init.h"
23 #include "common/WorkQueue.h"
24 #include "common/Semaphore.h"
25 #include "common/Finisher.h"
26
27 namespace po = boost::program_options;
28 using namespace std;
29 class Queueable {
30 public:
31   virtual void queue(unsigned *) = 0;
32   virtual void start() = 0;
33   virtual void stop() = 0;
34   virtual ~Queueable() {};
35 };
36 class Base : public Queueable {
37   DetailedStatCollector *col;
38   Semaphore *sem;
39 public:
40   Base(DetailedStatCollector *col,
41        Semaphore *sem) : col(col), sem(sem) {}
42   void queue(unsigned *item) override {
43     col->read_complete(*item);
44     sem->Put();
45     delete item;
46   }
47   void start() override {}
48   void stop() override {}
49 };
50 class WQWrapper : public Queueable {
51   boost::scoped_ptr<ThreadPool::WorkQueue<unsigned> > wq;
52   boost::scoped_ptr<ThreadPool> tp;
53 public:
54   WQWrapper(ThreadPool::WorkQueue<unsigned> *wq, ThreadPool *tp):
55     wq(wq), tp(tp) {}
56   void queue(unsigned *item) override { wq->queue(item); }
57   void start() override { tp->start(); }
58   void stop() override { tp->stop(); }
59 };
60 class FinisherWrapper : public Queueable {
61   class CB : public Context {
62     Queueable *next;
63     unsigned *item;
64   public:
65     CB(Queueable *next, unsigned *item) : next(next), item(item) {}
66     void finish(int) override {
67       next->queue(item);
68     }
69   };
70   Finisher f;
71   Queueable *next;
72 public:
73   FinisherWrapper(CephContext *cct, Queueable *next) :
74     f(cct), next(next) {}
75   void queue(unsigned *item) override {
76     f.queue(new CB(next, item));
77   }
78   void start() override { f.start(); }
79   void stop() override { f.stop(); }
80 };
81 class PassAlong : public ThreadPool::WorkQueue<unsigned> {
82   Queueable *next;
83   list<unsigned*> q;
84   bool _enqueue(unsigned *item) override {
85     q.push_back(item);
86     return true;
87   }
88   void _dequeue(unsigned *item) override { ceph_abort(); }
89   unsigned *_dequeue() override {
90     if (q.empty())
91       return 0;
92     unsigned *val = q.front();
93     q.pop_front();
94     return val;
95   }
96   void _process(unsigned *item, ThreadPool::TPHandle &) override {
97     next->queue(item);
98   }
99   void _clear() override { q.clear(); }
100   bool _empty() override { return q.empty(); }
101 public:
102   PassAlong(ThreadPool *tp, Queueable *_next) :
103     ThreadPool::WorkQueue<unsigned>("TestQueue", 100, 100, tp), next(_next) {}
104 };
105
106 int main(int argc, char **argv)
107 {
108   po::options_description desc("Allowed options");
109   desc.add_options()
110     ("help", "produce help message")
111     ("num-threads", po::value<unsigned>()->default_value(10),
112      "set number of threads")
113     ("queue-size", po::value<unsigned>()->default_value(30),
114      "queue size")
115     ("num-items", po::value<unsigned>()->default_value(3000000),
116      "num items")
117     ("layers", po::value<string>()->default_value(""),
118      "layer desc")
119     ;
120
121   vector<string> ceph_option_strings;
122   po::variables_map vm;
123   try {
124     po::parsed_options parsed =
125       po::command_line_parser(argc, argv).options(desc).allow_unregistered().run();
126     po::store(
127               parsed,
128               vm);
129     po::notify(vm);
130
131     ceph_option_strings = po::collect_unrecognized(parsed.options,
132                                                    po::include_positional);
133   } catch(po::error &e) {
134     std::cerr << e.what() << std::endl;
135     return 1;
136   }
137   vector<const char *> ceph_options, def_args;
138   ceph_options.reserve(ceph_option_strings.size());
139   for (vector<string>::iterator i = ceph_option_strings.begin();
140        i != ceph_option_strings.end();
141        ++i) {
142     ceph_options.push_back(i->c_str());
143   }
144
145   auto cct = global_init(
146     &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT,
147     CODE_ENVIRONMENT_UTILITY,
148     CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
149   common_init_finish(g_ceph_context);
150   g_ceph_context->_conf->apply_changes(NULL);
151
152   if (vm.count("help")) {
153     cout << desc << std::endl;
154     return 1;
155   }
156
157   DetailedStatCollector col(1, new JSONFormatter, 0, &cout);
158   Semaphore sem;
159   for (unsigned i = 0; i < vm["queue-size"].as<unsigned>(); ++i)
160     sem.Put();
161
162   typedef list<Queueable*> QQ;
163   QQ wqs;
164   wqs.push_back(
165     new Base(&col, &sem));
166   string layers(vm["layers"].as<string>());
167   unsigned num = 0;
168   for (string::reverse_iterator i = layers.rbegin();
169        i != layers.rend(); ++i) {
170     stringstream ss;
171     ss << "Test " << num;
172     if (*i == 'q') {
173       ThreadPool *tp =
174         new ThreadPool(
175           g_ceph_context, ss.str(), "tp_test",  vm["num-threads"].as<unsigned>(), 0);
176       wqs.push_back(
177         new WQWrapper(
178           new PassAlong(tp, wqs.back()),
179           tp
180           ));
181     } else if (*i == 'f') {
182       wqs.push_back(
183         new FinisherWrapper(
184           g_ceph_context, wqs.back()));
185     }
186     ++num;
187   }
188
189   for (QQ::iterator i = wqs.begin();
190        i != wqs.end();
191        ++i) {
192     (*i)->start();
193   }
194
195   for (uint64_t i = 0; i < vm["num-items"].as<unsigned>(); ++i) {
196     sem.Get();
197     unsigned *item = new unsigned(col.next_seq());
198     col.start_read(*item, 1);
199     wqs.back()->queue(item);
200   }
201
202   for (QQ::iterator i = wqs.begin();
203        i != wqs.end();
204        ++i) {
205     (*i)->stop();
206   }
207   for (QQ::iterator i = wqs.begin(); i != wqs.end(); wqs.erase(i++)) {
208     delete *i;
209   }
210   return 0;
211 }