1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #ifndef TRACKEDREQUEST_H_
15 #define TRACKEDREQUEST_H_
18 #include "common/histogram.h"
19 #include "msg/Message.h"
20 #include "common/RWLock.h"
22 #define OPTRACKER_PREALLOC_EVENTS 20
25 typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;
28 set<pair<utime_t, TrackedOpRef> > arrived;
29 set<pair<double, TrackedOpRef> > duration;
30 set<pair<utime_t, TrackedOpRef> > slow_op;
31 Mutex ops_history_lock;
32 void cleanup(utime_t now);
34 uint32_t history_size;
35 uint32_t history_duration;
36 uint32_t history_slow_op_size;
37 uint32_t history_slow_op_threshold;
40 OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false),
41 history_size(0), history_duration(0),
42 history_slow_op_size(0), history_slow_op_threshold(0) {}
44 assert(arrived.empty());
45 assert(duration.empty());
46 assert(slow_op.empty());
48 void insert(utime_t now, TrackedOpRef op);
49 void dump_ops(utime_t now, Formatter *f, set<string> filters = {""});
50 void dump_ops_by_duration(utime_t now, Formatter *f, set<string> filters = {""});
51 void dump_slow_ops(utime_t now, Formatter *f, set<string> filters = {""});
53 void set_size_and_duration(uint32_t new_size, uint32_t new_duration) {
54 history_size = new_size;
55 history_duration = new_duration;
57 void set_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
58 history_slow_op_size = new_size;
59 history_slow_op_threshold = new_threshold;
63 struct ShardedTrackingData;
65 friend class OpHistory;
66 std::atomic<int64_t> seq = { 0 };
67 vector<ShardedTrackingData*> sharded_in_flight_list;
68 uint32_t num_optracker_shards;
72 bool tracking_enabled;
77 OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards);
79 void set_complaint_and_threshold(float time, int threshold) {
80 complaint_time = time;
81 log_threshold = threshold;
83 void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
84 history.set_size_and_duration(new_size, new_duration);
86 void set_history_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
87 history.set_slow_op_size_and_threshold(new_size, new_threshold);
89 void set_tracking(bool enable) {
90 RWLock::WLocker l(lock);
91 tracking_enabled = enable;
93 bool dump_ops_in_flight(Formatter *f, bool print_only_blocked = false, set<string> filters = {""});
94 bool dump_historic_ops(Formatter *f, bool by_duration = false, set<string> filters = {""});
95 bool dump_historic_slow_ops(Formatter *f, set<string> filters = {""});
96 bool register_inflight_op(TrackedOp *i);
97 void unregister_inflight_op(TrackedOp *i);
99 void get_age_ms_histogram(pow2_hist_t *h);
102 * Look for Ops which are too old, and insert warning
103 * strings for each Op that is too old.
105 * @param warning_strings A vector<string> reference which is filled
106 * with a warning string for each old Op.
107 * @return True if there are any Ops to warn on, false otherwise.
109 bool check_ops_in_flight(std::vector<string> &warning_strings,
113 history.on_shutdown();
117 template <typename T, typename U>
118 typename T::Ref create_request(U params)
120 typename T::Ref retval(new T(params, this));
121 retval->tracking_start();
127 class TrackedOp : public boost::intrusive::list_base_hook<> {
129 friend class OpHistory;
130 friend class OpTracker;
132 boost::intrusive::list_member_hook<> tracker_item;
135 typedef boost::intrusive::list<
137 boost::intrusive::member_hook<
139 boost::intrusive::list_member_hook<>,
140 &TrackedOp::tracker_item> > tracked_op_list_t;
142 // for use when clearing lists. e.g.,
143 // ls.clear_and_dispose(TrackedOp::Putter());
145 void operator()(TrackedOp *op) {
151 OpTracker *tracker; ///< the tracker we are associated with
152 std::atomic_int nref = {0}; ///< ref count
154 utime_t initiated_at;
159 const char *cstr = nullptr;
161 Event(utime_t t, const string& s) : stamp(t), str(s) {}
162 Event(utime_t t, const char *s) : stamp(t), cstr(s) {}
164 int compare(const char *s) const {
166 return strcmp(cstr, s);
168 return str.compare(s);
171 const char *c_str() const {
178 void dump(Formatter *f) const {
179 f->dump_stream("time") << stamp;
180 f->dump_string("event", c_str());
184 vector<Event> events; ///< list of events and their times
185 mutable Mutex lock = {"TrackedOp::lock"}; ///< to protect the events list
186 const char *current = 0; ///< the current state the event is in
187 uint64_t seq = 0; ///< a unique value set by the OpTracker
189 uint32_t warn_interval_multiplier = 1; //< limits output of a given op warning
196 atomic<int> state = {STATE_UNTRACKED};
198 mutable string desc_str; ///< protected by lock
199 mutable const char *desc = nullptr; ///< readable without lock
200 mutable atomic<bool> want_new_desc = {false};
202 TrackedOp(OpTracker *_tracker, const utime_t& initiated) :
204 initiated_at(initiated)
206 events.reserve(OPTRACKER_PREALLOC_EVENTS);
209 /// output any type-specific data you want to get when dump() is called
210 virtual void _dump(Formatter *f) const {}
211 /// if you want something else to happen when events are marked, implement
212 virtual void _event_marked() {}
213 /// return a unique descriptor of the Op; eg the message it's attached to
214 virtual void _dump_op_descriptor_unlocked(ostream& stream) const = 0;
215 /// called when the last non-OpTracker reference is dropped
216 virtual void _unregistered() {}
218 virtual bool filter_out(const set<string>& filters) { return true; }
221 ZTracer::Trace osd_trace;
222 ZTracer::Trace pg_trace;
223 ZTracer::Trace store_trace;
224 ZTracer::Trace journal_trace;
226 virtual ~TrackedOp() {}
233 switch (state.load()) {
234 case STATE_UNTRACKED:
241 tracker->unregister_inflight_op(this);
254 const char *get_desc() const {
255 if (!desc || want_new_desc.load()) {
256 Mutex::Locker l(lock);
262 void _gen_desc() const {
264 _dump_op_descriptor_unlocked(ss);
266 desc = desc_str.c_str();
267 want_new_desc = false;
271 want_new_desc = true;
274 const utime_t& get_initiated() const {
278 double get_duration() const {
279 Mutex::Locker l(lock);
280 if (!events.empty() && events.rbegin()->compare("done") == 0)
281 return events.rbegin()->stamp - get_initiated();
283 return ceph_clock_now() - get_initiated();
286 void mark_event_string(const string &event,
287 utime_t stamp=ceph_clock_now());
288 void mark_event(const char *event,
289 utime_t stamp=ceph_clock_now());
291 virtual const char *state_string() const {
292 Mutex::Locker l(lock);
293 return events.rbegin()->c_str();
296 void dump(utime_t now, Formatter *f) const;
298 void tracking_start() {
299 if (tracker->register_inflight_op(this)) {
300 events.push_back(Event(initiated_at, "initiated"));
305 // ref counting via intrusive_ptr, with special behavior on final
306 // put for historical op tracking
307 friend void intrusive_ptr_add_ref(TrackedOp *o) {
310 friend void intrusive_ptr_release(TrackedOp *o) {