Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / xio / XioPortal.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  * Portions Copyright (C) 2013 CohortFS, LLC
8  *s
9  * This is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software
12  * Foundation.  See file COPYING.
13  *
14  */
15
16 #ifndef XIO_PORTAL_H
17 #define XIO_PORTAL_H
18
19 #include <string>
20
21 extern "C" {
22 #include "libxio.h"
23 }
24 #include "XioInSeq.h"
25 #include <boost/lexical_cast.hpp>
26 #include "msg/SimplePolicyMessenger.h"
27 #include "XioConnection.h"
28 #include "XioMsg.h"
29
30 #include "include/assert.h"
31 #include "common/dout.h"
32
33 #ifndef CACHE_LINE_SIZE
34 #define CACHE_LINE_SIZE 64 /* XXX arch-specific define */
35 #endif
36 #define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]
37
38 class XioPortal : public Thread
39 {
40 private:
41
42   struct SubmitQueue
43   {
44     const static int nlanes = 7;
45
46     struct Lane
47     {
48       uint32_t size;
49       XioSubmit::Queue q;
50       pthread_spinlock_t sp;
51       CACHE_PAD(0);
52     };
53
54     Lane qlane[nlanes];
55
56     int ix; /* atomicity by portal thread */
57
58     SubmitQueue() : ix(0)
59       {
60         int ix;
61         Lane* lane;
62
63         for (ix = 0; ix < nlanes; ++ix) {
64           lane = &qlane[ix];
65           pthread_spin_init(&lane->sp, PTHREAD_PROCESS_PRIVATE);
66           lane->size = 0;
67         }
68       }
69
70     inline Lane* get_lane(XioConnection *xcon)
71       {
72         return &qlane[(((uint64_t) xcon) / 16) % nlanes];
73       }
74
75     void enq(XioConnection *xcon, XioSubmit* xs)
76       {
77         Lane* lane = get_lane(xcon);
78         pthread_spin_lock(&lane->sp);
79         lane->q.push_back(*xs);
80         ++(lane->size);
81         pthread_spin_unlock(&lane->sp);
82       }
83
84     void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
85       {
86         int size = requeue_q.size();
87         Lane* lane = get_lane(xcon);
88         pthread_spin_lock(&lane->sp);
89         XioSubmit::Queue::const_iterator i1 = lane->q.end();
90         lane->q.splice(i1, requeue_q);
91         lane->size += size;
92         pthread_spin_unlock(&lane->sp);
93       }
94
95     void deq(XioSubmit::Queue& send_q)
96       {
97         Lane* lane;
98         int cnt;
99         for (cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) {
100           lane = &qlane[ix];
101           pthread_spin_lock(&lane->sp);
102           if (lane->size > 0) {
103             XioSubmit::Queue::const_iterator i1 = send_q.end();
104             send_q.splice(i1, lane->q);
105             lane->size = 0;
106             ++ix, ix = ix % nlanes;
107             pthread_spin_unlock(&lane->sp);
108             break;
109           }
110           pthread_spin_unlock(&lane->sp);
111         }
112       }
113
114   }; /* SubmitQueue */
115
116   Messenger *msgr;
117   struct xio_context *ctx;
118   struct xio_server *server;
119   SubmitQueue submit_q;
120   pthread_spinlock_t sp;
121   void *ev_loop;
122   string xio_uri;
123   char *portal_id;
124   bool _shutdown;
125   bool drained;
126   uint32_t magic;
127   uint32_t special_handling;
128
129   friend class XioPortals;
130   friend class XioMessenger;
131
132 public:
133   explicit XioPortal(Messenger *_msgr, int max_conns) :
134     msgr(_msgr), ctx(NULL), server(NULL), submit_q(), xio_uri(""),
135     portal_id(NULL), _shutdown(false), drained(false),
136     magic(0),
137     special_handling(0)
138   {
139     pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
140
141     struct xio_context_params ctx_params;
142     memset(&ctx_params, 0, sizeof(ctx_params));
143     ctx_params.user_context = this;
144     /*
145      * hint to Accelio the total number of connections that will share
146      * this context's resources: internal primary task pool...
147      */
148     ctx_params.max_conns_per_ctx = max_conns;
149
150     /* a portal is an xio_context and event loop */
151     ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */);
152     assert(ctx && "Whoops, failed to create portal/ctx");
153   }
154
155   int bind(struct xio_session_ops *ops, const string &base_uri,
156            uint16_t port, uint16_t *assigned_port);
157
158   inline void release_xio_msg(XioCompletion* xcmp) {
159     struct xio_msg *msg = xcmp->dequeue();
160     struct xio_msg *next_msg = NULL;
161     int code;
162     if (unlikely(!xcmp->xcon->conn)) {
163       // NOTE: msg is not safe to dereference if the connection was torn down
164       xcmp->xcon->msg_release_fail(msg, ENOTCONN);
165     }
166     else while (msg) {
167       next_msg = static_cast<struct xio_msg *>(msg->user_context);
168       code = xio_release_msg(msg);
169       if (unlikely(code)) /* very unlikely, so log it */
170         xcmp->xcon->msg_release_fail(msg, code);
171       msg = next_msg;
172     }
173     xcmp->trace.event("xio_release_msg");
174     xcmp->finalize(); /* unconditional finalize */
175   }
176
177   void enqueue(XioConnection *xcon, XioSubmit *xs)
178     {
179       if (! _shutdown) {
180         submit_q.enq(xcon, xs);
181         xio_context_stop_loop(ctx);
182         return;
183       }
184
185       /* dispose xs */
186       switch(xs->type) {
187       case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
188       {
189         XioSend* xsend = static_cast<XioSend*>(xs);
190         xs->xcon->msg_send_fail(xsend, -EINVAL);
191       }
192         break;
193       default:
194         /* INCOMING_MSG_RELEASE */
195         release_xio_msg(static_cast<XioCompletion*>(xs));
196       break;
197       };
198     }
199
200   void requeue(XioConnection* xcon, XioSubmit::Queue& send_q) {
201     submit_q.enq(xcon, send_q);
202   }
203
204   void requeue_all_xcon(XioConnection* xcon,
205                         XioSubmit::Queue::iterator& q_iter,
206                         XioSubmit::Queue& send_q) {
207     // XXX gather all already-dequeued outgoing messages for xcon
208     // and push them in FIFO order to front of the input queue,
209     // and mark the connection as flow-controlled
210     XioSubmit::Queue requeue_q;
211
212     while (q_iter != send_q.end()) {
213       XioSubmit *xs = &(*q_iter);
214       // skip retires and anything for other connections
215       if (xs->xcon != xcon) {
216         q_iter++;
217         continue;
218       }
219       q_iter = send_q.erase(q_iter);
220       requeue_q.push_back(*xs);
221     }
222     pthread_spin_lock(&xcon->sp);
223     XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin();
224     xcon->outgoing.requeue.splice(i1, requeue_q);
225     xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
226     pthread_spin_unlock(&xcon->sp);
227   }
228
229   void *entry()
230     {
231       int size, code = 0;
232       uint32_t xio_qdepth_high;
233       XioSubmit::Queue send_q;
234       XioSubmit::Queue::iterator q_iter;
235       struct xio_msg *msg = NULL;
236       XioConnection *xcon;
237       XioSubmit *xs;
238       XioSend *xsend;
239
240       do {
241         submit_q.deq(send_q);
242
243         /* shutdown() barrier */
244         pthread_spin_lock(&sp);
245
246       restart:
247         size = send_q.size();
248
249         if (_shutdown) {
250           // XXX XioSend queues for flow-controlled connections may require
251           // cleanup
252           drained = true;
253         }
254
255         if (size > 0) {
256           q_iter = send_q.begin();
257           while (q_iter != send_q.end()) {
258             xs = &(*q_iter);
259             xcon = xs->xcon;
260
261             switch (xs->type) {
262             case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
263               xsend = static_cast<XioSend*>(xs);
264               if (unlikely(!xcon->conn || !xcon->is_connected()))
265                 code = ENOTCONN;
266               else {
267                 /* XXX guard Accelio send queue (should be safe to rely
268                  * on Accelio's check on below, but this assures that
269                  * all chained xio_msg are accounted) */
270                 xio_qdepth_high = xcon->xio_qdepth_high_mark();
271                 if (unlikely((xcon->send_ctr + xsend->get_msg_count()) >
272                              xio_qdepth_high)) {
273                   requeue_all_xcon(xcon, q_iter, send_q);
274                   goto restart;
275                 }
276
277                 xs->trace.event("xio_send_msg");
278                 msg = xsend->get_xio_msg();
279                 code = xio_send_msg(xcon->conn, msg);
280                 /* header trace moved here to capture xio serial# */
281                 if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
282                   xsend->print_debug(msgr->cct, "xio_send_msg");
283                 }
284                 /* get the right Accelio's errno code */
285                 if (unlikely(code)) {
286                   if ((code == -1) && (xio_errno() == -1)) {
287                     /* In case XIO does not have any credits to send,
288                      * it would still queue up the message(s) for transmission,
289                      * but would return -1 and errno would also be set to -1.
290                      * This needs to be treated as a success.
291                      */
292                     code = 0;
293                   }
294                   else {
295                     code = xio_errno();
296                   }
297                 }
298               } /* !ENOTCONN */
299               if (unlikely(code)) {
300                 switch (code) {
301                 case XIO_E_TX_QUEUE_OVERFLOW:
302                 {
303                   requeue_all_xcon(xcon, q_iter, send_q);
304                   goto restart;
305                 }
306                   break;
307                 default:
308                   q_iter = send_q.erase(q_iter);
309                   xcon->msg_send_fail(xsend, code);
310                   continue;
311                   break;
312                 };
313               } else {
314                 xcon->send.set(msg->timestamp); // need atomic?
315                 xcon->send_ctr += xsend->get_msg_count(); // only inc if cb promised
316               }
317               break;
318             default:
319               /* INCOMING_MSG_RELEASE */
320               q_iter = send_q.erase(q_iter);
321               release_xio_msg(static_cast<XioCompletion*>(xs));
322               continue;
323             } /* switch (xs->type) */
324             q_iter = send_q.erase(q_iter);
325           } /* while */
326         } /* size > 0 */
327
328         pthread_spin_unlock(&sp);
329         xio_context_run_loop(ctx, 300);
330
331       } while ((!_shutdown) || (!drained));
332
333       /* shutting down */
334       if (server) {
335         xio_unbind(server);
336       }
337       xio_context_destroy(ctx);
338       return NULL;
339     }
340
341   void shutdown()
342     {
343         pthread_spin_lock(&sp);
344         _shutdown = true;
345         pthread_spin_unlock(&sp);
346     }
347 };
348
349 class XioPortals
350 {
351 private:
352   vector<XioPortal*> portals;
353   char **p_vec;
354   int n;
355   int last_unused;
356
357 public:
358   XioPortals(Messenger *msgr, int _n, int nconns) : p_vec(NULL), last_unused(0)
359   {
360     n = max(_n, 1);
361
362     portals.resize(n);
363     for (int i = 0; i < n; i++) {
364       if (!portals[i]) {
365         portals[i] = new XioPortal(msgr, nconns);
366         assert(portals[i] != nullptr);
367       }
368     }
369   }
370
371   vector<XioPortal*>& get() { return portals; }
372
373   const char **get_vec()
374   {
375     return (const char **) p_vec;
376   }
377
378   int get_portals_len()
379   {
380     return n;
381   }
382
383   int get_last_unused()
384   {
385     int pix = last_unused;
386     if (++last_unused >= get_portals_len())
387       last_unused = 0;
388     return pix;
389   }
390
391   XioPortal* get_next_portal()
392   {
393     int pix = get_last_unused();
394     return portals[pix];
395   }
396
397   int bind(struct xio_session_ops *ops, const string& base_uri,
398            uint16_t port, uint16_t *port0);
399
400   int accept(struct xio_session *session,
401              struct xio_new_session_req *req,
402              void *cb_user_context)
403   {
404     const char **portals_vec = get_vec();
405     int pix = get_last_unused();
406
407     if (pix == 0) {
408       return xio_accept(session, NULL, 0, NULL, 0);
409     } else {
410       return xio_accept(session,
411                         (const char **)&(portals_vec[pix]),
412                         1, NULL, 0);
413     }
414   }
415
416   void start()
417   {
418     XioPortal *portal;
419     int p_ix, nportals = portals.size();
420
421     p_vec = new char*[nportals];
422     for (p_ix = 0; p_ix < nportals; ++p_ix) {
423       portal = portals[p_ix];
424       p_vec[p_ix] = (char*) /* portal->xio_uri.c_str() */
425                         portal->portal_id;
426     }
427
428     for (p_ix = 0; p_ix < nportals; ++p_ix) {
429       string thread_name = "ms_xio_";
430       thread_name.append(std::to_string(p_ix));
431       portal = portals[p_ix];
432       portal->create(thread_name.c_str());
433     }
434   }
435
436   void shutdown()
437   {
438     int nportals = portals.size();
439     for (int p_ix = 0; p_ix < nportals; ++p_ix) {
440       XioPortal *portal = portals[p_ix];
441       portal->shutdown();
442     }
443   }
444
445   void join()
446   {
447     int nportals = portals.size();
448     for (int p_ix = 0; p_ix < nportals; ++p_ix) {
449       XioPortal *portal = portals[p_ix];
450       portal->join();
451     }
452   }
453
454   ~XioPortals()
455   {
456     int nportals = portals.size();
457     for (int ix = 0; ix < nportals; ++ix)
458       delete(portals[ix]);
459     portals.clear();
460     if (p_vec)
461       delete[] p_vec;
462   }
463 };
464
465 #endif /* XIO_PORTAL_H */