Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / ceph_timer.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef COMMON_CEPH_TIMER_H
16 #define COMMON_CEPH_TIMER_H
17
18 #include <condition_variable>
19 #include <thread>
20 #include <boost/intrusive/set.hpp>
21
22 namespace ceph {
23
24   /// Newly constructed timer should be suspended at point of
25   /// construction.
26
27   struct construct_suspended_t { };
28   constexpr construct_suspended_t construct_suspended { };
29
30   namespace timer_detail {
31     using boost::intrusive::member_hook;
32     using boost::intrusive::set_member_hook;
33     using boost::intrusive::link_mode;
34     using boost::intrusive::normal_link;
35     using boost::intrusive::set;
36     using boost::intrusive::constant_time_size;
37     using boost::intrusive::compare;
38
39     // Compared to the SafeTimer this does fewer allocations (you
40     // don't have to allocate a new Context every time you
41     // want to cue the next tick.)
42     //
43     // It also does not share a lock with the caller. If you call
44     // cancel event, it either cancels the event (and returns true) or
45     // you missed it. If this does not work for you, you can set up a
46     // flag and mutex of your own.
47     //
48     // You get to pick your clock. I like mono_clock, since I usually
49     // want to wait FOR a given duration. real_clock is worthwhile if
50     // you want to wait UNTIL a specific moment of wallclock time.  If
51     // you want you can set up a timer that executes a function after
52     // you use up ten seconds of CPU time.
53
54     template <class TC>
55     class timer {
56       using sh = set_member_hook<link_mode<normal_link> >;
57
58       struct event {
59         typename TC::time_point t;
60         uint64_t id;
61         std::function<void()> f;
62
63         sh schedule_link;
64         sh event_link;
65
66         event() : t(TC::time_point::min()), id(0) {}
67         event(uint64_t _id) : t(TC::time_point::min()), id(_id) {}
68         event(typename TC::time_point _t, uint64_t _id,
69               std::function<void()>&& _f) : t(_t), id(_id), f(_f) {}
70         event(typename TC::time_point _t, uint64_t _id,
71               const std::function<void()>& _f) : t(_t), id(_id), f(_f) {}
72         bool operator <(const event& e) {
73           return t == e.t ? id < e.id : t < e.t;
74         }
75       };
76       struct SchedCompare {
77         bool operator()(const event& e1, const event& e2) const {
78           return e1.t == e2.t ? e1.id < e2.id : e1.t < e2.t;
79         }
80       };
81       struct EventCompare {
82         bool operator()(const event& e1, const event& e2) const {
83           return e1.id < e2.id;
84         }
85       };
86
87       using schedule_type = set<event,
88                                 member_hook<event, sh, &event::schedule_link>,
89                                 constant_time_size<false>,
90                                 compare<SchedCompare> >;
91
92       schedule_type schedule;
93
94       using event_set_type = set<event,
95                                  member_hook<event, sh, &event::event_link>,
96                                  constant_time_size<false>,
97                                  compare<EventCompare> >;
98
99       event_set_type events;
100
101       std::mutex lock;
102       using lock_guard = std::lock_guard<std::mutex>;
103       using unique_lock = std::unique_lock<std::mutex>;
104       std::condition_variable cond;
105
106       event* running{ nullptr };
107       uint64_t next_id{ 0 };
108
109       bool suspended;
110       std::thread thread;
111
112       void timer_thread() {
113         unique_lock l(lock);
114         while (!suspended) {
115           typename TC::time_point now = TC::now();
116
117           while (!schedule.empty()) {
118             auto p = schedule.begin();
119             // Should we wait for the future?
120             if (p->t > now)
121               break;
122
123             event& e = *p;
124             schedule.erase(e);
125             events.erase(e);
126
127             // Since we have only one thread it is impossible to have more
128             // than one running event
129             running = &e;
130
131             l.unlock();
132             e.f();
133             l.lock();
134
135             if (running) {
136               running = nullptr;
137               delete &e;
138             } // Otherwise the event requeued itself
139           }
140
141           if (schedule.empty())
142             cond.wait(l);
143           else
144             cond.wait_until(l, schedule.begin()->t);
145         }
146       }
147
148   public:
149       timer() {
150         lock_guard l(lock);
151         suspended = false;
152         thread = std::thread(&timer::timer_thread, this);
153       }
154
155       // Create a suspended timer, jobs will be executed in order when
156       // it is resumed.
157       timer(construct_suspended_t) {
158         lock_guard l(lock);
159         suspended = true;
160       }
161
162       timer(const timer &) = delete;
163       timer& operator=(const timer &) = delete;
164
165       ~timer() {
166         suspend();
167         cancel_all_events();
168       }
169
170       // Suspend operation of the timer (and let its thread die).
171       void suspend() {
172         unique_lock l(lock);
173         if (suspended)
174           return;
175
176         suspended = true;
177         cond.notify_one();
178         l.unlock();
179         thread.join();
180       }
181
182
183       // Resume operation of the timer. (Must have been previously
184       // suspended.)
185       void resume() {
186         unique_lock l(lock);
187           if (!suspended)
188           return;
189
190         suspended = false;
191         assert(!thread.joinable());
192         thread = std::thread(&timer::timer_thread, this);
193       }
194
195       // Schedule an event in the relative future
196       template<typename Callable, typename... Args>
197       uint64_t add_event(typename TC::duration duration,
198                          Callable&& f, Args&&... args) {
199         typename TC::time_point when = TC::now();
200         when += duration;
201         return add_event(when,
202                          std::forward<Callable>(f),
203                          std::forward<Args>(args)...);
204       }
205
206       // Schedule an event in the absolute future
207       template<typename Callable, typename... Args>
208       uint64_t add_event(typename TC::time_point when,
209                          Callable&& f, Args&&... args) {
210         std::lock_guard<std::mutex> l(lock);
211         event& e = *(new event(
212                        when, ++next_id,
213                        std::forward<std::function<void()> >(
214                          std::bind(std::forward<Callable>(f),
215                                    std::forward<Args>(args)...))));
216         auto i = schedule.insert(e);
217         events.insert(e);
218
219         /* If the event we have just inserted comes before everything
220          * else, we need to adjust our timeout. */
221         if (i.first == schedule.begin())
222           cond.notify_one();
223
224         // Previously each event was a context, identified by a
225         // pointer, and each context to be called only once. Since you
226         // can queue the same function pointer, member function,
227         // lambda, or functor up multiple times, identifying things by
228         // function for the purposes of cancellation is no longer
229         // suitable. Thus:
230         return e.id;
231       }
232
233       // Adjust the timeout of a currently-scheduled event (relative)
234       bool adjust_event(uint64_t id, typename TC::duration duration) {
235         return adjust_event(id, TC::now() + duration);
236       }
237
238       // Adjust the timeout of a currently-scheduled event (absolute)
239       bool adjust_event(uint64_t id, typename TC::time_point when) {
240         std::lock_guard<std::mutex> l(lock);
241
242         event key(id);
243         typename event_set_type::iterator it = events.find(key);
244
245         if (it == events.end())
246           return false;
247
248         event& e = *it;
249
250         schedule.erase(e);
251         e.t = when;
252         schedule.insert(e);
253
254         return true;
255       }
256
257       // Cancel an event. If the event has already come and gone (or you
258       // never submitted it) you will receive false. Otherwise you will
259       // receive true and it is guaranteed the event will not execute.
260       bool cancel_event(const uint64_t id) {
261         std::lock_guard<std::mutex> l(lock);
262         event dummy(id);
263         auto p = events.find(dummy);
264         if (p == events.end()) {
265           return false;
266         }
267
268         event& e = *p;
269         events.erase(e);
270         schedule.erase(e);
271         delete &e;
272
273         return true;
274       }
275
276       // Reschedules a currently running event in the relative
277       // future. Must be called only from an event executed by this
278       // timer. If you have a function that can be called either from
279       // this timer or some other way, it is your responsibility to make
280       // sure it can tell the difference only does not call
281       // reschedule_me in the non-timer case.
282       //
283       // Returns an event id. If you had an event_id from the first
284       // scheduling, replace it with this return value.
285       uint64_t reschedule_me(typename TC::duration duration) {
286         return reschedule_me(TC::now() + duration);
287       }
288
289       // Reschedules a currently running event in the absolute
290       // future. Must be called only from an event executed by this
291       // timer. if you have a function that can be called either from
292       // this timer or some other way, it is your responsibility to make
293       // sure it can tell the difference only does not call
294       // reschedule_me in the non-timer case.
295       //
296       // Returns an event id. If you had an event_id from the first
297       // scheduling, replace it with this return value.
298       uint64_t reschedule_me(typename TC::time_point when) {
299         if (std::this_thread::get_id() != thread.get_id())
300           throw std::make_error_condition(std::errc::operation_not_permitted);
301         std::lock_guard<std::mutex> l(lock);
302         running->t = when;
303         uint64_t id = ++next_id;
304         running->id = id;
305         schedule.insert(*running);
306         events.insert(*running);
307
308         // Hacky, but keeps us from being deleted
309         running = nullptr;
310
311         // Same function, but you get a new ID.
312         return id;
313       }
314
315       // Remove all events from the queue.
316       void cancel_all_events() {
317         std::lock_guard<std::mutex> l(lock);
318         while (!events.empty()) {
319           auto p = events.begin();
320           event& e = *p;
321           schedule.erase(e);
322           events.erase(e);
323           delete &e;
324         }
325       }
326     }; // timer
327   }; // timer_detail
328
329   using timer_detail::timer;
330 }; // ceph
331
332 #endif