Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / Event.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #include "common/errno.h"
18 #include "Event.h"
19
20 #ifdef HAVE_DPDK
21 #include "dpdk/EventDPDK.h"
22 #endif
23
24 #ifdef HAVE_EPOLL
25 #include "EventEpoll.h"
26 #else
27 #ifdef HAVE_KQUEUE
28 #include "EventKqueue.h"
29 #else
30 #include "EventSelect.h"
31 #endif
32 #endif
33
34 #define dout_subsys ceph_subsys_ms
35
36 #undef dout_prefix
37 #define dout_prefix *_dout << "EventCallback "
38 class C_handle_notify : public EventCallback {
39   EventCenter *center;
40   CephContext *cct;
41
42  public:
43   C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {}
44   void do_request(int fd_or_id) override {
45     char c[256];
46     int r = 0;
47     do {
48       r = read(fd_or_id, c, sizeof(c));
49       if (r < 0) {
50         if (errno != EAGAIN)
51           ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
52       }
53     } while (r > 0);
54   }
55 };
56
57 #undef dout_prefix
58 #define dout_prefix _event_prefix(_dout)
59
60 /**
61  * Construct a Poller.
62  *
63  * \param center
64  *      EventCenter object through which the poller will be invoked (defaults
65  *      to the global #RAMCloud::center object).
66  * \param pollerName
67  *      Human readable name that can be printed out in debugging messages
68  *      about the poller. The name of the superclass is probably sufficient
69  *      for most cases.
70  */
71 EventCenter::Poller::Poller(EventCenter* center, const string& name)
72     : owner(center), poller_name(name), slot(owner->pollers.size())
73 {
74   owner->pollers.push_back(this);
75 }
76
77 /**
78  * Destroy a Poller.
79  */
80 EventCenter::Poller::~Poller()
81 {
82   // Erase this Poller from the vector by overwriting it with the
83   // poller that used to be the last one in the vector.
84   //
85   // Note: this approach is reentrant (it is safe to delete a
86   // poller from a poller callback, which means that the poll
87   // method is in the middle of scanning the list of all pollers;
88   // the worst that will happen is that the poller that got moved
89   // may not be invoked in the current scan).
90   owner->pollers[slot] = owner->pollers.back();
91   owner->pollers[slot]->slot = slot;
92   owner->pollers.pop_back();
93   slot = -1;
94 }
95
96 ostream& EventCenter::_event_prefix(std::ostream *_dout)
97 {
98   return *_dout << "Event(" << this << " nevent=" << nevent
99                 << " time_id=" << time_event_next_id << ").";
100 }
101
102 int EventCenter::init(int n, unsigned i, const std::string &t)
103 {
104   // can't init multi times
105   assert(nevent == 0);
106
107   type = t;
108   idx = i;
109
110   if (t == "dpdk") {
111 #ifdef HAVE_DPDK
112     driver = new DPDKDriver(cct);
113 #endif
114   } else {
115 #ifdef HAVE_EPOLL
116   driver = new EpollDriver(cct);
117 #else
118 #ifdef HAVE_KQUEUE
119   driver = new KqueueDriver(cct);
120 #else
121   driver = new SelectDriver(cct);
122 #endif
123 #endif
124   }
125
126   if (!driver) {
127     lderr(cct) << __func__ << " failed to create event driver " << dendl;
128     return -1;
129   }
130
131   int r = driver->init(this, n);
132   if (r < 0) {
133     lderr(cct) << __func__ << " failed to init event driver." << dendl;
134     return r;
135   }
136
137   file_events.resize(n);
138   nevent = n;
139
140   if (!driver->need_wakeup())
141     return 0;
142
143   int fds[2];
144   if (pipe(fds) < 0) {
145     lderr(cct) << __func__ << " can't create notify pipe" << dendl;
146     return -errno;
147   }
148
149   notify_receive_fd = fds[0];
150   notify_send_fd = fds[1];
151   r = net.set_nonblock(notify_receive_fd);
152   if (r < 0) {
153     return r;
154   }
155   r = net.set_nonblock(notify_send_fd);
156   if (r < 0) {
157     return r;
158   }
159
160   return r;
161 }
162
163 EventCenter::~EventCenter()
164 {
165   {
166     std::lock_guard<std::mutex> l(external_lock);
167     while (!external_events.empty()) {
168       EventCallbackRef e = external_events.front();
169       if (e)
170         e->do_request(0);
171       external_events.pop_front();
172     }
173   }
174   assert(time_events.empty());
175
176   if (notify_receive_fd >= 0)
177     ::close(notify_receive_fd);
178   if (notify_send_fd >= 0)
179     ::close(notify_send_fd);
180
181   delete driver;
182   if (notify_handler)
183     delete notify_handler;
184 }
185
186
187 void EventCenter::set_owner()
188 {
189   owner = pthread_self();
190   ldout(cct, 2) << __func__ << " idx=" << idx << " owner=" << owner << dendl;
191   if (!global_centers) {
192     cct->lookup_or_create_singleton_object<EventCenter::AssociatedCenters>(
193         global_centers, "AsyncMessenger::EventCenter::global_center::"+type);
194     assert(global_centers);
195     global_centers->centers[idx] = this;
196     if (driver->need_wakeup()) {
197       notify_handler = new C_handle_notify(this, cct);
198       int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
199       assert(r == 0);
200     }
201   }
202 }
203
204 int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
205 {
206   assert(in_thread());
207   int r = 0;
208   if (fd >= nevent) {
209     int new_size = nevent << 2;
210     while (fd > new_size)
211       new_size <<= 2;
212     ldout(cct, 20) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
213     r = driver->resize_events(new_size);
214     if (r < 0) {
215       lderr(cct) << __func__ << " event count is exceed." << dendl;
216       return -ERANGE;
217     }
218     file_events.resize(new_size);
219     nevent = new_size;
220   }
221
222   EventCenter::FileEvent *event = _get_file_event(fd);
223   ldout(cct, 20) << __func__ << " create event started fd=" << fd << " mask=" << mask
224                  << " original mask is " << event->mask << dendl;
225   if (event->mask == mask)
226     return 0;
227
228   r = driver->add_event(fd, event->mask, mask);
229   if (r < 0) {
230     // Actually we don't allow any failed error code, caller doesn't prepare to
231     // handle error status. So now we need to assert failure here. In practice,
232     // add_event shouldn't report error, otherwise it must be a innermost bug!
233     assert(0 == "BUG!");
234     return r;
235   }
236
237   event->mask |= mask;
238   if (mask & EVENT_READABLE) {
239     event->read_cb = ctxt;
240   }
241   if (mask & EVENT_WRITABLE) {
242     event->write_cb = ctxt;
243   }
244   ldout(cct, 20) << __func__ << " create event end fd=" << fd << " mask=" << mask
245                  << " original mask is " << event->mask << dendl;
246   return 0;
247 }
248
249 void EventCenter::delete_file_event(int fd, int mask)
250 {
251   assert(in_thread() && fd >= 0);
252   if (fd >= nevent) {
253     ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent
254                   << "mask=" << mask << dendl;
255     return ;
256   }
257   EventCenter::FileEvent *event = _get_file_event(fd);
258   ldout(cct, 30) << __func__ << " delete event started fd=" << fd << " mask=" << mask
259                  << " original mask is " << event->mask << dendl;
260   if (!event->mask)
261     return ;
262
263   int r = driver->del_event(fd, event->mask, mask);
264   if (r < 0) {
265     // see create_file_event
266     assert(0 == "BUG!");
267   }
268
269   if (mask & EVENT_READABLE && event->read_cb) {
270     event->read_cb = nullptr;
271   }
272   if (mask & EVENT_WRITABLE && event->write_cb) {
273     event->write_cb = nullptr;
274   }
275
276   event->mask = event->mask & (~mask);
277   ldout(cct, 30) << __func__ << " delete event end fd=" << fd << " mask=" << mask
278                  << " original mask is " << event->mask << dendl;
279 }
280
281 uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
282 {
283   assert(in_thread());
284   uint64_t id = time_event_next_id++;
285
286   ldout(cct, 30) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl;
287   EventCenter::TimeEvent event;
288   clock_type::time_point expire = clock_type::now() + std::chrono::microseconds(microseconds);
289   event.id = id;
290   event.time_cb = ctxt;
291   std::multimap<clock_type::time_point, TimeEvent>::value_type s_val(expire, event);
292   auto it = time_events.insert(std::move(s_val));
293   event_map[id] = it;
294
295   return id;
296 }
297
298 void EventCenter::delete_time_event(uint64_t id)
299 {
300   assert(in_thread());
301   ldout(cct, 30) << __func__ << " id=" << id << dendl;
302   if (id >= time_event_next_id || id == 0)
303     return ;
304
305   auto it = event_map.find(id);
306   if (it == event_map.end()) {
307     ldout(cct, 10) << __func__ << " id=" << id << " not found" << dendl;
308     return ;
309   }
310
311   time_events.erase(it->second);
312   event_map.erase(it);
313 }
314
315 void EventCenter::wakeup()
316 {
317   // No need to wake up since we never sleep
318   if (!pollers.empty() || !driver->need_wakeup())
319     return ;
320
321   ldout(cct, 20) << __func__ << dendl;
322   char buf = 'c';
323   // wake up "event_wait"
324   int n = write(notify_send_fd, &buf, sizeof(buf));
325   if (n < 0) {
326     if (errno != EAGAIN) {
327       ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl;
328       ceph_abort();
329     }
330   }
331 }
332
333 int EventCenter::process_time_events()
334 {
335   int processed = 0;
336   clock_type::time_point now = clock_type::now();
337   ldout(cct, 30) << __func__ << " cur time is " << now << dendl;
338
339   while (!time_events.empty()) {
340     auto it = time_events.begin();
341     if (now >= it->first) {
342       TimeEvent &e = it->second;
343       EventCallbackRef cb = e.time_cb;
344       uint64_t id = e.id;
345       time_events.erase(it);
346       event_map.erase(id);
347       ldout(cct, 30) << __func__ << " process time event: id=" << id << dendl;
348       processed++;
349       cb->do_request(id);
350     } else {
351       break;
352     }
353   }
354
355   return processed;
356 }
357
358 int EventCenter::process_events(int timeout_microseconds,  ceph::timespan *working_dur)
359 {
360   struct timeval tv;
361   int numevents;
362   bool trigger_time = false;
363   auto now = clock_type::now();
364
365   auto it = time_events.begin();
366   bool blocking = pollers.empty() && !external_num_events.load();
367   // If exists external events or poller, don't block
368   if (!blocking) {
369     if (it != time_events.end() && now >= it->first)
370       trigger_time = true;
371     tv.tv_sec = 0;
372     tv.tv_usec = 0;
373   } else {
374     clock_type::time_point shortest;
375     shortest = now + std::chrono::microseconds(timeout_microseconds); 
376
377     if (it != time_events.end() && shortest >= it->first) {
378       ldout(cct, 30) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl;
379       shortest = it->first;
380       trigger_time = true;
381       if (shortest > now) {
382         timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(
383             shortest - now).count();
384       } else {
385         shortest = now;
386         timeout_microseconds = 0;
387       }
388     }
389     tv.tv_sec = timeout_microseconds / 1000000;
390     tv.tv_usec = timeout_microseconds % 1000000;
391   }
392
393   ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
394   vector<FiredFileEvent> fired_events;
395   numevents = driver->event_wait(fired_events, &tv);
396   auto working_start = ceph::mono_clock::now();
397   for (int j = 0; j < numevents; j++) {
398     int rfired = 0;
399     FileEvent *event;
400     EventCallbackRef cb;
401     event = _get_file_event(fired_events[j].fd);
402
403     /* note the event->mask & mask & ... code: maybe an already processed
404     * event removed an element that fired and we still didn't
405     * processed, so we check if the event is still valid. */
406     if (event->mask & fired_events[j].mask & EVENT_READABLE) {
407       rfired = 1;
408       cb = event->read_cb;
409       cb->do_request(fired_events[j].fd);
410     }
411
412     if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
413       if (!rfired || event->read_cb != event->write_cb) {
414         cb = event->write_cb;
415         cb->do_request(fired_events[j].fd);
416       }
417     }
418
419     ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
420   }
421
422   if (trigger_time)
423     numevents += process_time_events();
424
425   if (external_num_events.load()) {
426     external_lock.lock();
427     deque<EventCallbackRef> cur_process;
428     cur_process.swap(external_events);
429     external_num_events.store(0);
430     external_lock.unlock();
431     while (!cur_process.empty()) {
432       EventCallbackRef e = cur_process.front();
433       ldout(cct, 30) << __func__ << " do " << e << dendl;
434       e->do_request(0);
435       cur_process.pop_front();
436       numevents++;
437     }
438   }
439
440   if (!numevents && !blocking) {
441     for (uint32_t i = 0; i < pollers.size(); i++)
442       numevents += pollers[i]->poll();
443   }
444
445   if (working_dur)
446     *working_dur = ceph::mono_clock::now() - working_start;
447   return numevents;
448 }
449
450 void EventCenter::dispatch_event_external(EventCallbackRef e)
451 {
452   external_lock.lock();
453   external_events.push_back(e);
454   bool wake = !external_num_events.load();
455   uint64_t num = ++external_num_events;
456   external_lock.unlock();
457   if (!in_thread() && wake)
458     wakeup();
459
460   ldout(cct, 30) << __func__ << " " << e << " pending " << num << dendl;
461 }