1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
4 #include "include/utime.h"
6 #include "include/memory.h"
7 #include "common/Mutex.h"
8 #include "common/Cond.h"
// C_Holder: a Context subclass. Elsewhere in this file it is instantiated as
// C_Holder<T>(value), so it is presumably a template that keeps `value` alive
// for the lifetime of the callback -- TODO confirm against the lines of the
// struct that are not shown in this excerpt.
11 struct C_Holder : public Context {
// Completion hook (marked override); its body is not visible here.
16 void finish(int r) override {
// Stores the Context pointer it is handed; nothing is invoked at construction.
23 explicit OnDelete(Context *c) : c(c) {}
// Destruction is the trigger: complete(0) is called on the stored Context.
// The pointer is dereferenced unconditionally, so it must not be null.
24 ~OnDelete() { c->complete(0); }
// Cleanup: a Context subclass that carries a pointer to a Bencher.
27 struct Cleanup : public Context {
// Records the Bencher pointer for later use in finish().
29 explicit Cleanup(Bencher *bench) : bench(bench) {}
// Completion hook (marked override); its body is not visible in this excerpt.
30 void finish(int r) override {
// OnWriteApplied: Context whose finish() reports to the Bencher's stat
// collector that the write identified by `seq` has been applied.
35 struct OnWriteApplied : public Context {
// Shared reference to an OnDelete; presumably held so that the OnDelete is
// destroyed only after every callback sharing it is gone -- TODO confirm.
38 ceph::shared_ptr<OnDelete> on_delete;
// Constructor parameters (the constructor name line is not visible here):
// the owning Bencher, the operation sequence number, and the shared OnDelete.
40 Bencher *bench, uint64_t seq,
41 ceph::shared_ptr<OnDelete> on_delete
42 ) : bench(bench), seq(seq), on_delete(on_delete) {}
// Completion hook: forwards `seq` to stat_collector->write_applied().
// The result code `r` is not used on the visible line.
43 void finish(int r) override {
44 bench->stat_collector->write_applied(seq);
// OnWriteCommit: Context whose finish() reports to the Bencher's stat
// collector that the write identified by `seq` has been committed.
48 struct OnWriteCommit : public Context {
// Shared reference to an OnDelete; presumably held so that the OnDelete is
// destroyed only after every callback sharing it is gone -- TODO confirm.
51 ceph::shared_ptr<OnDelete> on_delete;
// Constructor parameters (the constructor name line is not visible here):
// the owning Bencher, the operation sequence number, and the shared OnDelete.
53 Bencher *bench, uint64_t seq,
54 ceph::shared_ptr<OnDelete> on_delete
55 ) : bench(bench), seq(seq), on_delete(on_delete) {}
// Completion hook: forwards `seq` to stat_collector->write_committed().
// The result code `r` is not used on the visible line.
56 void finish(int r) override {
57 bench->stat_collector->write_committed(seq);
// OnReadComplete: Context whose finish() reports to the Bencher's stat
// collector that the read identified by `seq` has completed.
61 struct OnReadComplete : public Context {
// Owning pointer to a bufferlist: a scoped_ptr deletes its pointee when the
// scoped_ptr itself is destroyed.
64 boost::scoped_ptr<bufferlist> bl;
// Takes the owning Bencher, the operation sequence number, and a raw
// bufferlist pointer; the scoped_ptr member adopts that raw pointer, so the
// caller must not delete it afterwards.
65 OnReadComplete(Bencher *bench, uint64_t seq, bufferlist *bl) :
66 bench(bench), seq(seq), bl(bl) {}
// Completion hook: forwards `seq` to stat_collector->read_complete().
// The result code `r` is not used on the visible line.
67 void finish(int r) override {
68 bench->stat_collector->read_complete(seq);
// Blocks the caller while the number of open operations is at or above
// max_in_flight. The remainder of the function is not visible in this
// excerpt; presumably it then accounts for the new operation -- TODO confirm.
73 void Bencher::start_op() {
// Scoped lock: `lock` is held for the rest of the function body.
74 Mutex::Locker l(lock);
// Re-test the predicate after every wake-up on open_ops_cond.
75 while (open_ops >= max_in_flight)
76 open_ops_cond.Wait(lock);
// Waits on open_ops_cond while holding `lock`. The condition guarding the
// wait is not visible in this excerpt; judging by the name, it presumably
// waits until no operations remain open -- TODO confirm.
80 void Bencher::drain_ops() {
// Scoped lock: `lock` is held for the rest of the function body.
81 Mutex::Locker l(lock);
83 open_ops_cond.Wait(lock);
// Signals open_ops_cond while holding `lock`, which is the condition that
// start_op() and drain_ops() wait on. The lines between taking the lock and
// signalling are not visible; presumably they update the open-operation
// count -- TODO confirm.
86 void Bencher::complete_op() {
// Scoped lock: `lock` is held for the rest of the function body.
87 Mutex::Locker l(lock);
90 open_ops_cond.Signal();
// Constructor initializer list storing the `done`, `lock` and `cond`
// arguments. The enclosing type's name is not visible here; it is presumably
// OnFinish, which is later constructed as OnFinish(&done, &lock, &cond).
101 done(done), lock(lock), cond(cond) {}
// Acquires the mutex through the stored pointer. The code that runs under the
// lock is not visible in this excerpt.
103 Mutex::Locker l(*lock);
// Fragment of an object-creation routine. The function name and several lines
// are not visible in this excerpt, so the comments below cover only the lines
// shown.
// Parameter: the set of object names iterated over below.
110 const set<std::string> &objects,
// Loop running `size` times; its body is not visible here.
116 for (uint64_t i = 0; i < size; ++i) {
// Local mutex, named "init_lock", whose address is passed to OnFinish below.
119 Mutex lock("init_lock");
// One OnFinish shared by every callback created below. It is given the
// addresses of the local `done`, `lock` and `cond`.
123 ceph::shared_ptr<OnFinish> on_finish(
124 new OnFinish(&done, &lock, &cond));
// Visit every object in the set.
126 for (set<std::string>::const_iterator i = objects.begin();
// Progress message written to *out: "Creating <num>/<total objects>".
130 *out << "Creating " << num << "/" << objects.size() << std::endl;
// Two heap-allocated C_Holder contexts, each constructed from on_finish.
// The call that receives them is not visible in this excerpt.
135 new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish),
136 new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish)
// Takes the local mutex; presumably this is where the routine waits for the
// `done` flag to be set -- TODO confirm against the omitted lines.
141 Mutex::Locker l(lock);
147 void Bencher::run_bench()
149 time_t end = time(0) + max_duration;
154 while ((!max_duration || time(0) < end) && (!max_ops || ops < max_ops)) {
156 uint64_t seq = stat_collector->next_seq();
157 boost::tuple<std::string, uint64_t, uint64_t, OpType> next =
159 string obj_name = next.get<0>();
160 uint64_t offset = next.get<1>();
161 uint64_t length = next.get<2>();
162 OpType op_type = next.get<3>();
165 ceph::shared_ptr<OnDelete> on_delete(
166 new OnDelete(new Cleanup(this)));
167 stat_collector->start_write(seq, length);
168 while (bl.length() < length) {
176 this, seq, on_delete),
178 this, seq, on_delete)
183 stat_collector->start_read(seq, length);
184 bufferlist *read_bl = new bufferlist;