// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- #include "detailed_stat_collector.h" #include #include #include void DetailedStatCollector::Op::dump( ostream *out, Formatter *f) { if (!out) return; f->open_object_section(type.c_str()); f->dump_string("type", type); f->dump_float("start", start); f->dump_float("latency", latency); f->dump_int("size", size); f->dump_int("seq", seq); f->close_section(); f->flush(*out); *out << std::endl; } static utime_t cur_time() { struct timeval tv; gettimeofday(&tv, 0); return utime_t(&tv); } DetailedStatCollector::Aggregator::Aggregator() : recent_size(0), total_size(0), recent_latency(0), total_latency(0), recent_ops(0), total_ops(0), started(false) {} void DetailedStatCollector::Aggregator::add(const Op &op) { if (!started) { last = first = op.start; started = true; } ++recent_ops; ++total_ops; recent_size += op.size; total_size += op.size; recent_latency += op.latency; total_latency += op.latency; } void DetailedStatCollector::Aggregator::dump(Formatter *f) { utime_t now = cur_time(); f->dump_stream("time") << now; f->dump_float("avg_recent_latency", recent_latency / recent_ops); f->dump_float("avg_total_latency", total_latency / total_ops); f->dump_float("avg_recent_iops", recent_ops / (now - last)); f->dump_float("avg_total_iops", total_ops / (now - first)); f->dump_float("avg_recent_throughput", recent_size / (now - last)); f->dump_float("avg_total_throughput", total_size / (now - first)); f->dump_float("avg_recent_throughput_mb", (recent_size / (now - last)) / (1024*1024)); f->dump_float("avg_total_throughput_mb", (total_size / (now - first)) / (1024*1024)); f->dump_float("duration", now - last); last = now; recent_latency = 0; recent_size = 0; recent_ops = 0; } DetailedStatCollector::DetailedStatCollector( double bin_size, Formatter *formatter, ostream *out, ostream *summary_out, AdditionalPrinting *details ) : bin_size(bin_size), f(formatter), out(out), summary_out(summary_out), details(details), lock("Stat::lock"), cur_seq(0) { last_dump = cur_time(); } uint64_t DetailedStatCollector::next_seq() { Mutex::Locker l(lock); if (summary_out && ((cur_time() - last_dump) > bin_size)) { f->open_object_section("stats"); for (map::iterator i = aggregators.begin(); i != aggregators.end(); ++i) { f->open_object_section(i->first.c_str()); i->second.dump(f.get()); f->close_section(); } f->close_section(); f->flush(*summary_out); *summary_out << std::endl; if (details) { (*details)(summary_out); *summary_out << std::endl; } last_dump = cur_time(); } return cur_seq++; } void DetailedStatCollector::start_write(uint64_t seq, uint64_t length) { Mutex::Locker l(lock); utime_t now(cur_time()); not_committed.insert(make_pair(seq, make_pair(length, now))); not_applied.insert(make_pair(seq, make_pair(length, now))); } void DetailedStatCollector::start_read(uint64_t seq, uint64_t length) { Mutex::Locker l(lock); utime_t now(cur_time()); not_read.insert(make_pair(seq, make_pair(length, now))); } void DetailedStatCollector::write_applied(uint64_t seq) { Mutex::Locker l(lock); Op op( "write_applied", not_applied[seq].second, cur_time() - not_applied[seq].second, not_applied[seq].first, seq); op.dump(out, f.get()); aggregators["write_applied"].add(op); not_applied.erase(seq); } void DetailedStatCollector::write_committed(uint64_t seq) { Mutex::Locker l(lock); Op op( "write_committed", not_committed[seq].second, cur_time() - not_committed[seq].second, not_committed[seq].first, seq); op.dump(out, f.get()); aggregators["write_committed"].add(op); not_committed.erase(seq); } void DetailedStatCollector::read_complete(uint64_t seq) { Mutex::Locker l(lock); Op op( "read", not_read[seq].second, cur_time() - not_read[seq].second, not_read[seq].first, seq); op.dump(out, f.get()); aggregators["read"].add(op); not_read.erase(seq); }