+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include "bencher.h"
-#include "include/utime.h"
-#include <unistd.h>
-#include "include/memory.h"
-#include "common/Mutex.h"
-#include "common/Cond.h"
-
-template<typename T>
-struct C_Holder : public Context {
- T obj;
- explicit C_Holder(
- T obj)
- : obj(obj) {}
- void finish(int r) override {
- return;
- }
-};
-
-struct OnDelete {
- Context *c;
- explicit OnDelete(Context *c) : c(c) {}
- ~OnDelete() { c->complete(0); }
-};
-
-struct Cleanup : public Context {
- Bencher *bench;
- explicit Cleanup(Bencher *bench) : bench(bench) {}
- void finish(int r) override {
- bench->complete_op();
- }
-};
-
-struct OnWriteApplied : public Context {
- Bencher *bench;
- uint64_t seq;
- ceph::shared_ptr<OnDelete> on_delete;
- OnWriteApplied(
- Bencher *bench, uint64_t seq,
- ceph::shared_ptr<OnDelete> on_delete
- ) : bench(bench), seq(seq), on_delete(on_delete) {}
- void finish(int r) override {
- bench->stat_collector->write_applied(seq);
- }
-};
-
-struct OnWriteCommit : public Context {
- Bencher *bench;
- uint64_t seq;
- ceph::shared_ptr<OnDelete> on_delete;
- OnWriteCommit(
- Bencher *bench, uint64_t seq,
- ceph::shared_ptr<OnDelete> on_delete
- ) : bench(bench), seq(seq), on_delete(on_delete) {}
- void finish(int r) override {
- bench->stat_collector->write_committed(seq);
- }
-};
-
-struct OnReadComplete : public Context {
- Bencher *bench;
- uint64_t seq;
- boost::scoped_ptr<bufferlist> bl;
- OnReadComplete(Bencher *bench, uint64_t seq, bufferlist *bl) :
- bench(bench), seq(seq), bl(bl) {}
- void finish(int r) override {
- bench->stat_collector->read_complete(seq);
- bench->complete_op();
- }
-};
-
-void Bencher::start_op() {
- Mutex::Locker l(lock);
- while (open_ops >= max_in_flight)
- open_ops_cond.Wait(lock);
- ++open_ops;
-}
-
-void Bencher::drain_ops() {
- Mutex::Locker l(lock);
- while (open_ops)
- open_ops_cond.Wait(lock);
-}
-
-void Bencher::complete_op() {
- Mutex::Locker l(lock);
- assert(open_ops > 0);
- --open_ops;
- open_ops_cond.Signal();
-}
-
-struct OnFinish {
- bool *done;
- Mutex *lock;
- Cond *cond;
- OnFinish(
- bool *done,
- Mutex *lock,
- Cond *cond) :
- done(done), lock(lock), cond(cond) {}
- ~OnFinish() {
- Mutex::Locker l(*lock);
- *done = true;
- cond->Signal();
- }
-};
-
-void Bencher::init(
- const set<std::string> &objects,
- uint64_t size,
- std::ostream *out
- )
-{
- bufferlist bl;
- for (uint64_t i = 0; i < size; ++i) {
- bl.append(0);
- }
- Mutex lock("init_lock");
- Cond cond;
- bool done = 0;
- {
- ceph::shared_ptr<OnFinish> on_finish(
- new OnFinish(&done, &lock, &cond));
- uint64_t num = 0;
- for (set<std::string>::const_iterator i = objects.begin();
- i != objects.end();
- ++i, ++num) {
- if (!(num % 20))
- *out << "Creating " << num << "/" << objects.size() << std::endl;
- backend->write(
- *i,
- 0,
- bl,
- new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish),
- new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish)
- );
- }
- }
- {
- Mutex::Locker l(lock);
- while (!done)
- cond.Wait(lock);
- }
-}
-
-void Bencher::run_bench()
-{
- time_t end = time(0) + max_duration;
- uint64_t ops = 0;
-
- bufferlist bl;
-
- while ((!max_duration || time(0) < end) && (!max_ops || ops < max_ops)) {
- start_op();
- uint64_t seq = stat_collector->next_seq();
- boost::tuple<std::string, uint64_t, uint64_t, OpType> next =
- (*op_dist)();
- string obj_name = next.get<0>();
- uint64_t offset = next.get<1>();
- uint64_t length = next.get<2>();
- OpType op_type = next.get<3>();
- switch (op_type) {
- case WRITE: {
- ceph::shared_ptr<OnDelete> on_delete(
- new OnDelete(new Cleanup(this)));
- stat_collector->start_write(seq, length);
- while (bl.length() < length) {
- bl.append(rand());
- }
- backend->write(
- obj_name,
- offset,
- bl,
- new OnWriteApplied(
- this, seq, on_delete),
- new OnWriteCommit(
- this, seq, on_delete)
- );
- break;
- }
- case READ: {
- stat_collector->start_read(seq, length);
- bufferlist *read_bl = new bufferlist;
- backend->read(
- obj_name,
- offset,
- length,
- read_bl,
- new OnReadComplete(
- this, seq, read_bl)
- );
- break;
- }
- default: {
- ceph_abort();
- }
- }
- ops++;
- }
- drain_ops();
-}