Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / EventKqueue.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #include "common/errno.h"
18 #include "EventKqueue.h"
19
20 #define dout_subsys ceph_subsys_ms
21
22 #undef dout_prefix
23 #define dout_prefix *_dout << "KqueueDriver."
24
25 #define KEVENT_NOWAIT 0
26
27 int KqueueDriver::test_kqfd() {
28   struct kevent ke[1];
29   if (kevent(kqfd, ke, 0, NULL, 0, KEVENT_NOWAIT) == -1) {
30     ldout(cct,0) << __func__ << " invalid kqfd = " << kqfd 
31                  << cpp_strerror(errno) << dendl;
32     return -errno;
33   }
34   return kqfd;
35 }
36
37 int KqueueDriver::restore_events() {
38   struct kevent ke[2];
39   int i;
40
41   ldout(cct,30) << __func__ << " on kqfd = " << kqfd << dendl;
42   for(i=0;i<size;i++) {
43     int num = 0;
44     if (sav_events[i].mask == 0 )
45       continue;
46     ldout(cct,30) << __func__ << " restore kqfd = " << kqfd 
47                   << " fd = " << i << " mask " << sav_events[i].mask << dendl;
48     if (sav_events[i].mask & EVENT_READABLE)
49       EV_SET(&ke[num++], i, EVFILT_READ, EV_ADD, 0, 0, NULL);
50     if (sav_events[i].mask & EVENT_WRITABLE)
51       EV_SET(&ke[num++], i, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
52     if (num) {
53       if (kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT) == -1) {
54         ldout(cct,0) << __func__ << " unable to add event: "
55                      << cpp_strerror(errno) << dendl;
56         return -errno;
57       }
58     }
59   }
60   return 0;
61 }
62
63 int KqueueDriver::test_thread_change(const char* funcname) {
64   // check to see if we changed thread, because that invalidates
65   // the kqfd and we need to restore that
66   int oldkqfd = kqfd;
67
68   if (!pthread_equal(mythread, pthread_self())) {
69     ldout(cct,20) << funcname << " We changed thread from " << mythread
70                   << " to " << pthread_self() << dendl;
71     mythread = pthread_self();
72     kqfd = -1;
73   } else if ((kqfd != -1) && (test_kqfd() < 0)) {
74     // should this ever happen?
75     // It would be strange to change kqfd with thread change.
76     // Might nee to change this into an assert() in the future.
77     ldout(cct,0) << funcname << " Warning: Recreating old kqfd. "
78                  << "This should not happen!!!"  << dendl;
79     kqfd = -1;
80   }
81   if (kqfd == -1) {
82     kqfd = kqueue();
83     ldout(cct,30) << funcname << " kqueue: new kqfd = " << kqfd
84                   << " (was: " << oldkqfd << ")"
85                   << dendl;
86     if (kqfd < 0) {
87       lderr(cct) << funcname << " unable to do kqueue: "
88                              << cpp_strerror(errno) << dendl;
89       return -errno;
90     }
91     if (restore_events()< 0) {
92       lderr(cct) << funcname << " unable restore all events "
93                              << cpp_strerror(errno) << dendl;
94       return -errno;
95     }
96   }
97   return 0;
98 }
99
100 int KqueueDriver::init(EventCenter *c, int nevent)
101 {
102   // keep track of possible changes of our thread
103   // because change of thread kills the kqfd
104   mythread = pthread_self();
105
106   // Reserve the space to accept the kevent return events.
107   res_events = (struct kevent*)malloc(sizeof(struct kevent)*nevent);
108   if (!res_events) {
109     lderr(cct) << __func__ << " unable to malloc memory: "
110                            << cpp_strerror(errno) << dendl;
111     return -ENOMEM;
112   }
113   memset(res_events, 0, sizeof(struct kevent)*nevent);
114   size = nevent;
115
116   // Reserve the space to keep all of the events set, so it can be redone
117   // when we change trhread ID. 
118   sav_events = (struct SaveEvent*)malloc(sizeof(struct SaveEvent)*nevent);
119   if (!sav_events) {
120     lderr(cct) << __func__ << " unable to malloc memory: "
121                            << cpp_strerror(errno) << dendl;
122     return -ENOMEM;
123   }
124   memset(sav_events, 0, sizeof(struct SaveEvent)*nevent);
125   sav_max = nevent;
126
127   // Delay assigning a descriptor until it is really needed.
128   // kqfd = kqueue();
129   kqfd = -1;
130   return 0;
131 }
132
133 int KqueueDriver::add_event(int fd, int cur_mask, int add_mask)
134 {
135   struct kevent ke[2];
136   int num = 0;
137
138   ldout(cct,30) << __func__ << " add event kqfd = " << kqfd << " fd = " << fd 
139         << " cur_mask = " << cur_mask << " add_mask = " << add_mask 
140         << dendl;
141
142   int r = test_thread_change(__func__);
143   if ( r < 0 )
144     return r;
145
146   if (add_mask & EVENT_READABLE)
147     EV_SET(&ke[num++], fd, EVFILT_READ, EV_ADD|EV_CLEAR, 0, 0, NULL);
148   if (add_mask & EVENT_WRITABLE)
149     EV_SET(&ke[num++], fd, EVFILT_WRITE, EV_ADD|EV_CLEAR, 0, 0, NULL);
150
151   if (num) {
152     if (kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT) == -1) {
153       lderr(cct) << __func__ << " unable to add event: "
154                              << cpp_strerror(errno) << dendl;
155       return -errno;
156     }
157   }
158   // keep what we set
159   if (fd >= sav_max)
160     resize_events(sav_max+5000);
161   sav_events[fd].mask = cur_mask | add_mask;
162   return 0;
163 }
164
165 int KqueueDriver::del_event(int fd, int cur_mask, int del_mask)
166 {
167   struct kevent ke[2];
168   int num = 0;
169   int mask = cur_mask & del_mask;
170
171   ldout(cct,30) << __func__ << " delete event kqfd = " << kqfd 
172         << " fd = " << fd << " cur_mask = " << cur_mask 
173         << " del_mask = " << del_mask << dendl;
174
175   int r = test_thread_change(__func__);
176   if ( r < 0 )
177     return r;
178
179   if (mask & EVENT_READABLE)
180     EV_SET(&ke[num++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
181   if (mask & EVENT_WRITABLE)
182     EV_SET(&ke[num++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
183
184   if (num) {
185     int r = 0;
186     if ((r = kevent(kqfd, ke, num, NULL, 0, KEVENT_NOWAIT)) < 0) {
187       lderr(cct) << __func__ << " kevent: delete fd=" << fd << " mask=" << mask
188                  << " failed." << cpp_strerror(errno) << dendl;
189       return -errno;
190     }
191   }
192   // keep the administration
193   sav_events[fd].mask = cur_mask & ~del_mask;
194   return 0;
195 }
196
197 int KqueueDriver::resize_events(int newsize)
198 {
199   ldout(cct,30) << __func__ << " kqfd = " << kqfd << "newsize = " << newsize 
200                 << dendl;
201   if(newsize > sav_max) {
202     sav_events = (struct SaveEvent*)realloc( sav_events, 
203                     sizeof(struct SaveEvent)*newsize);
204     if (!sav_events) {
205       lderr(cct) << __func__ << " unable to realloc memory: "
206                              << cpp_strerror(errno) << dendl;
207       return -ENOMEM;
208     }
209     memset(&sav_events[size], 0, sizeof(struct SaveEvent)*(newsize-sav_max));
210     sav_max = newsize;
211   }
212   return 0;
213 }
214
215 int KqueueDriver::event_wait(vector<FiredFileEvent> &fired_events, struct timeval *tvp)
216 {
217   int retval, numevents = 0;
218   struct timespec timeout;
219
220   ldout(cct,10) << __func__ << " kqfd = " << kqfd << dendl;
221
222   int r = test_thread_change(__func__);
223   if ( r < 0 )
224     return r;
225
226   if (tvp != NULL) {
227       timeout.tv_sec = tvp->tv_sec;
228       timeout.tv_nsec = tvp->tv_usec * 1000;
229       ldout(cct,20) << __func__ << " "
230                 << timeout.tv_sec << " sec "
231                 << timeout.tv_nsec << " nsec"
232                 << dendl;
233       retval = kevent(kqfd, NULL, 0, res_events, size, &timeout);
234   } else {
235       ldout(cct,30) << __func__ << " event_wait: " << " NULL" << dendl;
236       retval = kevent(kqfd, NULL, 0, res_events, size, KEVENT_NOWAIT);
237   }
238
239   ldout(cct,25) << __func__ << " kevent retval: " << retval << dendl;
240   if (retval < 0) {
241     lderr(cct) << __func__ << " kqueue error: "
242                            << cpp_strerror(errno) << dendl;
243     return -errno;
244   } else if (retval == 0) {
245     ldout(cct,5) << __func__ << " Hit timeout("
246                  << timeout.tv_sec << " sec "
247                  << timeout.tv_nsec << " nsec"
248                  << ")." << dendl;
249   } else {
250     int j;
251
252     numevents = retval;
253     fired_events.resize(numevents);
254     for (j = 0; j < numevents; j++) {
255       int mask = 0;
256       struct kevent *e = res_events + j;
257
258       if (e->filter == EVFILT_READ) mask |= EVENT_READABLE;
259       if (e->filter == EVFILT_WRITE) mask |= EVENT_WRITABLE;
260       if (e->flags & EV_ERROR) mask |= EVENT_READABLE|EVENT_WRITABLE;
261       fired_events[j].fd = (int)e->ident;
262       fired_events[j].mask = mask;
263
264     }
265   }
266   return numevents;
267 }