1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 #include "detailed_stat_collector.h"
6 #include <boost/tuple/tuple.hpp>
8 void DetailedStatCollector::Op::dump(
14 f->open_object_section(type.c_str());
15 f->dump_string("type", type);
16 f->dump_float("start", start);
17 f->dump_float("latency", latency);
18 f->dump_int("size", size);
19 f->dump_int("seq", seq);
25 static utime_t cur_time()
32 DetailedStatCollector::Aggregator::Aggregator()
33 : recent_size(0), total_size(0), recent_latency(0),
34 total_latency(0), recent_ops(0), total_ops(0), started(false)
37 void DetailedStatCollector::Aggregator::add(const Op &op)
40 last = first = op.start;
45 recent_size += op.size;
46 total_size += op.size;
47 recent_latency += op.latency;
48 total_latency += op.latency;
51 void DetailedStatCollector::Aggregator::dump(Formatter *f)
53 utime_t now = cur_time();
54 f->dump_stream("time") << now;
55 f->dump_float("avg_recent_latency", recent_latency / recent_ops);
56 f->dump_float("avg_total_latency", total_latency / total_ops);
57 f->dump_float("avg_recent_iops", recent_ops / (now - last));
58 f->dump_float("avg_total_iops", total_ops / (now - first));
59 f->dump_float("avg_recent_throughput", recent_size / (now - last));
60 f->dump_float("avg_total_throughput", total_size / (now - first));
61 f->dump_float("avg_recent_throughput_mb",
62 (recent_size / (now - last)) / (1024*1024));
63 f->dump_float("avg_total_throughput_mb",
64 (total_size / (now - first)) / (1024*1024));
65 f->dump_float("duration", now - last);
72 DetailedStatCollector::DetailedStatCollector(
77 AdditionalPrinting *details
78 ) : bin_size(bin_size), f(formatter), out(out),
79 summary_out(summary_out), details(details),
80 lock("Stat::lock"), cur_seq(0) {
81 last_dump = cur_time();
84 uint64_t DetailedStatCollector::next_seq()
86 Mutex::Locker l(lock);
87 if (summary_out && ((cur_time() - last_dump) > bin_size)) {
88 f->open_object_section("stats");
89 for (map<string, Aggregator>::iterator i = aggregators.begin();
90 i != aggregators.end();
92 f->open_object_section(i->first.c_str());
93 i->second.dump(f.get());
97 f->flush(*summary_out);
98 *summary_out << std::endl;
100 (*details)(summary_out);
101 *summary_out << std::endl;
103 last_dump = cur_time();
108 void DetailedStatCollector::start_write(uint64_t seq, uint64_t length)
110 Mutex::Locker l(lock);
111 utime_t now(cur_time());
112 not_committed.insert(make_pair(seq, make_pair(length, now)));
113 not_applied.insert(make_pair(seq, make_pair(length, now)));
116 void DetailedStatCollector::start_read(uint64_t seq, uint64_t length)
118 Mutex::Locker l(lock);
119 utime_t now(cur_time());
120 not_read.insert(make_pair(seq, make_pair(length, now)));
123 void DetailedStatCollector::write_applied(uint64_t seq)
125 Mutex::Locker l(lock);
128 not_applied[seq].second,
129 cur_time() - not_applied[seq].second,
130 not_applied[seq].first,
132 op.dump(out, f.get());
133 aggregators["write_applied"].add(op);
134 not_applied.erase(seq);
137 void DetailedStatCollector::write_committed(uint64_t seq)
139 Mutex::Locker l(lock);
142 not_committed[seq].second,
143 cur_time() - not_committed[seq].second,
144 not_committed[seq].first,
146 op.dump(out, f.get());
147 aggregators["write_committed"].add(op);
148 not_committed.erase(seq);
151 void DetailedStatCollector::read_complete(uint64_t seq)
153 Mutex::Locker l(lock);
156 not_read[seq].second,
157 cur_time() - not_read[seq].second,
160 op.dump(out, f.get());
161 aggregators["read"].add(op);