Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / bench / bencher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2
3 #include "bencher.h"
4 #include "include/utime.h"
5 #include <unistd.h>
6 #include "include/memory.h"
7 #include "common/Mutex.h"
8 #include "common/Cond.h"
9
10 template<typename T>
11 struct C_Holder : public Context {
12   T obj;
13   explicit C_Holder(
14     T obj)
15     : obj(obj) {}
16   void finish(int r) override {
17     return;
18   }
19 };
20
21 struct OnDelete {
22   Context *c;
23   explicit OnDelete(Context *c) : c(c) {}
24   ~OnDelete() { c->complete(0); }
25 };
26
27 struct Cleanup : public Context {
28   Bencher *bench;
29   explicit Cleanup(Bencher *bench) : bench(bench) {}
30   void finish(int r) override {
31     bench->complete_op();
32   }
33 };
34
35 struct OnWriteApplied : public Context {
36   Bencher *bench;
37   uint64_t seq;
38   ceph::shared_ptr<OnDelete> on_delete;
39   OnWriteApplied(
40     Bencher *bench, uint64_t seq,
41     ceph::shared_ptr<OnDelete> on_delete
42     ) : bench(bench), seq(seq), on_delete(on_delete) {}
43   void finish(int r) override {
44     bench->stat_collector->write_applied(seq);
45   }
46 };
47
48 struct OnWriteCommit : public Context {
49   Bencher *bench;
50   uint64_t seq;
51   ceph::shared_ptr<OnDelete> on_delete;
52   OnWriteCommit(
53     Bencher *bench, uint64_t seq,
54     ceph::shared_ptr<OnDelete> on_delete
55     ) : bench(bench), seq(seq), on_delete(on_delete) {}
56   void finish(int r) override {
57     bench->stat_collector->write_committed(seq);
58   }
59 };
60
61 struct OnReadComplete : public Context {
62   Bencher *bench;
63   uint64_t seq;
64   boost::scoped_ptr<bufferlist> bl;
65   OnReadComplete(Bencher *bench, uint64_t seq, bufferlist *bl) :
66     bench(bench), seq(seq), bl(bl) {}
67   void finish(int r) override {
68     bench->stat_collector->read_complete(seq);
69     bench->complete_op();
70   }
71 };
72
73 void Bencher::start_op() {
74   Mutex::Locker l(lock);
75   while (open_ops >= max_in_flight)
76     open_ops_cond.Wait(lock);
77   ++open_ops;
78 }
79
80 void Bencher::drain_ops() {
81   Mutex::Locker l(lock);
82   while (open_ops)
83     open_ops_cond.Wait(lock);
84 }
85
86 void Bencher::complete_op() {
87   Mutex::Locker l(lock);
88   assert(open_ops > 0);
89   --open_ops;
90   open_ops_cond.Signal();
91 }
92
93 struct OnFinish {
94   bool *done;
95   Mutex *lock;
96   Cond *cond;
97   OnFinish(
98     bool *done,
99     Mutex *lock,
100     Cond *cond) :
101     done(done), lock(lock), cond(cond) {}
102   ~OnFinish() {
103     Mutex::Locker l(*lock);
104     *done = true;
105     cond->Signal();
106   }
107 };
108
109 void Bencher::init(
110   const set<std::string> &objects,
111   uint64_t size,
112   std::ostream *out
113   )
114 {
115   bufferlist bl;
116   for (uint64_t i = 0; i < size; ++i) {
117     bl.append(0);
118   }
119   Mutex lock("init_lock");
120   Cond cond;
121   bool done = 0;
122   {
123     ceph::shared_ptr<OnFinish> on_finish(
124       new OnFinish(&done, &lock, &cond));
125     uint64_t num = 0;
126     for (set<std::string>::const_iterator i = objects.begin();
127          i != objects.end();
128          ++i, ++num) {
129       if (!(num % 20))
130         *out << "Creating " << num << "/" << objects.size() << std::endl;
131       backend->write(
132         *i,
133         0,
134         bl,
135         new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish),
136         new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish)
137         );
138     }
139   }
140   {
141     Mutex::Locker l(lock);
142     while (!done)
143       cond.Wait(lock);
144   }
145 }
146
147 void Bencher::run_bench()
148 {
149   time_t end = time(0) + max_duration;
150   uint64_t ops = 0;
151
152   bufferlist bl;
153
154   while ((!max_duration || time(0) < end) && (!max_ops || ops < max_ops)) {
155     start_op();
156     uint64_t seq = stat_collector->next_seq();
157     boost::tuple<std::string, uint64_t, uint64_t, OpType> next =
158       (*op_dist)();
159     string obj_name = next.get<0>();
160     uint64_t offset = next.get<1>();
161     uint64_t length = next.get<2>();
162     OpType op_type = next.get<3>();
163     switch (op_type) {
164       case WRITE: {
165         ceph::shared_ptr<OnDelete> on_delete(
166           new OnDelete(new Cleanup(this)));
167         stat_collector->start_write(seq, length);
168         while (bl.length() < length) {
169           bl.append(rand());
170         }
171         backend->write(
172           obj_name,
173           offset,
174           bl,
175           new OnWriteApplied(
176             this, seq, on_delete),
177           new OnWriteCommit(
178             this, seq, on_delete)
179           );
180         break;
181       }
182       case READ: {
183         stat_collector->start_read(seq, length);
184         bufferlist *read_bl = new bufferlist;
185         backend->read(
186           obj_name,
187           offset,
188           length,
189           read_bl,
190           new OnReadComplete(
191             this, seq, read_bl)
192           );
193         break;
194       }
195       default: {
196         ceph_abort();
197       }
198     } 
199     ops++;
200   }
201   drain_ops();
202 }