// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- #include "bencher.h" #include "include/utime.h" #include #include "include/memory.h" #include "common/Mutex.h" #include "common/Cond.h" template struct C_Holder : public Context { T obj; explicit C_Holder( T obj) : obj(obj) {} void finish(int r) override { return; } }; struct OnDelete { Context *c; explicit OnDelete(Context *c) : c(c) {} ~OnDelete() { c->complete(0); } }; struct Cleanup : public Context { Bencher *bench; explicit Cleanup(Bencher *bench) : bench(bench) {} void finish(int r) override { bench->complete_op(); } }; struct OnWriteApplied : public Context { Bencher *bench; uint64_t seq; ceph::shared_ptr on_delete; OnWriteApplied( Bencher *bench, uint64_t seq, ceph::shared_ptr on_delete ) : bench(bench), seq(seq), on_delete(on_delete) {} void finish(int r) override { bench->stat_collector->write_applied(seq); } }; struct OnWriteCommit : public Context { Bencher *bench; uint64_t seq; ceph::shared_ptr on_delete; OnWriteCommit( Bencher *bench, uint64_t seq, ceph::shared_ptr on_delete ) : bench(bench), seq(seq), on_delete(on_delete) {} void finish(int r) override { bench->stat_collector->write_committed(seq); } }; struct OnReadComplete : public Context { Bencher *bench; uint64_t seq; boost::scoped_ptr bl; OnReadComplete(Bencher *bench, uint64_t seq, bufferlist *bl) : bench(bench), seq(seq), bl(bl) {} void finish(int r) override { bench->stat_collector->read_complete(seq); bench->complete_op(); } }; void Bencher::start_op() { Mutex::Locker l(lock); while (open_ops >= max_in_flight) open_ops_cond.Wait(lock); ++open_ops; } void Bencher::drain_ops() { Mutex::Locker l(lock); while (open_ops) open_ops_cond.Wait(lock); } void Bencher::complete_op() { Mutex::Locker l(lock); assert(open_ops > 0); --open_ops; open_ops_cond.Signal(); } struct OnFinish { bool *done; Mutex *lock; Cond *cond; OnFinish( bool *done, Mutex *lock, Cond *cond) : done(done), lock(lock), cond(cond) {} ~OnFinish() { Mutex::Locker l(*lock); *done = true; cond->Signal(); } }; void Bencher::init( const set &objects, uint64_t size, std::ostream *out ) { bufferlist bl; for (uint64_t i = 0; i < size; ++i) { bl.append(0); } Mutex lock("init_lock"); Cond cond; bool done = 0; { ceph::shared_ptr on_finish( new OnFinish(&done, &lock, &cond)); uint64_t num = 0; for (set::const_iterator i = objects.begin(); i != objects.end(); ++i, ++num) { if (!(num % 20)) *out << "Creating " << num << "/" << objects.size() << std::endl; backend->write( *i, 0, bl, new C_Holder >(on_finish), new C_Holder >(on_finish) ); } } { Mutex::Locker l(lock); while (!done) cond.Wait(lock); } } void Bencher::run_bench() { time_t end = time(0) + max_duration; uint64_t ops = 0; bufferlist bl; while ((!max_duration || time(0) < end) && (!max_ops || ops < max_ops)) { start_op(); uint64_t seq = stat_collector->next_seq(); boost::tuple next = (*op_dist)(); string obj_name = next.get<0>(); uint64_t offset = next.get<1>(); uint64_t length = next.get<2>(); OpType op_type = next.get<3>(); switch (op_type) { case WRITE: { ceph::shared_ptr on_delete( new OnDelete(new Cleanup(this))); stat_collector->start_write(seq, length); while (bl.length() < length) { bl.append(rand()); } backend->write( obj_name, offset, bl, new OnWriteApplied( this, seq, on_delete), new OnWriteCommit( this, seq, on_delete) ); break; } case READ: { stat_collector->start_read(seq, length); bufferlist *read_bl = new bufferlist; backend->read( obj_name, offset, length, read_bl, new OnReadComplete( this, seq, read_bl) ); break; } default: { ceph_abort(); } } ops++; } drain_ops(); }