Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / Timer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "Cond.h"
16 #include "Timer.h"
17
18
19 #define dout_subsys ceph_subsys_timer
20 #undef dout_prefix
21 #define dout_prefix *_dout << "timer(" << this << ")."
22
23
24
25 class SafeTimerThread : public Thread {
26   SafeTimer *parent;
27 public:
28   explicit SafeTimerThread(SafeTimer *s) : parent(s) {}
29   void *entry() override {
30     parent->timer_thread();
31     return NULL;
32   }
33 };
34
35
36
37 typedef std::multimap < utime_t, Context *> scheduled_map_t;
38 typedef std::map < Context*, scheduled_map_t::iterator > event_lookup_map_t;
39
40 SafeTimer::SafeTimer(CephContext *cct_, Mutex &l, bool safe_callbacks)
41   : cct(cct_), lock(l),
42     safe_callbacks(safe_callbacks),
43     thread(NULL),
44     stopping(false)
45 {
46 }
47
48 SafeTimer::~SafeTimer()
49 {
50   assert(thread == NULL);
51 }
52
53 void SafeTimer::init()
54 {
55   ldout(cct,10) << "init" << dendl;
56   thread = new SafeTimerThread(this);
57   thread->create("safe_timer");
58 }
59
60 void SafeTimer::shutdown()
61 {
62   ldout(cct,10) << "shutdown" << dendl;
63   if (thread) {
64     assert(lock.is_locked());
65     cancel_all_events();
66     stopping = true;
67     cond.Signal();
68     lock.Unlock();
69     thread->join();
70     lock.Lock();
71     delete thread;
72     thread = NULL;
73   }
74 }
75
76 void SafeTimer::timer_thread()
77 {
78   lock.Lock();
79   ldout(cct,10) << "timer_thread starting" << dendl;
80   while (!stopping) {
81     utime_t now = ceph_clock_now();
82
83     while (!schedule.empty()) {
84       scheduled_map_t::iterator p = schedule.begin();
85
86       // is the future now?
87       if (p->first > now)
88         break;
89
90       Context *callback = p->second;
91       events.erase(callback);
92       schedule.erase(p);
93       ldout(cct,10) << "timer_thread executing " << callback << dendl;
94       
95       if (!safe_callbacks)
96         lock.Unlock();
97       callback->complete(0);
98       if (!safe_callbacks)
99         lock.Lock();
100     }
101
102     // recheck stopping if we dropped the lock
103     if (!safe_callbacks && stopping)
104       break;
105
106     ldout(cct,20) << "timer_thread going to sleep" << dendl;
107     if (schedule.empty())
108       cond.Wait(lock);
109     else
110       cond.WaitUntil(lock, schedule.begin()->first);
111     ldout(cct,20) << "timer_thread awake" << dendl;
112   }
113   ldout(cct,10) << "timer_thread exiting" << dendl;
114   lock.Unlock();
115 }
116
117 Context* SafeTimer::add_event_after(double seconds, Context *callback)
118 {
119   assert(lock.is_locked());
120
121   utime_t when = ceph_clock_now();
122   when += seconds;
123   return add_event_at(when, callback);
124 }
125
126 Context* SafeTimer::add_event_at(utime_t when, Context *callback)
127 {
128   assert(lock.is_locked());
129   ldout(cct,10) << __func__ << " " << when << " -> " << callback << dendl;
130   if (stopping) {
131     ldout(cct,5) << __func__ << " already shutdown, event not added" << dendl;
132     delete callback;
133     return nullptr;
134   }
135   scheduled_map_t::value_type s_val(when, callback);
136   scheduled_map_t::iterator i = schedule.insert(s_val);
137
138   event_lookup_map_t::value_type e_val(callback, i);
139   pair < event_lookup_map_t::iterator, bool > rval(events.insert(e_val));
140
141   /* If you hit this, you tried to insert the same Context* twice. */
142   assert(rval.second);
143
144   /* If the event we have just inserted comes before everything else, we need to
145    * adjust our timeout. */
146   if (i == schedule.begin())
147     cond.Signal();
148   return callback;
149 }
150
151 bool SafeTimer::cancel_event(Context *callback)
152 {
153   assert(lock.is_locked());
154   
155   auto p = events.find(callback);
156   if (p == events.end()) {
157     ldout(cct,10) << "cancel_event " << callback << " not found" << dendl;
158     return false;
159   }
160
161   ldout(cct,10) << "cancel_event " << p->second->first << " -> " << callback << dendl;
162   delete p->first;
163
164   schedule.erase(p->second);
165   events.erase(p);
166   return true;
167 }
168
169 void SafeTimer::cancel_all_events()
170 {
171   ldout(cct,10) << "cancel_all_events" << dendl;
172   assert(lock.is_locked());
173   
174   while (!events.empty()) {
175     auto p = events.begin();
176     ldout(cct,10) << " cancelled " << p->second->first << " -> " << p->first << dendl;
177     delete p->first;
178     schedule.erase(p->second);
179     events.erase(p);
180   }
181 }
182
183 void SafeTimer::dump(const char *caller) const
184 {
185   if (!caller)
186     caller = "";
187   ldout(cct,10) << "dump " << caller << dendl;
188
189   for (scheduled_map_t::const_iterator s = schedule.begin();
190        s != schedule.end();
191        ++s)
192     ldout(cct,10) << " " << s->first << "->" << s->second << dendl;
193 }