1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #include "common/errno.h"
21 #include "dpdk/EventDPDK.h"
25 #include "EventEpoll.h"
28 #include "EventKqueue.h"
30 #include "EventSelect.h"
34 #define dout_subsys ceph_subsys_ms
37 #define dout_prefix *_dout << "EventCallback "
// EventCallback subclass that reads from the descriptor it is invoked with.
// EventCenter::set_owner() installs an instance as the EVENT_READABLE
// callback of notify_receive_fd.
// NOTE(review): the member declarations, the read loop framing and the
// closing braces of this class are not visible in this view.
38 class C_handle_notify : public EventCallback {
// Stores the owning EventCenter and the CephContext used for logging.
43 C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {}
// fd_or_id is handed straight to read(), i.e. it is used as a file descriptor.
44 void do_request(int fd_or_id) override {
// Read into the local buffer c; presumably the bytes are only a wakeup
// signal and are discarded -- TODO confirm.
48 r = read(fd_or_id, c, sizeof(c));
// Report a failed read at debug level 1 together with the errno text
// (the condition guarding this log line is not visible here).
51 ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
58 #define dout_prefix _event_prefix(_dout)
64 * EventCenter object through which the poller will be invoked (defaults
65 * to the global #RAMCloud::center object).
67 * Human readable name that can be printed out in debugging messages
68 * about the poller. The name of the superclass is probably sufficient
// Constructs a Poller and registers it with the given EventCenter.
// slot is initialised to the current size of owner->pollers, which is the
// index this object occupies once it is appended below.
// NOTE(review): this relies on `owner` being initialised before `slot`
// (member declaration order) -- confirm against the class definition.
71 EventCenter::Poller::Poller(EventCenter* center, const string& name)
72 : owner(center), poller_name(name), slot(owner->pollers.size())
// Append this poller to the owner's list.
74 owner->pollers.push_back(this);
// Unregisters this Poller from its owner's poller vector by moving the
// last element into this poller's slot and shrinking the vector by one.
80 EventCenter::Poller::~Poller()
82 // Erase this Poller from the vector by overwriting it with the
83 // poller that used to be the last one in the vector.
85 // Note: this approach is reentrant (it is safe to delete a
86 // poller from a poller callback, which means that the poll
87 // method is in the middle of scanning the list of all pollers;
88 // the worst that will happen is that the poller that got moved
89 // may not be invoked in the current scan).
90 owner->pollers[slot] = owner->pollers.back();
// The moved poller must remember its new index.
91 owner->pollers[slot]->slot = slot;
92 owner->pollers.pop_back();
// Writes the log prefix "Event(<this> nevent=<nevent> time_id=<time_event_next_id>)."
// to *_dout and returns that stream so the caller can keep appending to it.
// This is what the dout_prefix macro expands to for the rest of the file.
96 ostream& EventCenter::_event_prefix(std::ostream *_dout)
98 return *_dout << "Event(" << this << " nevent=" << nevent
99 << " time_id=" << time_event_next_id << ").";
// Initialises the event center: creates and initialises the event driver,
// sizes the file event table and sets up the notify pipe.
//
// n: passed to driver->init() and used as the size of file_events.
// i, t: their use is not visible in this view; presumably they are stored
//   as idx and type (both are read in set_owner) -- TODO confirm.
// Returns an int status; the return statements are not visible here.
// NOTE(review): the conditions selecting among the four drivers, the error
// checks after each call and the pipe creation call are not visible here.
102 int EventCenter::init(int n, unsigned i, const std::string &t)
104 // can't init multiple times
// Exactly one of the following drivers is presumably chosen (by build
// configuration and/or t) -- the selection logic is not visible.
112 driver = new DPDKDriver(cct);
116 driver = new EpollDriver(cct);
119 driver = new KqueueDriver(cct);
121 driver = new SelectDriver(cct);
127 lderr(cct) << __func__ << " failed to create event driver " << dendl;
131 int r = driver->init(this, n);
133 lderr(cct) << __func__ << " failed to init event driver." << dendl;
// One FileEvent entry per possible descriptor index below n.
137 file_events.resize(n);
// Drivers that never sleep presumably skip the notify pipe setup below
// (the statement controlled by this condition is not visible).
140 if (!driver->need_wakeup())
145 lderr(cct) << __func__ << " can't create notify pipe" << dendl;
// fds[0] is the read end, fds[1] the write end used by wakeup().
149 notify_receive_fd = fds[0];
150 notify_send_fd = fds[1];
// Both ends are switched to non-blocking mode.
151 r = net.set_nonblock(notify_receive_fd);
155 r = net.set_nonblock(notify_send_fd);
// Tears down the event center: drains the external event queue, checks
// that no time events remain, closes the notify pipe and frees the handler.
// NOTE(review): the scope of the lock and what is done with each drained
// event are on lines not visible in this view.
163 EventCenter::~EventCenter()
// external_events is shared with other threads, so drain it under the lock.
166 std::lock_guard<std::mutex> l(external_lock);
167 while (!external_events.empty()) {
168 EventCallbackRef e = external_events.front();
171 external_events.pop_front();
// All time events must have been deleted or fired before destruction.
174 assert(time_events.empty());
// Negative descriptors mean the pipe end was never opened.
176 if (notify_receive_fd >= 0)
177 ::close(notify_receive_fd);
178 if (notify_send_fd >= 0)
179 ::close(notify_send_fd);
183 delete notify_handler;
// Binds this center to the calling thread, publishes it in the shared
// per-type center table and, if the driver can sleep, registers the notify
// pipe's read end so wakeup() can interrupt event_wait().
// NOTE(review): block boundaries are not visible here, so which statements
// are inside the `if (!global_centers)` branch cannot be confirmed.
187 void EventCenter::set_owner()
// The calling thread becomes the owner of this center.
189 owner = pthread_self();
190 ldout(cct, 2) << __func__ << " idx=" << idx << " owner=" << owner << dendl;
// Look up (or create) the singleton holding all centers of this type.
191 if (!global_centers) {
192 cct->lookup_or_create_singleton_object<EventCenter::AssociatedCenters>(
193 global_centers, "AsyncMessenger::EventCenter::global_center::"+type);
194 assert(global_centers);
// Make this center reachable by its index.
195 global_centers->centers[idx] = this;
196 if (driver->need_wakeup()) {
// The handler is owned by this center and deleted in the destructor.
197 notify_handler = new C_handle_notify(this, cct);
198 int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
// Registers ctxt as the callback for the events in mask on descriptor fd,
// growing the driver's and this center's event tables when needed.
//
// fd:   descriptor to watch; also the index into file_events.
// mask: EVENT_READABLE and/or EVENT_WRITABLE.
// ctxt: callback stored as read_cb and/or write_cb according to mask.
// Returns an int status; the return statements are not visible here.
// NOTE(review): the guard around the resize path, the loop body and the
// handling of add_event's result are on lines not visible in this view.
204 int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
// Candidate table size: four times the current capacity.
209 int new_size = nevent << 2;
// NOTE(review): this loop exits when fd == new_size, yet after
// file_events.resize(new_size) the valid indices are 0..new_size-1.
// Confirm that fd == new_size cannot reach this point, or use `>=`.
210 while (fd > new_size)
212 ldout(cct, 20) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
213 r = driver->resize_events(new_size);
215 lderr(cct) << __func__ << " event count is exceed." << dendl;
218 file_events.resize(new_size);
222 EventCenter::FileEvent *event = _get_file_event(fd);
223 ldout(cct, 20) << __func__ << " create event started fd=" << fd << " mask=" << mask
224 << " original mask is " << event->mask << dendl;
// Nothing to change if the requested mask is already installed
// (the statement taken in that case is not visible here).
225 if (event->mask == mask)
// The driver receives both the current mask and the requested one.
228 r = driver->add_event(fd, event->mask, mask);
230 // Actually we don't allow any failed error code, caller doesn't prepare to
231 // handle error status. So now we need to assert failure here. In practice,
232 // add_event shouldn't report error, otherwise it must be an innermost bug!
// Install the callback for each requested direction.
238 if (mask & EVENT_READABLE) {
239 event->read_cb = ctxt;
241 if (mask & EVENT_WRITABLE) {
242 event->write_cb = ctxt;
244 ldout(cct, 20) << __func__ << " create event end fd=" << fd << " mask=" << mask
245 << " original mask is " << event->mask << dendl;
// Stops watching the events in mask on descriptor fd and drops the
// matching callbacks. Must be called from the owning thread with fd >= 0
// (asserted below).
// NOTE(review): the range check on fd and the handling of del_event's
// result are on lines not visible in this view.
249 void EventCenter::delete_file_event(int fd, int mask)
251 assert(in_thread() && fd >= 0);
// NOTE(review): the log text has no space before "mask=", so it runs
// together with the nevent value in the output.
253 ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent
254 << "mask=" << mask << dendl;
257 EventCenter::FileEvent *event = _get_file_event(fd);
258 ldout(cct, 30) << __func__ << " delete event started fd=" << fd << " mask=" << mask
259 << " original mask is " << event->mask << dendl;
// The driver receives both the current mask and the bits to remove.
263 int r = driver->del_event(fd, event->mask, mask);
265 // see create_file_event
// Release the callback for each direction being removed.
269 if (mask & EVENT_READABLE && event->read_cb) {
270 event->read_cb = nullptr;
272 if (mask & EVENT_WRITABLE && event->write_cb) {
273 event->write_cb = nullptr;
// Clear the removed bits from the stored mask.
276 event->mask = event->mask & (~mask);
// NOTE(review): despite the "original mask" label, this prints the mask
// after the update on the previous statement.
277 ldout(cct, 30) << __func__ << " delete event end fd=" << fd << " mask=" << mask
278 << " original mask is " << event->mask << dendl;
// Schedules ctxt to run once `microseconds` from now.
// Returns a uint64_t, presumably the id allocated below (the return
// statement is not visible here); delete_time_event() takes such an id.
// NOTE(review): the statements that fill in the event's id and record the
// inserted iterator are on lines not visible in this view.
281 uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
// Ids are handed out from a monotonically increasing counter.
284 uint64_t id = time_event_next_id++;
286 ldout(cct, 30) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl;
287 EventCenter::TimeEvent event;
// Absolute expiry time of the event.
288 clock_type::time_point expire = clock_type::now() + std::chrono::microseconds(microseconds);
290 event.time_cb = ctxt;
// time_events is keyed by expiry, so begin() is always the next event due.
291 std::multimap<clock_type::time_point, TimeEvent>::value_type s_val(expire, event);
292 auto it = time_events.insert(std::move(s_val));
// Cancels the pending time event with the given id, if it still exists.
// NOTE(review): the early returns and any cleanup of event_map are on
// lines not visible in this view.
298 void EventCenter::delete_time_event(uint64_t id)
301 ldout(cct, 30) << __func__ << " id=" << id << dendl;
// Ids that were never handed out, and id 0, are rejected.
302 if (id >= time_event_next_id || id == 0)
// event_map maps an id to the event's iterator in time_events.
305 auto it = event_map.find(id);
306 if (it == event_map.end()) {
307 ldout(cct, 10) << __func__ << " id=" << id << " not found" << dendl;
// Remove the event from the expiry-ordered queue via the stored iterator.
311 time_events.erase(it->second);
// Interrupts a wait in the driver by writing to the notify pipe.
// NOTE(review): the declaration of buf and the check of write()'s result
// are on lines not visible in this view.
315 void EventCenter::wakeup()
317 // No need to wake up since we never sleep
318 if (!pollers.empty() || !driver->need_wakeup())
321 ldout(cct, 20) << __func__ << dendl;
323 // wake up "event_wait"
324 int n = write(notify_send_fd, &buf, sizeof(buf));
// EAGAIN is not reported; any other errno is logged at debug level 1.
326 if (errno != EAGAIN) {
327 ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl;
// Processes the time events whose expiry is not later than the current time.
// Returns an int, presumably the number of events processed (the counter
// and the return statement are not visible here).
// NOTE(review): the callback invocation, the loop exit for events that are
// not yet due and the source of `id` are on lines not visible in this view.
333 int EventCenter::process_time_events()
// The clock is sampled once; every event is compared against this instant.
336 clock_type::time_point now = clock_type::now();
337 ldout(cct, 30) << __func__ << " cur time is " << now << dendl;
339 while (!time_events.empty()) {
// time_events is ordered by expiry, so begin() is the earliest event.
340 auto it = time_events.begin();
341 if (now >= it->first) {
342 TimeEvent &e = it->second;
// Copy the callback reference before the entry is erased.
343 EventCallbackRef cb = e.time_cb;
345 time_events.erase(it);
347 ldout(cct, 30) << __func__ << " process time event: id=" << id << dendl;
// Runs one iteration of the event loop: waits on the driver for file
// events, dispatches the fired read/write callbacks, then handles time
// events, externally dispatched events and pollers.
//
// timeout_microseconds: upper bound on the wait; it is shortened to the
//   earliest pending time event and set to 0 on the visible non-waiting path.
// working_dur: receives the time elapsed since event_wait() returned
//   (presumably only when non-null -- the guard is not visible here).
// Returns an int, presumably numevents (the return statement is not visible).
// NOTE(review): several declarations, branch bodies and closing braces of
// this function are on lines not visible in this view.
358 int EventCenter::process_events(int timeout_microseconds, ceph::timespan *working_dur)
362 bool trigger_time = false;
363 auto now = clock_type::now();
365 auto it = time_events.begin();
// Waiting in the driver is only allowed when there are no pollers and no
// queued external events.
366 bool blocking = pollers.empty() && !external_num_events.load();
367 // If external events or pollers exist, don't block
// The earliest time event is already due.
369 if (it != time_events.end() && now >= it->first)
// Start from the caller's timeout ...
374 clock_type::time_point shortest;
375 shortest = now + std::chrono::microseconds(timeout_microseconds);
// ... and cap it at the expiry of the earliest time event.
377 if (it != time_events.end() && shortest >= it->first) {
378 ldout(cct, 30) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl;
379 shortest = it->first;
// Convert the remaining wait back into microseconds.
381 if (shortest > now) {
382 timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(
383 shortest - now).count();
386 timeout_microseconds = 0;
// Split the timeout into the seconds/microseconds fields of tv.
389 tv.tv_sec = timeout_microseconds / 1000000;
390 tv.tv_usec = timeout_microseconds % 1000000;
393 ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
394 vector<FiredFileEvent> fired_events;
395 numevents = driver->event_wait(fired_events, &tv);
// Working time is measured from the moment the wait returns.
396 auto working_start = ceph::mono_clock::now();
397 for (int j = 0; j < numevents; j++) {
401 event = _get_file_event(fired_events[j].fd);
403 /* note the event->mask & mask & ... code: maybe an already processed
404 * event removed an element that fired and that we have not yet
405 * processed, so we check if the event is still valid. */
406 if (event->mask & fired_events[j].mask & EVENT_READABLE) {
409 cb->do_request(fired_events[j].fd);
412 if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
// Run the write callback unless rfired is set and the read and write
// callbacks are the same object (rfired presumably records that the read
// callback ran above -- its assignment is not visible here).
413 if (!rfired || event->read_cb != event->write_cb) {
414 cb = event->write_cb;
415 cb->do_request(fired_events[j].fd);
419 ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
// Time events that were processed add to the event count.
423 numevents += process_time_events();
// Take the whole external queue under the lock, then run it unlocked.
425 if (external_num_events.load()) {
426 external_lock.lock();
427 deque<EventCallbackRef> cur_process;
428 cur_process.swap(external_events);
429 external_num_events.store(0);
430 external_lock.unlock();
431 while (!cur_process.empty()) {
432 EventCallbackRef e = cur_process.front();
433 ldout(cct, 30) << __func__ << " do " << e << dendl;
435 cur_process.pop_front();
// Pollers run only when nothing else was processed and waiting was not allowed.
440 if (!numevents && !blocking) {
441 for (uint32_t i = 0; i < pollers.size(); i++)
442 numevents += pollers[i]->poll();
446 *working_dur = ceph::mono_clock::now() - working_start;
// Queues callback e for execution by process_events(), which drains
// external_events.
// NOTE(review): the statement controlled by the final `if` is not visible
// in this view; presumably it calls wakeup() -- TODO confirm.
450 void EventCenter::dispatch_event_external(EventCallbackRef e)
// The queue and its counter are updated together under the lock.
452 external_lock.lock();
453 external_events.push_back(e);
// wake is true only when the counter was zero before this event was added.
454 bool wake = !external_num_events.load();
455 uint64_t num = ++external_num_events;
456 external_lock.unlock();
// Acts only when the caller is not the owning thread and wake is set.
457 if (!in_thread() && wake)
460 ldout(cct, 30) << __func__ << " " << e << " pending " << num << dendl;