Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / TrackedOp.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  */
13
14 #ifndef TRACKEDREQUEST_H_
15 #define TRACKEDREQUEST_H_
16
17 #include <atomic>
18 #include "common/histogram.h"
19 #include "msg/Message.h"
20 #include "common/RWLock.h"
21
22 #define OPTRACKER_PREALLOC_EVENTS 20
23
24 class TrackedOp;
25 typedef boost::intrusive_ptr<TrackedOp> TrackedOpRef;
26
27 class OpHistory {
28   set<pair<utime_t, TrackedOpRef> > arrived;
29   set<pair<double, TrackedOpRef> > duration;
30   set<pair<utime_t, TrackedOpRef> > slow_op;
31   Mutex ops_history_lock;
32   void cleanup(utime_t now);
33   bool shutdown;
34   uint32_t history_size;
35   uint32_t history_duration;
36   uint32_t history_slow_op_size;
37   uint32_t history_slow_op_threshold;
38
39 public:
40   OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false),
41     history_size(0), history_duration(0),
42     history_slow_op_size(0), history_slow_op_threshold(0) {}
43   ~OpHistory() {
44     assert(arrived.empty());
45     assert(duration.empty());
46     assert(slow_op.empty());
47   }
48   void insert(utime_t now, TrackedOpRef op);
49   void dump_ops(utime_t now, Formatter *f, set<string> filters = {""});
50   void dump_ops_by_duration(utime_t now, Formatter *f, set<string> filters = {""});
51   void dump_slow_ops(utime_t now, Formatter *f, set<string> filters = {""});
52   void on_shutdown();
53   void set_size_and_duration(uint32_t new_size, uint32_t new_duration) {
54     history_size = new_size;
55     history_duration = new_duration;
56   }
57   void set_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
58     history_slow_op_size = new_size;
59     history_slow_op_threshold = new_threshold;
60   }
61 };
62
63 struct ShardedTrackingData;
64 class OpTracker {
65   friend class OpHistory;
66   std::atomic<int64_t> seq = { 0 };
67   vector<ShardedTrackingData*> sharded_in_flight_list;
68   uint32_t num_optracker_shards;
69   OpHistory history;
70   float complaint_time;
71   int log_threshold;
72   bool tracking_enabled;
73   RWLock       lock;
74
75 public:
76   CephContext *cct;
77   OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards);
78       
79   void set_complaint_and_threshold(float time, int threshold) {
80     complaint_time = time;
81     log_threshold = threshold;
82   }
83   void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
84     history.set_size_and_duration(new_size, new_duration);
85   }
86   void set_history_slow_op_size_and_threshold(uint32_t new_size, uint32_t new_threshold) {
87     history.set_slow_op_size_and_threshold(new_size, new_threshold);
88   }
89   void set_tracking(bool enable) {
90     RWLock::WLocker l(lock);
91     tracking_enabled = enable;
92   }
93   bool dump_ops_in_flight(Formatter *f, bool print_only_blocked = false, set<string> filters = {""});
94   bool dump_historic_ops(Formatter *f, bool by_duration = false, set<string> filters = {""});
95   bool dump_historic_slow_ops(Formatter *f, set<string> filters = {""});
96   bool register_inflight_op(TrackedOp *i);
97   void unregister_inflight_op(TrackedOp *i);
98
99   void get_age_ms_histogram(pow2_hist_t *h);
100
101   /**
102    * Look for Ops which are too old, and insert warning
103    * strings for each Op that is too old.
104    *
105    * @param warning_strings A vector<string> reference which is filled
106    * with a warning string for each old Op.
107    * @return True if there are any Ops to warn on, false otherwise.
108    */
109   bool check_ops_in_flight(std::vector<string> &warning_strings,
110                            int *slow = NULL);
111
112   void on_shutdown() {
113     history.on_shutdown();
114   }
115   ~OpTracker();
116
117   template <typename T, typename U>
118   typename T::Ref create_request(U params)
119   {
120     typename T::Ref retval(new T(params, this));
121     retval->tracking_start();
122     return retval;
123   }
124 };
125
126
127 class TrackedOp : public boost::intrusive::list_base_hook<> {
128 private:
129   friend class OpHistory;
130   friend class OpTracker;
131
132   boost::intrusive::list_member_hook<> tracker_item;
133
134 public:
135   typedef boost::intrusive::list<
136   TrackedOp,
137   boost::intrusive::member_hook<
138     TrackedOp,
139     boost::intrusive::list_member_hook<>,
140     &TrackedOp::tracker_item> > tracked_op_list_t;
141
142   // for use when clearing lists.  e.g.,
143   //   ls.clear_and_dispose(TrackedOp::Putter());
144   struct Putter {
145     void operator()(TrackedOp *op) {
146       op->put();
147     }
148   };
149
150 protected:
151   OpTracker *tracker;          ///< the tracker we are associated with
152   std::atomic_int nref = {0};  ///< ref count
153
154   utime_t initiated_at;
155
156   struct Event {
157     utime_t stamp;
158     string str;
159     const char *cstr = nullptr;
160
161     Event(utime_t t, const string& s) : stamp(t), str(s) {}
162     Event(utime_t t, const char *s) : stamp(t), cstr(s) {}
163
164     int compare(const char *s) const {
165       if (cstr)
166         return strcmp(cstr, s);
167       else
168         return str.compare(s);
169     }
170
171     const char *c_str() const {
172       if (cstr)
173         return cstr;
174       else
175         return str.c_str();
176     }
177
178     void dump(Formatter *f) const {
179       f->dump_stream("time") << stamp;
180       f->dump_string("event", c_str());
181     }
182   };
183
184   vector<Event> events;    ///< list of events and their times
185   mutable Mutex lock = {"TrackedOp::lock"}; ///< to protect the events list
186   const char *current = 0; ///< the current state the event is in
187   uint64_t seq = 0;        ///< a unique value set by the OpTracker
188
189   uint32_t warn_interval_multiplier = 1; //< limits output of a given op warning
190
191   enum {
192     STATE_UNTRACKED = 0,
193     STATE_LIVE,
194     STATE_HISTORY
195   };
196   atomic<int> state = {STATE_UNTRACKED};
197
198   mutable string desc_str;   ///< protected by lock
199   mutable const char *desc = nullptr;  ///< readable without lock
200   mutable atomic<bool> want_new_desc = {false};
201
202   TrackedOp(OpTracker *_tracker, const utime_t& initiated) :
203     tracker(_tracker),
204     initiated_at(initiated)
205   {
206     events.reserve(OPTRACKER_PREALLOC_EVENTS);
207   }
208
209   /// output any type-specific data you want to get when dump() is called
210   virtual void _dump(Formatter *f) const {}
211   /// if you want something else to happen when events are marked, implement
212   virtual void _event_marked() {}
213   /// return a unique descriptor of the Op; eg the message it's attached to
214   virtual void _dump_op_descriptor_unlocked(ostream& stream) const = 0;
215   /// called when the last non-OpTracker reference is dropped
216   virtual void _unregistered() {}
217
218   virtual bool filter_out(const set<string>& filters) { return true; }
219
220 public:
221   ZTracer::Trace osd_trace;
222   ZTracer::Trace pg_trace;
223   ZTracer::Trace store_trace;
224   ZTracer::Trace journal_trace;
225
226   virtual ~TrackedOp() {}
227
228   void get() {
229     ++nref;
230   }
231   void put() {
232     if (--nref == 0) {
233       switch (state.load()) {
234       case STATE_UNTRACKED:
235         _unregistered();
236         delete this;
237         break;
238
239       case STATE_LIVE:
240         mark_event("done");
241         tracker->unregister_inflight_op(this);
242         break;
243
244       case STATE_HISTORY:
245         delete this;
246         break;
247
248       default:
249         ceph_abort();
250       }
251     }
252   }
253
254   const char *get_desc() const {
255     if (!desc || want_new_desc.load()) {
256       Mutex::Locker l(lock);
257       _gen_desc();
258     }
259     return desc;
260   }
261 private:
262   void _gen_desc() const {
263     ostringstream ss;
264     _dump_op_descriptor_unlocked(ss);
265     desc_str = ss.str();
266     desc = desc_str.c_str();
267     want_new_desc = false;
268   }
269 public:
270   void reset_desc() {
271     want_new_desc = true;
272   }
273
274   const utime_t& get_initiated() const {
275     return initiated_at;
276   }
277
278   double get_duration() const {
279     Mutex::Locker l(lock);
280     if (!events.empty() && events.rbegin()->compare("done") == 0)
281       return events.rbegin()->stamp - get_initiated();
282     else
283       return ceph_clock_now() - get_initiated();
284   }
285
286   void mark_event_string(const string &event,
287                          utime_t stamp=ceph_clock_now());
288   void mark_event(const char *event,
289                   utime_t stamp=ceph_clock_now());
290
291   virtual const char *state_string() const {
292     Mutex::Locker l(lock);
293     return events.rbegin()->c_str();
294   }
295
296   void dump(utime_t now, Formatter *f) const;
297
298   void tracking_start() {
299     if (tracker->register_inflight_op(this)) {
300       events.push_back(Event(initiated_at, "initiated"));
301       state = STATE_LIVE;
302     }
303   }
304
305   // ref counting via intrusive_ptr, with special behavior on final
306   // put for historical op tracking
307   friend void intrusive_ptr_add_ref(TrackedOp *o) {
308     o->get();
309   }
310   friend void intrusive_ptr_release(TrackedOp *o) {
311     o->put();
312   }
313 };
314
315
316 #endif