X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FEventKqueue.cc;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FEventKqueue.cc;h=fd78718848227ee11c8fc5144640671632fe7a3a;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/msg/async/EventKqueue.cc b/src/ceph/src/msg/async/EventKqueue.cc new file mode 100644 index 0000000..fd78718 --- /dev/null +++ b/src/ceph/src/msg/async/EventKqueue.cc @@ -0,0 +1,267 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 UnitedStack + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/errno.h" +#include "EventKqueue.h" + +#define dout_subsys ceph_subsys_ms + +#undef dout_prefix +#define dout_prefix *_dout << "KqueueDriver." + +#define KEVENT_NOWAIT 0 + +int KqueueDriver::test_kqfd() { + struct kevent ke[1]; + if (kevent(kqfd, ke, 0, NULL, 0, KEVENT_NOWAIT) == -1) { + ldout(cct,0) << __func__ << " invalid kqfd = " << kqfd + << cpp_strerror(errno) << dendl; + return -errno; + } + return kqfd; +} + +int KqueueDriver::restore_events() { + struct kevent ke[2]; + int i; + + ldout(cct,30) << __func__ << " on kqfd = " << kqfd << dendl; + for(i=0;i= sav_max) + resize_events(sav_max+5000); + sav_events[fd].mask = cur_mask | add_mask; + return 0; +} + +int KqueueDriver::del_event(int fd, int cur_mask, int del_mask) +{ + struct kevent ke[2]; + int num = 0; + int mask = cur_mask & del_mask; + + ldout(cct,30) << __func__ << " delete event kqfd = " << kqfd + << " fd = " << fd << " cur_mask = " << cur_mask + << " del_mask = " << del_mask << dendl; + + int r = test_thread_change(__func__); + if ( r < 0 ) + return r; + + if (mask & EVENT_READABLE) + EV_SET(&ke[num++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (mask & EVENT_WRITABLE) + EV_SET(&ke[num++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + + if (num) { + int r = 0; + if ((r = kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT)) < 0) { + lderr(cct) << __func__ << " kevent: delete fd=" << fd << " mask=" << mask + << " failed." << cpp_strerror(errno) << dendl; + return -errno; + } + } + // keep the administration + sav_events[fd].mask = cur_mask & ~del_mask; + return 0; +} + +int KqueueDriver::resize_events(int newsize) +{ + ldout(cct,30) << __func__ << " kqfd = " << kqfd << "newsize = " << newsize + << dendl; + if(newsize > sav_max) { + sav_events = (struct SaveEvent*)realloc( sav_events, + sizeof(struct SaveEvent)*newsize); + if (!sav_events) { + lderr(cct) << __func__ << " unable to realloc memory: " + << cpp_strerror(errno) << dendl; + return -ENOMEM; + } + memset(&sav_events[size], 0, sizeof(struct SaveEvent)*(newsize-sav_max)); + sav_max = newsize; + } + return 0; +} + +int KqueueDriver::event_wait(vector &fired_events, struct timeval *tvp) +{ + int retval, numevents = 0; + struct timespec timeout; + + ldout(cct,10) << __func__ << " kqfd = " << kqfd << dendl; + + int r = test_thread_change(__func__); + if ( r < 0 ) + return r; + + if (tvp != NULL) { + timeout.tv_sec = tvp->tv_sec; + timeout.tv_nsec = tvp->tv_usec * 1000; + ldout(cct,20) << __func__ << " " + << timeout.tv_sec << " sec " + << timeout.tv_nsec << " nsec" + << dendl; + retval = kevent(kqfd, NULL, 0, res_events, size, &timeout); + } else { + ldout(cct,30) << __func__ << " event_wait: " << " NULL" << dendl; + retval = kevent(kqfd, NULL, 0, res_events, size, KEVENT_NOWAIT); + } + + ldout(cct,25) << __func__ << " kevent retval: " << retval << dendl; + if (retval < 0) { + lderr(cct) << __func__ << " kqueue error: " + << cpp_strerror(errno) << dendl; + return -errno; + } else if (retval == 0) { + ldout(cct,5) << __func__ << " Hit timeout(" + << timeout.tv_sec << " sec " + << timeout.tv_nsec << " nsec" + << ")." << dendl; + } else { + int j; + + numevents = retval; + fired_events.resize(numevents); + for (j = 0; j < numevents; j++) { + int mask = 0; + struct kevent *e = res_events + j; + + if (e->filter == EVFILT_READ) mask |= EVENT_READABLE; + if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE; + if (e->flags & EV_ERROR) mask |= EVENT_READABLE|EVENT_WRITABLE; + fired_events[j].fd = (int)e->ident; + fired_events[j].mask = mask; + + } + } + return numevents; +}