Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / Event.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #ifndef CEPH_MSG_EVENT_H
18 #define CEPH_MSG_EVENT_H
19
20 #ifdef __APPLE__
21 #include <AvailabilityMacros.h>
22 #endif
23
24 // We use epoll, kqueue, evport, select in descending order by performance.
25 #if defined(__linux__)
26 #define HAVE_EPOLL 1
27 #endif
28
29 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
30 #define HAVE_KQUEUE 1
31 #endif
32
33 #ifdef __sun
34 #include <sys/feature_tests.h>
35 #ifdef _DTRACE_VERSION
36 #define HAVE_EVPORT 1
37 #endif
38 #endif
39
40 #include <atomic>
41 #include <mutex>
42 #include <condition_variable>
43
44 #include "common/ceph_time.h"
45 #include "common/dout.h"
46 #include "net_handler.h"
47
48 #define EVENT_NONE 0
49 #define EVENT_READABLE 1
50 #define EVENT_WRITABLE 2
51
52 class EventCenter;
53
54 class EventCallback {
55
56  public:
57   virtual void do_request(int fd_or_id) = 0;
58   virtual ~EventCallback() {}       // we want a virtual destructor!!!
59 };
60
61 typedef EventCallback* EventCallbackRef;
62
63 struct FiredFileEvent {
64   int fd;
65   int mask;
66 };
67
68 /*
69  * EventDriver is a wrap of event mechanisms depends on different OS.
70  * For example, Linux will use epoll(2), BSD will use kqueue(2) and select will
71  * be used for worst condition.
72  */
73 class EventDriver {
74  public:
75   virtual ~EventDriver() {}       // we want a virtual destructor!!!
76   virtual int init(EventCenter *center, int nevent) = 0;
77   virtual int add_event(int fd, int cur_mask, int mask) = 0;
78   virtual int del_event(int fd, int cur_mask, int del_mask) = 0;
79   virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
80   virtual int resize_events(int newsize) = 0;
81   virtual bool need_wakeup() { return true; }
82 };
83
84 /*
85  * EventCenter maintain a set of file descriptor and handle registered events.
86  */
87 class EventCenter {
88  public:
89   // should be enough;
90   static const int MAX_EVENTCENTER = 24;
91
92  private:
93   using clock_type = ceph::coarse_mono_clock;
94
95   struct AssociatedCenters {
96     EventCenter *centers[MAX_EVENTCENTER];
97     AssociatedCenters(CephContext *c) {
98       memset(centers, 0, MAX_EVENTCENTER * sizeof(EventCenter*));
99     }
100   };
101
102   struct FileEvent {
103     int mask;
104     EventCallbackRef read_cb;
105     EventCallbackRef write_cb;
106     FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
107   };
108
109   struct TimeEvent {
110     uint64_t id;
111     EventCallbackRef time_cb;
112
113     TimeEvent(): id(0), time_cb(NULL) {}
114   };
115
116  public:
117   /**
118      * A Poller object is invoked once each time through the dispatcher's
119      * inner polling loop.
120      */
121   class Poller {
122    public:
123     explicit Poller(EventCenter* center, const string& pollerName);
124     virtual ~Poller();
125
126     /**
127      * This method is defined by a subclass and invoked once by the
128      * center during each pass through its inner polling loop.
129      *
130      * \return
131      *      1 means that this poller did useful work during this call.
132      *      0 means that the poller found no work to do.
133      */
134     virtual int poll() = 0;
135
136    private:
137     /// The EventCenter object that owns this Poller.  NULL means the
138     /// EventCenter has been deleted.
139     EventCenter* owner;
140
141     /// Human-readable string name given to the poller to make it
142     /// easy to identify for debugging. For most pollers just passing
143     /// in the subclass name probably makes sense.
144     string poller_name;
145
146     /// Index of this Poller in EventCenter::pollers.  Allows deletion
147     /// without having to scan all the entries in pollers. -1 means
148     /// this poller isn't currently in EventCenter::pollers (happens
149     /// after EventCenter::reset).
150     int slot;
151   };
152
153  private:
154   CephContext *cct;
155   std::string type;
156   int nevent;
157   // Used only to external event
158   pthread_t owner;
159   std::mutex external_lock;
160   std::atomic_ulong external_num_events;
161   deque<EventCallbackRef> external_events;
162   vector<FileEvent> file_events;
163   EventDriver *driver;
164   std::multimap<clock_type::time_point, TimeEvent> time_events;
165   // Keeps track of all of the pollers currently defined.  We don't
166   // use an intrusive list here because it isn't reentrant: we need
167   // to add/remove elements while the center is traversing the list.
168   std::vector<Poller*> pollers;
169   std::map<uint64_t, std::multimap<clock_type::time_point, TimeEvent>::iterator> event_map;
170   uint64_t time_event_next_id;
171   int notify_receive_fd;
172   int notify_send_fd;
173   NetHandler net;
174   EventCallbackRef notify_handler;
175   unsigned idx;
176   AssociatedCenters *global_centers = nullptr;
177
178   int process_time_events();
179   FileEvent *_get_file_event(int fd) {
180     assert(fd < nevent);
181     return &file_events[fd];
182   }
183
184  public:
185   explicit EventCenter(CephContext *c):
186     cct(c), nevent(0),
187     external_num_events(0),
188     driver(NULL), time_event_next_id(1),
189     notify_receive_fd(-1), notify_send_fd(-1), net(c),
190     notify_handler(NULL), idx(0) { }
191   ~EventCenter();
192   ostream& _event_prefix(std::ostream *_dout);
193
194   int init(int nevent, unsigned idx, const std::string &t);
195   void set_owner();
196   pthread_t get_owner() const { return owner; }
197   unsigned get_id() const { return idx; }
198
199   EventDriver *get_driver() { return driver; }
200
201   // Used by internal thread
202   int create_file_event(int fd, int mask, EventCallbackRef ctxt);
203   uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
204   void delete_file_event(int fd, int mask);
205   void delete_time_event(uint64_t id);
206   int process_events(int timeout_microseconds, ceph::timespan *working_dur = nullptr);
207   void wakeup();
208
209   // Used by external thread
210   void dispatch_event_external(EventCallbackRef e);
211   inline bool in_thread() const {
212     return pthread_equal(pthread_self(), owner);
213   }
214
215  private:
216   template <typename func>
217   class C_submit_event : public EventCallback {
218     std::mutex lock;
219     std::condition_variable cond;
220     bool done = false;
221     func f;
222     bool nonwait;
223    public:
224     C_submit_event(func &&_f, bool nw)
225       : f(std::move(_f)), nonwait(nw) {}
226     void do_request(int id) override {
227       f();
228       lock.lock();
229       cond.notify_all();
230       done = true;
231       bool del = nonwait;
232       lock.unlock();
233       if (del)
234         delete this;
235     }
236     void wait() {
237       assert(!nonwait);
238       std::unique_lock<std::mutex> l(lock);
239       while (!done)
240         cond.wait(l);
241     }
242   };
243
244  public:
245   template <typename func>
246   void submit_to(int i, func &&f, bool nowait = false) {
247     assert(i < MAX_EVENTCENTER && global_centers);
248     EventCenter *c = global_centers->centers[i];
249     assert(c);
250     if (!nowait && c->in_thread()) {
251       f();
252       return ;
253     }
254     if (nowait) {
255       C_submit_event<func> *event = new C_submit_event<func>(std::move(f), true);
256       c->dispatch_event_external(event);
257     } else {
258       C_submit_event<func> event(std::move(f), false);
259       c->dispatch_event_external(&event);
260       event.wait();
261     }
262   };
263 };
264
265 #endif