initial code repo
[stor4nfv.git] / src / ceph / src / msg / async / Event.cc
diff --git a/src/ceph/src/msg/async/Event.cc b/src/ceph/src/msg/async/Event.cc
new file mode 100644 (file)
index 0000000..690a245
--- /dev/null
@@ -0,0 +1,461 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/errno.h"
+#include "Event.h"
+
+#ifdef HAVE_DPDK
+#include "dpdk/EventDPDK.h"
+#endif
+
+#ifdef HAVE_EPOLL
+#include "EventEpoll.h"
+#else
+#ifdef HAVE_KQUEUE
+#include "EventKqueue.h"
+#else
+#include "EventSelect.h"
+#endif
+#endif
+
+#define dout_subsys ceph_subsys_ms
+
+#undef dout_prefix
+#define dout_prefix *_dout << "EventCallback "
+class C_handle_notify : public EventCallback {
+  EventCenter *center;
+  CephContext *cct;
+
+ public:
+  C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {}
+  void do_request(int fd_or_id) override {
+    char c[256];
+    int r = 0;
+    do {
+      r = read(fd_or_id, c, sizeof(c));
+      if (r < 0) {
+        if (errno != EAGAIN)
+          ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
+      }
+    } while (r > 0);
+  }
+};
+
+#undef dout_prefix
+#define dout_prefix _event_prefix(_dout)
+
+/**
+ * Construct a Poller.
+ *
+ * \param center
+ *      EventCenter object through which the poller will be invoked (defaults
+ *      to the global #RAMCloud::center object).
+ * \param pollerName
+ *      Human readable name that can be printed out in debugging messages
+ *      about the poller. The name of the superclass is probably sufficient
+ *      for most cases.
+ */
+EventCenter::Poller::Poller(EventCenter* center, const string& name)
+    : owner(center), poller_name(name), slot(owner->pollers.size())
+{
+  owner->pollers.push_back(this);
+}
+
+/**
+ * Destroy a Poller.
+ */
+EventCenter::Poller::~Poller()
+{
+  // Erase this Poller from the vector by overwriting it with the
+  // poller that used to be the last one in the vector.
+  //
+  // Note: this approach is reentrant (it is safe to delete a
+  // poller from a poller callback, which means that the poll
+  // method is in the middle of scanning the list of all pollers;
+  // the worst that will happen is that the poller that got moved
+  // may not be invoked in the current scan).
+  owner->pollers[slot] = owner->pollers.back();
+  owner->pollers[slot]->slot = slot;
+  owner->pollers.pop_back();
+  slot = -1;
+}
+
+ostream& EventCenter::_event_prefix(std::ostream *_dout)
+{
+  return *_dout << "Event(" << this << " nevent=" << nevent
+                << " time_id=" << time_event_next_id << ").";
+}
+
+int EventCenter::init(int n, unsigned i, const std::string &t)
+{
+  // can't init multi times
+  assert(nevent == 0);
+
+  type = t;
+  idx = i;
+
+  if (t == "dpdk") {
+#ifdef HAVE_DPDK
+    driver = new DPDKDriver(cct);
+#endif
+  } else {
+#ifdef HAVE_EPOLL
+  driver = new EpollDriver(cct);
+#else
+#ifdef HAVE_KQUEUE
+  driver = new KqueueDriver(cct);
+#else
+  driver = new SelectDriver(cct);
+#endif
+#endif
+  }
+
+  if (!driver) {
+    lderr(cct) << __func__ << " failed to create event driver " << dendl;
+    return -1;
+  }
+
+  int r = driver->init(this, n);
+  if (r < 0) {
+    lderr(cct) << __func__ << " failed to init event driver." << dendl;
+    return r;
+  }
+
+  file_events.resize(n);
+  nevent = n;
+
+  if (!driver->need_wakeup())
+    return 0;
+
+  int fds[2];
+  if (pipe(fds) < 0) {
+    lderr(cct) << __func__ << " can't create notify pipe" << dendl;
+    return -errno;
+  }
+
+  notify_receive_fd = fds[0];
+  notify_send_fd = fds[1];
+  r = net.set_nonblock(notify_receive_fd);
+  if (r < 0) {
+    return r;
+  }
+  r = net.set_nonblock(notify_send_fd);
+  if (r < 0) {
+    return r;
+  }
+
+  return r;
+}
+
+EventCenter::~EventCenter()
+{
+  {
+    std::lock_guard<std::mutex> l(external_lock);
+    while (!external_events.empty()) {
+      EventCallbackRef e = external_events.front();
+      if (e)
+        e->do_request(0);
+      external_events.pop_front();
+    }
+  }
+  assert(time_events.empty());
+
+  if (notify_receive_fd >= 0)
+    ::close(notify_receive_fd);
+  if (notify_send_fd >= 0)
+    ::close(notify_send_fd);
+
+  delete driver;
+  if (notify_handler)
+    delete notify_handler;
+}
+
+
+void EventCenter::set_owner()
+{
+  owner = pthread_self();
+  ldout(cct, 2) << __func__ << " idx=" << idx << " owner=" << owner << dendl;
+  if (!global_centers) {
+    cct->lookup_or_create_singleton_object<EventCenter::AssociatedCenters>(
+        global_centers, "AsyncMessenger::EventCenter::global_center::"+type);
+    assert(global_centers);
+    global_centers->centers[idx] = this;
+    if (driver->need_wakeup()) {
+      notify_handler = new C_handle_notify(this, cct);
+      int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
+      assert(r == 0);
+    }
+  }
+}
+
+int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
+{
+  assert(in_thread());
+  int r = 0;
+  if (fd >= nevent) {
+    int new_size = nevent << 2;
+    while (fd > new_size)
+      new_size <<= 2;
+    ldout(cct, 20) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
+    r = driver->resize_events(new_size);
+    if (r < 0) {
+      lderr(cct) << __func__ << " event count is exceed." << dendl;
+      return -ERANGE;
+    }
+    file_events.resize(new_size);
+    nevent = new_size;
+  }
+
+  EventCenter::FileEvent *event = _get_file_event(fd);
+  ldout(cct, 20) << __func__ << " create event started fd=" << fd << " mask=" << mask
+                 << " original mask is " << event->mask << dendl;
+  if (event->mask == mask)
+    return 0;
+
+  r = driver->add_event(fd, event->mask, mask);
+  if (r < 0) {
+    // Actually we don't allow any failed error code, caller doesn't prepare to
+    // handle error status. So now we need to assert failure here. In practice,
+    // add_event shouldn't report error, otherwise it must be a innermost bug!
+    assert(0 == "BUG!");
+    return r;
+  }
+
+  event->mask |= mask;
+  if (mask & EVENT_READABLE) {
+    event->read_cb = ctxt;
+  }
+  if (mask & EVENT_WRITABLE) {
+    event->write_cb = ctxt;
+  }
+  ldout(cct, 20) << __func__ << " create event end fd=" << fd << " mask=" << mask
+                 << " original mask is " << event->mask << dendl;
+  return 0;
+}
+
+void EventCenter::delete_file_event(int fd, int mask)
+{
+  assert(in_thread() && fd >= 0);
+  if (fd >= nevent) {
+    ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent
+                  << "mask=" << mask << dendl;
+    return ;
+  }
+  EventCenter::FileEvent *event = _get_file_event(fd);
+  ldout(cct, 30) << __func__ << " delete event started fd=" << fd << " mask=" << mask
+                 << " original mask is " << event->mask << dendl;
+  if (!event->mask)
+    return ;
+
+  int r = driver->del_event(fd, event->mask, mask);
+  if (r < 0) {
+    // see create_file_event
+    assert(0 == "BUG!");
+  }
+
+  if (mask & EVENT_READABLE && event->read_cb) {
+    event->read_cb = nullptr;
+  }
+  if (mask & EVENT_WRITABLE && event->write_cb) {
+    event->write_cb = nullptr;
+  }
+
+  event->mask = event->mask & (~mask);
+  ldout(cct, 30) << __func__ << " delete event end fd=" << fd << " mask=" << mask
+                 << " original mask is " << event->mask << dendl;
+}
+
+uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
+{
+  assert(in_thread());
+  uint64_t id = time_event_next_id++;
+
+  ldout(cct, 30) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl;
+  EventCenter::TimeEvent event;
+  clock_type::time_point expire = clock_type::now() + std::chrono::microseconds(microseconds);
+  event.id = id;
+  event.time_cb = ctxt;
+  std::multimap<clock_type::time_point, TimeEvent>::value_type s_val(expire, event);
+  auto it = time_events.insert(std::move(s_val));
+  event_map[id] = it;
+
+  return id;
+}
+
+void EventCenter::delete_time_event(uint64_t id)
+{
+  assert(in_thread());
+  ldout(cct, 30) << __func__ << " id=" << id << dendl;
+  if (id >= time_event_next_id || id == 0)
+    return ;
+
+  auto it = event_map.find(id);
+  if (it == event_map.end()) {
+    ldout(cct, 10) << __func__ << " id=" << id << " not found" << dendl;
+    return ;
+  }
+
+  time_events.erase(it->second);
+  event_map.erase(it);
+}
+
+void EventCenter::wakeup()
+{
+  // No need to wake up since we never sleep
+  if (!pollers.empty() || !driver->need_wakeup())
+    return ;
+
+  ldout(cct, 20) << __func__ << dendl;
+  char buf = 'c';
+  // wake up "event_wait"
+  int n = write(notify_send_fd, &buf, sizeof(buf));
+  if (n < 0) {
+    if (errno != EAGAIN) {
+      ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl;
+      ceph_abort();
+    }
+  }
+}
+
+int EventCenter::process_time_events()
+{
+  int processed = 0;
+  clock_type::time_point now = clock_type::now();
+  ldout(cct, 30) << __func__ << " cur time is " << now << dendl;
+
+  while (!time_events.empty()) {
+    auto it = time_events.begin();
+    if (now >= it->first) {
+      TimeEvent &e = it->second;
+      EventCallbackRef cb = e.time_cb;
+      uint64_t id = e.id;
+      time_events.erase(it);
+      event_map.erase(id);
+      ldout(cct, 30) << __func__ << " process time event: id=" << id << dendl;
+      processed++;
+      cb->do_request(id);
+    } else {
+      break;
+    }
+  }
+
+  return processed;
+}
+
+int EventCenter::process_events(int timeout_microseconds,  ceph::timespan *working_dur)
+{
+  struct timeval tv;
+  int numevents;
+  bool trigger_time = false;
+  auto now = clock_type::now();
+
+  auto it = time_events.begin();
+  bool blocking = pollers.empty() && !external_num_events.load();
+  // If exists external events or poller, don't block
+  if (!blocking) {
+    if (it != time_events.end() && now >= it->first)
+      trigger_time = true;
+    tv.tv_sec = 0;
+    tv.tv_usec = 0;
+  } else {
+    clock_type::time_point shortest;
+    shortest = now + std::chrono::microseconds(timeout_microseconds); 
+
+    if (it != time_events.end() && shortest >= it->first) {
+      ldout(cct, 30) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl;
+      shortest = it->first;
+      trigger_time = true;
+      if (shortest > now) {
+        timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(
+            shortest - now).count();
+      } else {
+        shortest = now;
+        timeout_microseconds = 0;
+      }
+    }
+    tv.tv_sec = timeout_microseconds / 1000000;
+    tv.tv_usec = timeout_microseconds % 1000000;
+  }
+
+  ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
+  vector<FiredFileEvent> fired_events;
+  numevents = driver->event_wait(fired_events, &tv);
+  auto working_start = ceph::mono_clock::now();
+  for (int j = 0; j < numevents; j++) {
+    int rfired = 0;
+    FileEvent *event;
+    EventCallbackRef cb;
+    event = _get_file_event(fired_events[j].fd);
+
+    /* note the event->mask & mask & ... code: maybe an already processed
+    * event removed an element that fired and we still didn't
+    * processed, so we check if the event is still valid. */
+    if (event->mask & fired_events[j].mask & EVENT_READABLE) {
+      rfired = 1;
+      cb = event->read_cb;
+      cb->do_request(fired_events[j].fd);
+    }
+
+    if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
+      if (!rfired || event->read_cb != event->write_cb) {
+        cb = event->write_cb;
+        cb->do_request(fired_events[j].fd);
+      }
+    }
+
+    ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
+  }
+
+  if (trigger_time)
+    numevents += process_time_events();
+
+  if (external_num_events.load()) {
+    external_lock.lock();
+    deque<EventCallbackRef> cur_process;
+    cur_process.swap(external_events);
+    external_num_events.store(0);
+    external_lock.unlock();
+    while (!cur_process.empty()) {
+      EventCallbackRef e = cur_process.front();
+      ldout(cct, 30) << __func__ << " do " << e << dendl;
+      e->do_request(0);
+      cur_process.pop_front();
+      numevents++;
+    }
+  }
+
+  if (!numevents && !blocking) {
+    for (uint32_t i = 0; i < pollers.size(); i++)
+      numevents += pollers[i]->poll();
+  }
+
+  if (working_dur)
+    *working_dur = ceph::mono_clock::now() - working_start;
+  return numevents;
+}
+
+void EventCenter::dispatch_event_external(EventCallbackRef e)
+{
+  external_lock.lock();
+  external_events.push_back(e);
+  bool wake = !external_num_events.load();
+  uint64_t num = ++external_num_events;
+  external_lock.unlock();
+  if (!in_thread() && wake)
+    wakeup();
+
+  ldout(cct, 30) << __func__ << " " << e << " pending " << num << dendl;
+}