Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / TrackedOp.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * This is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License version 2.1, as published by the Free Software
9  * Foundation.  See file COPYING.
10  * Copyright 2013 Inktank
11  */
12
13 #include "TrackedOp.h"
14
15 #define dout_context cct
16 #define dout_subsys ceph_subsys_optracker
17 #undef dout_prefix
18 #define dout_prefix _prefix(_dout)
19
// Writes the tag that begins every optracker debug line and hands the
// stream back so the dout macro can keep chaining onto it.
static std::ostream& _prefix(std::ostream* _dout)
{
  std::ostream& out = *_dout;
  out << "-- op tracker -- ";
  return out;
}
24
25 void OpHistory::on_shutdown()
26 {
27   Mutex::Locker history_lock(ops_history_lock);
28   arrived.clear();
29   duration.clear();
30   slow_op.clear();
31   shutdown = true;
32 }
33
34 void OpHistory::insert(utime_t now, TrackedOpRef op)
35 {
36   Mutex::Locker history_lock(ops_history_lock);
37   if (shutdown)
38     return;
39   duration.insert(make_pair(op->get_duration(), op));
40   arrived.insert(make_pair(op->get_initiated(), op));
41   if (op->get_duration() >= history_slow_op_threshold)
42     slow_op.insert(make_pair(op->get_initiated(), op));
43   cleanup(now);
44 }
45
46 void OpHistory::cleanup(utime_t now)
47 {
48   while (arrived.size() &&
49          (now - arrived.begin()->first >
50           (double)(history_duration))) {
51     duration.erase(make_pair(
52         arrived.begin()->second->get_duration(),
53         arrived.begin()->second));
54     arrived.erase(arrived.begin());
55   }
56
57   while (duration.size() > history_size) {
58     arrived.erase(make_pair(
59         duration.begin()->second->get_initiated(),
60         duration.begin()->second));
61     duration.erase(duration.begin());
62   }
63
64   while (slow_op.size() > history_slow_op_size) {
65     slow_op.erase(make_pair(
66         slow_op.begin()->second->get_initiated(),
67         slow_op.begin()->second));
68   }
69 }
70
71 void OpHistory::dump_ops(utime_t now, Formatter *f, set<string> filters)
72 {
73   Mutex::Locker history_lock(ops_history_lock);
74   cleanup(now);
75   f->open_object_section("op_history");
76   f->dump_int("size", history_size);
77   f->dump_int("duration", history_duration);
78   {
79     f->open_array_section("ops");
80     for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
81            arrived.begin();
82          i != arrived.end();
83          ++i) {
84       if (!i->second->filter_out(filters))
85         continue;
86       f->open_object_section("op");
87       i->second->dump(now, f);
88       f->close_section();
89     }
90     f->close_section();
91   }
92   f->close_section();
93 }
94
95 void OpHistory::dump_ops_by_duration(utime_t now, Formatter *f, set<string> filters)
96 {
97   Mutex::Locker history_lock(ops_history_lock);
98   cleanup(now);
99   f->open_object_section("op_history");
100   f->dump_int("size", history_size);
101   f->dump_int("duration", history_duration);
102   {
103     f->open_array_section("ops");
104     if (arrived.size()) {
105       vector<pair<double, TrackedOpRef> > durationvec;
106       durationvec.reserve(arrived.size());
107
108       for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
109              arrived.begin();
110            i != arrived.end();
111            ++i) {
112         if (!i->second->filter_out(filters))
113           continue;
114         durationvec.push_back(pair<double, TrackedOpRef>(i->second->get_duration(), i->second));
115       }
116
117       sort(durationvec.begin(), durationvec.end());
118
119       for (auto i = durationvec.rbegin(); i != durationvec.rend(); ++i) {
120         f->open_object_section("op");
121         i->second->dump(now, f);
122         f->close_section();
123       }
124     }
125     f->close_section();
126   }
127   f->close_section();
128 }
129
130 struct ShardedTrackingData {
131   Mutex ops_in_flight_lock_sharded;
132   TrackedOp::tracked_op_list_t ops_in_flight_sharded;
133   explicit ShardedTrackingData(string lock_name):
134       ops_in_flight_lock_sharded(lock_name.c_str()) {}
135 };
136
137 OpTracker::OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards):
138   seq(0),
139   num_optracker_shards(num_shards),
140   complaint_time(0), log_threshold(0),
141   tracking_enabled(tracking),
142   lock("OpTracker::lock"), cct(cct_) {
143     for (uint32_t i = 0; i < num_optracker_shards; i++) {
144       char lock_name[32] = {0};
145       snprintf(lock_name, sizeof(lock_name), "%s:%d", "OpTracker::ShardedLock", i);
146       ShardedTrackingData* one_shard = new ShardedTrackingData(lock_name);
147       sharded_in_flight_list.push_back(one_shard);
148     }
149 }
150
151 OpTracker::~OpTracker() {
152   while (!sharded_in_flight_list.empty()) {
153     assert((sharded_in_flight_list.back())->ops_in_flight_sharded.empty());
154     delete sharded_in_flight_list.back();
155     sharded_in_flight_list.pop_back();
156   }
157 }
158
159 bool OpTracker::dump_historic_ops(Formatter *f, bool by_duration, set<string> filters)
160 {
161   RWLock::RLocker l(lock);
162   if (!tracking_enabled)
163     return false;
164
165   utime_t now = ceph_clock_now();
166   if (by_duration) {
167     history.dump_ops_by_duration(now, f, filters);
168   } else {
169     history.dump_ops(now, f, filters);
170   }
171   return true;
172 }
173
174 void OpHistory::dump_slow_ops(utime_t now, Formatter *f, set<string> filters)
175 {
176   Mutex::Locker history_lock(ops_history_lock);
177   cleanup(now);
178   f->open_object_section("OpHistory slow ops");
179   f->dump_int("num to keep", history_slow_op_size);
180   f->dump_int("threshold to keep", history_slow_op_threshold);
181   {
182     f->open_array_section("Ops");
183     for (set<pair<utime_t, TrackedOpRef> >::const_iterator i =
184            slow_op.begin();
185          i != slow_op.end();
186          ++i) {
187       if (!i->second->filter_out(filters))
188         continue;
189       f->open_object_section("Op");
190       i->second->dump(now, f);
191       f->close_section();
192     }
193     f->close_section();
194   }
195   f->close_section();
196 }
197
198 bool OpTracker::dump_historic_slow_ops(Formatter *f, set<string> filters)
199 {
200   RWLock::RLocker l(lock);
201   if (!tracking_enabled)
202     return false;
203
204   utime_t now = ceph_clock_now();
205   history.dump_slow_ops(now, f, filters);
206   return true;
207 }
208
209 bool OpTracker::dump_ops_in_flight(Formatter *f, bool print_only_blocked, set<string> filters)
210 {
211   RWLock::RLocker l(lock);
212   if (!tracking_enabled)
213     return false;
214
215   f->open_object_section("ops_in_flight"); // overall dump
216   uint64_t total_ops_in_flight = 0;
217   f->open_array_section("ops"); // list of TrackedOps
218   utime_t now = ceph_clock_now();
219   for (uint32_t i = 0; i < num_optracker_shards; i++) {
220     ShardedTrackingData* sdata = sharded_in_flight_list[i];
221     assert(NULL != sdata); 
222     Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
223     for (auto& op : sdata->ops_in_flight_sharded) {
224       if (print_only_blocked && (now - op.get_initiated() <= complaint_time))
225         break;
226       if (!op.filter_out(filters))
227         continue;
228       f->open_object_section("op");
229       op.dump(now, f);
230       f->close_section(); // this TrackedOp
231       total_ops_in_flight++;
232     }
233   }
234   f->close_section(); // list of TrackedOps
235   if (print_only_blocked) {
236     f->dump_float("complaint_time", complaint_time);
237     f->dump_int("num_blocked_ops", total_ops_in_flight);
238   } else
239     f->dump_int("num_ops", total_ops_in_flight);
240   f->close_section(); // overall dump
241   return true;
242 }
243
244 bool OpTracker::register_inflight_op(TrackedOp *i)
245 {
246   RWLock::RLocker l(lock);
247   if (!tracking_enabled)
248     return false;
249
250   uint64_t current_seq = ++seq;
251   uint32_t shard_index = current_seq % num_optracker_shards;
252   ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
253   assert(NULL != sdata);
254   {
255     Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
256     sdata->ops_in_flight_sharded.push_back(*i);
257     i->seq = current_seq;
258   }
259   return true;
260 }
261
262 void OpTracker::unregister_inflight_op(TrackedOp *i)
263 {
264   // caller checks;
265   assert(i->state);
266
267   uint32_t shard_index = i->seq % num_optracker_shards;
268   ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
269   assert(NULL != sdata);
270   {
271     Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
272     auto p = sdata->ops_in_flight_sharded.iterator_to(*i);
273     sdata->ops_in_flight_sharded.erase(p);
274   }
275   i->_unregistered();
276
277   RWLock::RLocker l(lock);
278   if (!tracking_enabled)
279     delete i;
280   else {
281     i->state = TrackedOp::STATE_HISTORY;
282     utime_t now = ceph_clock_now();
283     history.insert(now, TrackedOpRef(i));
284   }
285 }
286
287 bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector, int *slow)
288 {
289   RWLock::RLocker l(lock);
290   if (!tracking_enabled)
291     return false;
292
293   utime_t now = ceph_clock_now();
294   utime_t too_old = now;
295   too_old -= complaint_time;
296   utime_t oldest_op = now;
297   uint64_t total_ops_in_flight = 0;
298
299   for (uint32_t i = 0; i < num_optracker_shards; i++) {
300     ShardedTrackingData* sdata = sharded_in_flight_list[i];
301     assert(NULL != sdata);
302     Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
303     if (!sdata->ops_in_flight_sharded.empty()) {
304       utime_t oldest_op_tmp =
305         sdata->ops_in_flight_sharded.front().get_initiated();
306       if (oldest_op_tmp < oldest_op) {
307         oldest_op = oldest_op_tmp;
308       }
309     }
310     total_ops_in_flight += sdata->ops_in_flight_sharded.size();
311   }
312
313   if (0 == total_ops_in_flight)
314     return false;
315
316   utime_t oldest_secs = now - oldest_op;
317
318   dout(10) << "ops_in_flight.size: " << total_ops_in_flight
319            << "; oldest is " << oldest_secs
320            << " seconds old" << dendl;
321
322   if (oldest_secs < complaint_time)
323     return false;
324
325   warning_vector.reserve(log_threshold + 1);
326   //store summary message
327   warning_vector.push_back("");
328
329   int _slow = 0;    // total slow
330   if (!slow)
331     slow = &_slow; 
332   else
333     *slow = _slow;  // start from 0 anyway
334   int warned = 0;   // total logged
335   for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
336     ShardedTrackingData* sdata = sharded_in_flight_list[iter];
337     assert(NULL != sdata);
338     Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
339     if (sdata->ops_in_flight_sharded.empty())
340       continue;
341     auto i = sdata->ops_in_flight_sharded.begin();
342     while (i != sdata->ops_in_flight_sharded.end() &&
343            i->get_initiated() < too_old) {
344       (*slow)++;
345
346       // exponential backoff of warning intervals
347       if (warned < log_threshold &&
348           (i->get_initiated() + (complaint_time *
349                                  i->warn_interval_multiplier)) < now) {
350         // will warn, increase counter
351         warned++;
352
353         utime_t age = now - i->get_initiated();
354         stringstream ss;
355         ss << "slow request " << age << " seconds old, received at "
356            << i->get_initiated() << ": " << i->get_desc()
357            << " currently "
358            << (i->current ? i->current : i->state_string());
359         warning_vector.push_back(ss.str());
360
361         // only those that have been shown will backoff
362         i->warn_interval_multiplier *= 2;
363       }
364       ++i;
365     }
366   }
367
368   // only summarize if we warn about any.  if everything has backed
369   // off, we will stay silent.
370   if (warned > 0) {
371     stringstream ss;
372     ss << *slow << " slow requests, " << warned << " included below; oldest blocked for > "
373        << oldest_secs << " secs";
374     warning_vector[0] = ss.str();
375   }
376
377   return warned > 0;
378 }
379
380 void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
381 {
382   h->clear();
383   utime_t now = ceph_clock_now();
384
385   for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
386     ShardedTrackingData* sdata = sharded_in_flight_list[iter];
387     assert(NULL != sdata);
388     Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
389
390     for (auto& i : sdata->ops_in_flight_sharded) {
391       utime_t age = now - i.get_initiated();
392       uint32_t ms = (long)(age * 1000.0);
393       h->add(ms);
394     }
395   }
396 }
397
398
399 #undef dout_context
400 #define dout_context tracker->cct
401
402 void TrackedOp::mark_event_string(const string &event, utime_t stamp)
403 {
404   if (!state)
405     return;
406
407   {
408     Mutex::Locker l(lock);
409     events.push_back(Event(stamp, event));
410     current = events.back().c_str();
411   }
412   dout(6) << " seq: " << seq
413           << ", time: " << stamp
414           << ", event: " << event
415           << ", op: " << get_desc()
416           << dendl;
417   _event_marked();
418 }
419
420 void TrackedOp::mark_event(const char *event, utime_t stamp)
421 {
422   if (!state)
423     return;
424
425   {
426     Mutex::Locker l(lock);
427     events.push_back(Event(stamp, event));
428     current = event;
429   }
430   dout(6) << " seq: " << seq
431           << ", time: " << stamp
432           << ", event: " << event
433           << ", op: " << get_desc()
434           << dendl;
435   _event_marked();
436 }
437
438 void TrackedOp::dump(utime_t now, Formatter *f) const
439 {
440   // Ignore if still in the constructor
441   if (!state)
442     return;
443   f->dump_string("description", get_desc());
444   f->dump_stream("initiated_at") << get_initiated();
445   f->dump_float("age", now - get_initiated());
446   f->dump_float("duration", get_duration());
447   {
448     f->open_object_section("type_data");
449     _dump(f);
450     f->close_section();
451   }
452 }