1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #include "common/errno.h"
18 #include "EventKqueue.h"
// Log subsystem tag for this file (presumably picked up by the ldout/lderr
// macros used below -- their definitions are not part of this file).
20 #define dout_subsys ceph_subsys_ms
// Every log line emitted through *_dout starts with "KqueueDriver.".
23 #define dout_prefix *_dout << "KqueueDriver."
// Value handed to kevent() as its timeout argument throughout this file.
// NOTE(review): 0 is a null timeout pointer, not a zero-length timeout.  The
// calls that pass nevents == 0 return immediately regardless, but per
// kevent(2) a null timeout with nevents > 0 waits indefinitely -- confirm the
// name matches the intent at every call site.
25 #define KEVENT_NOWAIT 0
// Probe whether kqfd is still a usable kqueue descriptor by issuing a
// kevent() call with zero changes and zero result slots.  A -1 result is
// logged at level 0 together with the errno text.
// test_thread_change() treats a negative return value from this function as
// "descriptor invalid".
27 int KqueueDriver::test_kqfd() {
29 if (kevent(kqfd, ke, 0, NULL, 0, KEVENT_NOWAIT) == -1) {
// NOTE(review): kqfd and the errno text are streamed back to back, without a
// separator between them.
30 ldout(cct,0) << __func__ << " invalid kqfd = " << kqfd
31 << cpp_strerror(errno) << dendl;
// Re-register the events recorded in sav_events on the current kqfd.
// sav_events is indexed by file descriptor: the index i is what gets passed
// to EV_SET() as the event ident.
37 int KqueueDriver::restore_events() {
41 ldout(cct,30) << __func__ << " on kqfd = " << kqfd << dendl;
// A zero mask means nothing is recorded for this descriptor.
44 if (sav_events[i].mask == 0 )
46 ldout(cct,30) << __func__ << " restore kqfd = " << kqfd
47 << " fd = " << i << " mask " << sav_events[i].mask << dendl;
// Queue one EV_ADD change per direction present in the saved mask.
// NOTE(review): add_event() registers with EV_ADD|EV_CLEAR while only EV_ADD
// is used here -- confirm that dropping EV_CLEAR on restore is intended.
48 if (sav_events[i].mask & EVENT_READABLE)
49 EV_SET(&ke[num++], i, EVFILT_READ, EV_ADD, 0, 0, NULL);
50 if (sav_events[i].mask & EVENT_WRITABLE)
51 EV_SET(&ke[num++], i, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
// Submit the queued changes; no result events are requested (NULL, 0).
53 if (kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT) == -1) {
54 ldout(cct,0) << __func__ << " unable to add event: "
55 << cpp_strerror(errno) << dendl;
// Check whether the driver is now being used from a different thread than
// the one remembered in mythread, or whether the existing kqfd fails the
// test_kqfd() probe, and log the creation of a replacement kqfd followed by
// a restore_events() call.
// funcname: name of the calling function; it is used as the prefix of every
// log message written here, in place of __func__.
63 int KqueueDriver::test_thread_change(const char* funcname) {
64 // check to see if we changed thread, because that invalidates
65 // the kqfd and we need to restore that
68 if (!pthread_equal(mythread, pthread_self())) {
69 ldout(cct,20) << funcname << " We changed thread from " << mythread
70 << " to " << pthread_self() << dendl;
// Remember the new owner so later calls compare against this thread.
71 mythread = pthread_self();
// Same thread: only act when a descriptor exists (kqfd != -1) and it fails
// the test_kqfd() probe.
73 } else if ((kqfd != -1) && (test_kqfd() < 0)) {
74 // should this ever happen?
75 // It would be strange to change kqfd with thread change.
76 // Might need to change this into an assert() in the future.
77 ldout(cct,0) << funcname << " Warning: Recreating old kqfd. "
78 << "This should not happen!!!" << dendl;
// Report the new descriptor next to the previous one (oldkqfd); a failure to
// obtain a kqueue is logged as an error with the errno text.
83 ldout(cct,30) << funcname << " kqueue: new kqfd = " << kqfd
84 << " (was: " << oldkqfd << ")"
87 lderr(cct) << funcname << " unable to do kqueue: "
88 << cpp_strerror(errno) << dendl;
// Re-register the saved events; a negative result is logged as an error.
91 if (restore_events()< 0) {
92 lderr(cct) << funcname << " unable restore all events "
93 << cpp_strerror(errno) << dendl;
// Prepare the driver: remember the calling thread and allocate the two
// zero-filled bookkeeping arrays, each with room for nevent entries.
// c:      not referenced in the lines visible here.
// nevent: number of entries allocated for both res_events and sav_events.
// A failed allocation is logged through lderr with the errno text.
100 int KqueueDriver::init(EventCenter *c, int nevent)
102 // keep track of possible changes of our thread
103 // because change of thread kills the kqfd
104 mythread = pthread_self();
106 // Reserve the space to accept the kevent return events.
107 res_events = (struct kevent*)malloc(sizeof(struct kevent)*nevent);
109 lderr(cct) << __func__ << " unable to malloc memory: "
110 << cpp_strerror(errno) << dendl;
113 memset(res_events, 0, sizeof(struct kevent)*nevent);
116 // Reserve the space to keep all of the events set, so it can be redone
117 // when we change thread ID.
118 sav_events = (struct SaveEvent*)malloc(sizeof(struct SaveEvent)*nevent);
120 lderr(cct) << __func__ << " unable to malloc memory: "
121 << cpp_strerror(errno) << dendl;
124 memset(sav_events, 0, sizeof(struct SaveEvent)*nevent);
127 // Delay assigning a descriptor until it is really needed.
// Register read and/or write interest for fd on the kqueue and record the
// resulting mask in sav_events so it can be re-registered later.
// fd:       descriptor to watch; also the index into sav_events.
// cur_mask: mask already active for fd; merged into the saved mask.
// add_mask: EVENT_READABLE / EVENT_WRITABLE bits to register now.
133 int KqueueDriver::add_event(int fd, int cur_mask, int add_mask)
138 ldout(cct,30) << __func__ << " add event kqfd = " << kqfd << " fd = " << fd
139 << " cur_mask = " << cur_mask << " add_mask = " << add_mask
// Validate / refresh kqfd before touching it.
142 int r = test_thread_change(__func__);
// Only the bits in add_mask are submitted; both filters are registered with
// EV_ADD|EV_CLEAR.
146 if (add_mask & EVENT_READABLE)
147 EV_SET(&ke[num++], fd, EVFILT_READ, EV_ADD|EV_CLEAR, 0, 0, NULL);
148 if (add_mask & EVENT_WRITABLE)
149 EV_SET(&ke[num++], fd, EVFILT_WRITE, EV_ADD|EV_CLEAR, 0, 0, NULL);
// Submit the changes; no result events are requested (NULL, 0).
152 if (kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT) == -1) {
153 lderr(cct) << __func__ << " unable to add event: "
154 << cpp_strerror(errno) << dendl;
// NOTE(review): the table grows by a fixed 5000 entries and the result of
// resize_events() is not used on this line; an fd beyond sav_max+5000 would
// still index past the table below -- confirm against the guard, which is
// not visible here.
160 resize_events(sav_max+5000);
// Saved mask is the union of what was active and what was just added.
161 sav_events[fd].mask = cur_mask | add_mask;
// Remove read and/or write interest for fd from the kqueue and update the
// mask recorded for it in sav_events.
// fd:       descriptor to update; also the index into sav_events.
// cur_mask: mask currently active for fd.
// del_mask: EVENT_READABLE / EVENT_WRITABLE bits to remove.
165 int KqueueDriver::del_event(int fd, int cur_mask, int del_mask)
// Only filters that are both currently active and requested for removal are
// deleted.
169 int mask = cur_mask & del_mask;
171 ldout(cct,30) << __func__ << " delete event kqfd = " << kqfd
172 << " fd = " << fd << " cur_mask = " << cur_mask
173 << " del_mask = " << del_mask << dendl;
// Validate / refresh kqfd before touching it.
175 int r = test_thread_change(__func__);
179 if (mask & EVENT_READABLE)
180 EV_SET(&ke[num++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
181 if (mask & EVENT_WRITABLE)
182 EV_SET(&ke[num++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
// Submit the deletions; r is reused to hold the kevent() result and a
// negative value is logged as an error.
186 if ((r = kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT)) < 0) {
187 lderr(cct) << __func__ << " kevent: delete fd=" << fd << " mask=" << mask
188 << " failed." << cpp_strerror(errno) << dendl;
192 // keep the administration
193 sav_events[fd].mask = cur_mask & ~del_mask;
// Grow the sav_events bookkeeping array so that it can hold newsize entries.
// Nothing is reallocated unless newsize exceeds the current capacity
// (sav_max).  A failed realloc() is logged through lderr with the errno text.
// newsize: requested number of SaveEvent entries.
197 int KqueueDriver::resize_events(int newsize)
199 ldout(cct,30) << __func__ << " kqfd = " << kqfd << "newsize = " << newsize
201 if(newsize > sav_max) {
202 sav_events = (struct SaveEvent*)realloc( sav_events,
203 sizeof(struct SaveEvent)*newsize);
205 lderr(cct) << __func__ << " unable to realloc memory: "
206 << cpp_strerror(errno) << dendl;
// Zero exactly the entries realloc() appended, i.e. [sav_max, newsize).
// The start index must be sav_max, the old capacity that both the guard and
// the length are computed from.  Starting at `size` (the length event_wait()
// uses for res_events) wipes live saved masks and leaves the new tail
// uninitialised as soon as the two values differ, e.g. on a second resize.
209 memset(&sav_events[sav_max], 0, sizeof(struct SaveEvent)*(newsize-sav_max));
// Wait for events on kqfd and translate the kevent results into
// FiredFileEvent entries.
// fired_events: output; resized to the number of events and filled with the
//               fd and EVENT_READABLE / EVENT_WRITABLE mask of each one.
// tvp:          timeout as a timeval; converted to a timespec for kevent().
215 int KqueueDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp)
217 int retval, numevents = 0;
218 struct timespec timeout;
220 ldout(cct,10) << __func__ << " kqfd = " << kqfd << dendl;
// Validate / refresh kqfd before waiting on it.
222 int r = test_thread_change(__func__);
// timeval carries microseconds, timespec nanoseconds: scale by 1000.
227 timeout.tv_sec = tvp->tv_sec;
228 timeout.tv_nsec = tvp->tv_usec * 1000;
229 ldout(cct,20) << __func__ << " "
230 << timeout.tv_sec << " sec "
231 << timeout.tv_nsec << " nsec"
// Up to `size` results are collected into res_events.
233 retval = kevent(kqfd, NULL, 0, res_events, size, &timeout);
// Alternative path without a timespec.
// NOTE(review): KEVENT_NOWAIT is 0, i.e. a null timeout pointer; per
// kevent(2) that waits indefinitely when result slots are requested --
// confirm this is the intended behaviour despite the macro name.
235 ldout(cct,30) << __func__ << " event_wait: " << " NULL" << dendl;
236 retval = kevent(kqfd, NULL, 0, res_events, size, KEVENT_NOWAIT);
239 ldout(cct,25) << __func__ << " kevent retval: " << retval << dendl;
241 lderr(cct) << __func__ << " kqueue error: "
242 << cpp_strerror(errno) << dendl;
// NOTE(review): timeout is only assigned in the conversion above; if this
// branch can be reached via the path without a timespec, the values printed
// here are uninitialised -- confirm.
244 } else if (retval == 0) {
245 ldout(cct,5) << __func__ << " Hit timeout("
246 << timeout.tv_sec << " sec "
247 << timeout.tv_nsec << " nsec"
// One FiredFileEvent per returned kevent.
253 fired_events.resize(numevents);
254 for (j = 0; j < numevents; j++) {
256 struct kevent *e = res_events + j;
// Map the kqueue filter onto the driver's mask bits; an EV_ERROR flag is
// reported as both readable and writable.
258 if (e->filter == EVFILT_READ) mask |= EVENT_READABLE;
259 if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE;
260 if (e->flags & EV_ERROR) mask |= EVENT_READABLE|EVENT_WRITABLE;
261 fired_events[j].fd = (int)e->ident;
262 fired_events[j].mask = mask;