Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / xio / XioMsg.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  * Portions Copyright (C) 2013 CohortFS, LLC
8  *
9  * This is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software
12  * Foundation.  See file COPYING.
13  *
14  */
15
16 #ifndef XIO_MSG_H
17 #define XIO_MSG_H
18
19 #include <boost/intrusive/list.hpp>
20 #include "msg/SimplePolicyMessenger.h"
21 extern "C" {
22 #include "libxio.h"
23 }
24 #include "XioConnection.h"
25 #include "XioSubmit.h"
26 #include "msg/msg_types.h"
27 #include "XioPool.h"
28
29 namespace bi = boost::intrusive;
30
31 class XioMessenger;
32
33 class XioMsgCnt
34 {
35 public:
36   __le32 msg_cnt;
37   buffer::list bl;
38 public:
39   explicit XioMsgCnt(buffer::ptr p)
40     {
41       bl.append(p);
42       buffer::list::iterator bl_iter = bl.begin();
43       ::decode(msg_cnt, bl_iter);
44     }
45 };
46
47 class XioMsgHdr
48 {
49 public:
50   char tag;
51   __le32 msg_cnt;
52   __le32 peer_type;
53   entity_addr_t addr; /* XXX hack! */
54   ceph_msg_header* hdr;
55   ceph_msg_footer* ftr;
56   uint64_t features;
57   buffer::list bl;
58 public:
59   XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer& _ftr, uint64_t _features)
60     : tag(CEPH_MSGR_TAG_MSG), msg_cnt(0), hdr(&_hdr), ftr(&_ftr),
61       features(_features)
62     { }
63
64   XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer &_ftr, buffer::ptr p)
65     : hdr(&_hdr), ftr(&_ftr)
66     {
67       bl.append(p);
68       buffer::list::iterator bl_iter = bl.begin();
69       decode(bl_iter);
70     }
71
72   static size_t get_max_encoded_length();
73
74   const buffer::list& get_bl() { encode(bl); return bl; };
75
76   inline void encode_hdr(ceph::buffer::list& bl) const {
77     ::encode(tag, bl);
78     ::encode(msg_cnt, bl);
79     ::encode(peer_type, bl);
80     ::encode(addr, bl, features);
81     ::encode(hdr->seq, bl);
82     ::encode(hdr->tid, bl);
83     ::encode(hdr->type, bl);
84     ::encode(hdr->priority, bl);
85     ::encode(hdr->version, bl);
86     ::encode(hdr->front_len, bl);
87     ::encode(hdr->middle_len, bl);
88     ::encode(hdr->data_len, bl);
89     ::encode(hdr->data_off, bl);
90     ::encode(hdr->src.type, bl);
91     ::encode(hdr->src.num, bl);
92     ::encode(hdr->compat_version, bl);
93     ::encode(hdr->crc, bl);
94   }
95
96   inline void encode_ftr(buffer::list& bl) const {
97     ::encode(ftr->front_crc, bl);
98     ::encode(ftr->middle_crc, bl);
99     ::encode(ftr->data_crc, bl);
100     ::encode(ftr->sig, bl);
101     ::encode(ftr->flags, bl);
102   }
103
104   inline void encode(buffer::list& bl) const {
105     encode_hdr(bl);
106     encode_ftr(bl);
107   }
108
109   inline void decode_hdr(buffer::list::iterator& bl) {
110     ::decode(tag, bl);
111     ::decode(msg_cnt, bl);
112     ::decode(peer_type, bl);
113     ::decode(addr, bl);
114     ::decode(hdr->seq, bl);
115     ::decode(hdr->tid, bl);
116     ::decode(hdr->type, bl);
117     ::decode(hdr->priority, bl);
118     ::decode(hdr->version, bl);
119     ::decode(hdr->front_len, bl);
120     ::decode(hdr->middle_len, bl);
121     ::decode(hdr->data_len, bl);
122     ::decode(hdr->data_off, bl);
123     ::decode(hdr->src.type, bl);
124     ::decode(hdr->src.num, bl);
125     ::decode(hdr->compat_version, bl);
126     ::decode(hdr->crc, bl);
127   }
128
129   inline void decode_ftr(buffer::list::iterator& bl) {
130     ::decode(ftr->front_crc, bl);
131     ::decode(ftr->middle_crc, bl);
132     ::decode(ftr->data_crc, bl);
133     ::decode(ftr->sig, bl);
134     ::decode(ftr->flags, bl);
135   }
136
137   inline void decode(buffer::list::iterator& bl) {
138     decode_hdr(bl);
139     decode_ftr(bl);
140   }
141
142   virtual ~XioMsgHdr()
143     {}
144 };
145
146 WRITE_CLASS_ENCODER(XioMsgHdr);
147
148 extern struct xio_mempool *xio_msgr_noreg_mpool;
149
150 #define XIO_MSGR_IOVLEN 16
151
152 struct xio_msg_ex
153 {
154   struct xio_msg msg;
155   struct xio_iovec_ex iovs[XIO_MSGR_IOVLEN];
156
157   explicit xio_msg_ex(void* user_context) {
158     // go in structure order
159     msg.in.header.iov_len = 0;
160     msg.in.header.iov_base = NULL;  /* XXX Accelio requires this currently */
161
162     msg.in.sgl_type = XIO_SGL_TYPE_IOV_PTR;
163     msg.in.pdata_iov.max_nents = XIO_MSGR_IOVLEN;
164     msg.in.pdata_iov.nents = 0;
165     msg.in.pdata_iov.sglist = iovs;
166
167     // minimal zero "out" side
168     msg.out.header.iov_len = 0;
169     msg.out.header.iov_base = NULL;  /* XXX Accelio requires this currently,
170                                       * against spec */
171     // out (some members adjusted later)
172     msg.out.sgl_type = XIO_SGL_TYPE_IOV_PTR;
173     msg.out.pdata_iov.max_nents = XIO_MSGR_IOVLEN;
174     msg.out.pdata_iov.nents = 0;
175     msg.out.pdata_iov.sglist = iovs;
176
177     // minimal initialize an "out" msg
178     msg.request = NULL;
179     msg.type = XIO_MSG_TYPE_ONE_WAY;
180     // for now, we DO NEED receipts for every msg
181     msg.flags = 0;
182     msg.user_context = user_context;
183     msg.next = NULL;
184     // minimal zero "in" side
185   }
186 };
187
188 class XioSend : public XioSubmit
189 {
190 public:
191   virtual void print_debug(CephContext *cct, const char *tag) const {};
192   const struct xio_msg * get_xio_msg() const {return &req_0.msg;}
193   struct xio_msg * get_xio_msg() {return &req_0.msg;}
194   virtual size_t get_msg_count() const {return 1;}
195
196   XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt=0) :
197     XioSubmit(XioSubmit::OUTGOING_MSG, _xcon),
198     req_0(this), mp_this(_mp), nrefs(_ex_cnt+1)
199   {
200     xpool_inc_msgcnt();
201     xcon->get();
202   }
203
204   XioSend* get() { nrefs++; return this; };
205
206   void put(int n) {
207     int refs = nrefs -= n;
208     if (refs == 0) {
209       struct xio_reg_mem *mp = &this->mp_this;
210       this->~XioSend();
211       xpool_free(sizeof(XioSend), mp);
212     }
213   }
214
215   void put() {
216     put(1);
217   }
218
219   void put_msg_refs() {
220     put(get_msg_count());
221   }
222
223   virtual ~XioSend() {
224     xpool_dec_msgcnt();
225     xcon->put();
226   }
227
228 private:
229   xio_msg_ex req_0;
230   struct xio_reg_mem mp_this;
231   std::atomic<unsigned> nrefs = { 0 };
232 };
233
234 class XioCommand : public XioSend
235 {
236 public:
237   XioCommand(XioConnection *_xcon, struct xio_reg_mem& _mp):XioSend(_xcon, _mp) {
238   }
239
240   buffer::list& get_bl_ref() { return bl; };
241
242 private:
243   buffer::list bl;
244 };
245
246 struct XioMsg : public XioSend
247 {
248 public:
249   Message* m;
250   XioMsgHdr hdr;
251   xio_msg_ex* req_arr;
252
253 public:
254   XioMsg(Message *_m, XioConnection *_xcon, struct xio_reg_mem& _mp,
255          int _ex_cnt, uint64_t _features) :
256     XioSend(_xcon, _mp, _ex_cnt),
257     m(_m), hdr(m->get_header(), m->get_footer(), _features),
258     req_arr(NULL)
259     {
260       const entity_inst_t &inst = xcon->get_messenger()->get_myinst();
261       hdr.peer_type = inst.name.type();
262       hdr.addr = xcon->get_messenger()->get_myaddr();
263       hdr.hdr->src.type = inst.name.type();
264       hdr.hdr->src.num = inst.name.num();
265       hdr.msg_cnt = _ex_cnt+1;
266
267       if (unlikely(_ex_cnt > 0)) {
268         alloc_trailers(_ex_cnt);
269       }
270     }
271
272   void print_debug(CephContext *cct, const char *tag) const override;
273   size_t get_msg_count() const override {
274     return hdr.msg_cnt;
275   }
276
277   void alloc_trailers(int cnt) {
278     req_arr = static_cast<xio_msg_ex*>(malloc(cnt * sizeof(xio_msg_ex)));
279     for (int ix = 0; ix < cnt; ++ix) {
280       xio_msg_ex* xreq = &(req_arr[ix]);
281       new (xreq) xio_msg_ex(this);
282     }
283   }
284
285   Message *get_message() { return m; }
286
287   ~XioMsg()
288     {
289       if (unlikely(!!req_arr)) {
290         for (unsigned int ix = 0; ix < get_msg_count()-1; ++ix) {
291           xio_msg_ex* xreq = &(req_arr[ix]);
292           xreq->~xio_msg_ex();
293         }
294         free(req_arr);
295       }
296
297       /* testing only! server's ready, resubmit request (not reached on
298        * PASSIVE/server side) */
299       if (unlikely(m->get_magic() & MSG_MAGIC_REDUPE)) {
300         if (likely(xcon->is_connected())) {
301           xcon->send_message(m);
302         } else {
303           /* dispose it */
304           m->put();
305         }
306       } else {
307           /* the normal case: done with message */
308           m->put();
309       }
310     }
311 };
312
313 class XioDispatchHook : public Message::CompletionHook
314 {
315 private:
316   XioConnection *xcon;
317   XioInSeq msg_seq;
318   XioPool rsp_pool;
319   std::atomic<unsigned> nrefs { 1 };
320   bool cl_flag;
321   friend class XioConnection;
322   friend class XioMessenger;
323 public:
324   struct xio_reg_mem mp_this;
325
326   XioDispatchHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq,
327                     struct xio_reg_mem& _mp) :
328     CompletionHook(_m),
329     xcon(_xcon->get()),
330     msg_seq(_msg_seq),
331     rsp_pool(xio_msgr_noreg_mpool),
332     cl_flag(false),
333     mp_this(_mp)
334     {
335       ++xcon->n_reqs; // atomicity by portal thread
336       xpool_inc_hookcnt();
337     }
338
339   virtual void finish(int r) {
340     this->put();
341   }
342
343   virtual void complete(int r) {
344     finish(r);
345   }
346
347   int release_msgs();
348
349   XioDispatchHook* get() {
350     nrefs++; return this;
351   }
352
353   void put(int n = 1) {
354     int refs = nrefs -= n;
355     if (refs == 0) {
356       /* in Marcus' new system, refs reaches 0 twice:  once in
357        * Message lifecycle, and again after xio_release_msg.
358        */
359       if (!cl_flag && release_msgs())
360         return;
361       struct xio_reg_mem *mp = &this->mp_this;
362       this->~XioDispatchHook();
363       xpool_free(sizeof(XioDispatchHook), mp);
364     }
365   }
366
367   XioInSeq& get_seq() { return msg_seq; }
368
369   XioPool& get_pool() { return rsp_pool; }
370
371   void on_err_finalize(XioConnection *xcon) {
372     /* can't decode message; even with one-way must free
373      * xio_msg structures, and then xiopool
374      */
375     this->finish(-1);
376   }
377
378   ~XioDispatchHook() {
379     --xcon->n_reqs; // atomicity by portal thread
380     xpool_dec_hookcnt();
381     xcon->put();
382   }
383 };
384
385 /* A sender-side CompletionHook that relies on the on_msg_delivered
386  * to complete a pending mark down. */
387 class XioMarkDownHook : public Message::CompletionHook
388 {
389 private:
390   XioConnection* xcon;
391
392 public:
393   struct xio_reg_mem mp_this;
394
395   XioMarkDownHook(
396     XioConnection* _xcon, Message *_m, struct xio_reg_mem& _mp) :
397     CompletionHook(_m), xcon(_xcon->get()), mp_this(_mp)
398     { }
399
400   virtual void claim(int r) {}
401
402   virtual void finish(int r) {
403     xcon->put();
404     struct xio_reg_mem *mp = &this->mp_this;
405     this->~XioMarkDownHook();
406     xio_mempool_free(mp);
407   }
408
409   virtual void complete(int r) {
410     xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
411     finish(r);
412   }
413 };
414
415 struct XioCompletion : public XioSubmit
416 {
417   XioDispatchHook *xhook;
418 public:
419   XioCompletion(XioConnection *_xcon, XioDispatchHook *_xhook)
420     : XioSubmit(XioSubmit::INCOMING_MSG_RELEASE, _xcon /* not xcon! */),
421       xhook(_xhook->get()) {
422       // submit queue ref
423       xcon->get();
424     };
425
426   struct xio_msg* dequeue() {
427     return xhook->get_seq().dequeue();
428   }
429
430   XioDispatchHook* get_xhook() { return xhook; }
431
432   void finalize() {
433     xcon->put();
434     xhook->put();
435   }
436 };
437
438 void print_xio_msg_hdr(CephContext *cct, const char *tag,
439                        const XioMsgHdr &hdr, const struct xio_msg *msg);
440 void print_ceph_msg(CephContext *cct, const char *tag, Message *m);
441
442 #endif /* XIO_MSG_H */