1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_EVENT_H
18 #define CEPH_MSG_EVENT_H
21 #include <AvailabilityMacros.h>
24 // We use epoll, kqueue, evport, select in descending order by performance.
25 #if defined(__linux__)
29 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
34 #include <sys/feature_tests.h>
35 #ifdef _DTRACE_VERSION
42 #include <condition_variable>
44 #include "common/ceph_time.h"
45 #include "common/dout.h"
46 #include "net_handler.h"
49 #define EVENT_READABLE 1
50 #define EVENT_WRITABLE 2
57 virtual void do_request(int fd_or_id) = 0;
58 virtual ~EventCallback() {} // we want a virtual destructor!!!
61 typedef EventCallback* EventCallbackRef;
63 struct FiredFileEvent {
69 * EventDriver is a wrap of event mechanisms depends on different OS.
70 * For example, Linux will use epoll(2), BSD will use kqueue(2) and select will
71 * be used for worst condition.
75 virtual ~EventDriver() {} // we want a virtual destructor!!!
76 virtual int init(EventCenter *center, int nevent) = 0;
77 virtual int add_event(int fd, int cur_mask, int mask) = 0;
78 virtual int del_event(int fd, int cur_mask, int del_mask) = 0;
79 virtual int event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tp) = 0;
80 virtual int resize_events(int newsize) = 0;
81 virtual bool need_wakeup() { return true; }
85 * EventCenter maintain a set of file descriptor and handle registered events.
90 static const int MAX_EVENTCENTER = 24;
93 using clock_type = ceph::coarse_mono_clock;
95 struct AssociatedCenters {
96 EventCenter *centers[MAX_EVENTCENTER];
97 AssociatedCenters(CephContext *c) {
98 memset(centers, 0, MAX_EVENTCENTER * sizeof(EventCenter*));
104 EventCallbackRef read_cb;
105 EventCallbackRef write_cb;
106 FileEvent(): mask(0), read_cb(NULL), write_cb(NULL) {}
111 EventCallbackRef time_cb;
113 TimeEvent(): id(0), time_cb(NULL) {}
118 * A Poller object is invoked once each time through the dispatcher's
119 * inner polling loop.
123 explicit Poller(EventCenter* center, const string& pollerName);
127 * This method is defined by a subclass and invoked once by the
128 * center during each pass through its inner polling loop.
131 * 1 means that this poller did useful work during this call.
132 * 0 means that the poller found no work to do.
134 virtual int poll() = 0;
137 /// The EventCenter object that owns this Poller. NULL means the
138 /// EventCenter has been deleted.
141 /// Human-readable string name given to the poller to make it
142 /// easy to identify for debugging. For most pollers just passing
143 /// in the subclass name probably makes sense.
146 /// Index of this Poller in EventCenter::pollers. Allows deletion
147 /// without having to scan all the entries in pollers. -1 means
148 /// this poller isn't currently in EventCenter::pollers (happens
149 /// after EventCenter::reset).
157 // Used only to external event
159 std::mutex external_lock;
160 std::atomic_ulong external_num_events;
161 deque<EventCallbackRef> external_events;
162 vector<FileEvent> file_events;
164 std::multimap<clock_type::time_point, TimeEvent> time_events;
165 // Keeps track of all of the pollers currently defined. We don't
166 // use an intrusive list here because it isn't reentrant: we need
167 // to add/remove elements while the center is traversing the list.
168 std::vector<Poller*> pollers;
169 std::map<uint64_t, std::multimap<clock_type::time_point, TimeEvent>::iterator> event_map;
170 uint64_t time_event_next_id;
171 int notify_receive_fd;
174 EventCallbackRef notify_handler;
176 AssociatedCenters *global_centers = nullptr;
178 int process_time_events();
179 FileEvent *_get_file_event(int fd) {
181 return &file_events[fd];
185 explicit EventCenter(CephContext *c):
187 external_num_events(0),
188 driver(NULL), time_event_next_id(1),
189 notify_receive_fd(-1), notify_send_fd(-1), net(c),
190 notify_handler(NULL), idx(0) { }
192 ostream& _event_prefix(std::ostream *_dout);
194 int init(int nevent, unsigned idx, const std::string &t);
196 pthread_t get_owner() const { return owner; }
197 unsigned get_id() const { return idx; }
199 EventDriver *get_driver() { return driver; }
201 // Used by internal thread
202 int create_file_event(int fd, int mask, EventCallbackRef ctxt);
203 uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
204 void delete_file_event(int fd, int mask);
205 void delete_time_event(uint64_t id);
206 int process_events(int timeout_microseconds, ceph::timespan *working_dur = nullptr);
209 // Used by external thread
210 void dispatch_event_external(EventCallbackRef e);
211 inline bool in_thread() const {
212 return pthread_equal(pthread_self(), owner);
216 template <typename func>
217 class C_submit_event : public EventCallback {
219 std::condition_variable cond;
224 C_submit_event(func &&_f, bool nw)
225 : f(std::move(_f)), nonwait(nw) {}
226 void do_request(int id) override {
238 std::unique_lock<std::mutex> l(lock);
245 template <typename func>
246 void submit_to(int i, func &&f, bool nowait = false) {
247 assert(i < MAX_EVENTCENTER && global_centers);
248 EventCenter *c = global_centers->centers[i];
250 if (!nowait && c->in_thread()) {
255 C_submit_event<func> *event = new C_submit_event<func>(std::move(f), true);
256 c->dispatch_event_external(event);
258 C_submit_event<func> event(std::move(f), false);
259 c->dispatch_event_external(&event);