1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Portions Copyright (C) 2013 CohortFS, LLC
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
19 #include <boost/intrusive/list.hpp>
20 #include "msg/SimplePolicyMessenger.h"
24 #include "XioConnection.h"
25 #include "XioSubmit.h"
26 #include "msg/msg_types.h"
29 namespace bi = boost::intrusive;
// Constructor taking a buffer::ptr. The visible statements obtain an iterator
// positioned at the start of the buffer list `bl` and decode `msg_cnt` from it.
// NOTE(review): the statement that places `p` into `bl` is not visible in this
// view -- confirm before relying on this description.
39 explicit XioMsgCnt(buffer::ptr p)
42 buffer::list::iterator bl_iter = bl.begin();
// Reads msg_cnt from the front of the buffer; bl_iter is advanced by decode.
43 ::decode(msg_cnt, bl_iter);
53 entity_addr_t addr; /* XXX hack! */
// Constructor from a header/footer pair plus a feature mask.
// The visible initializers set tag to CEPH_MSGR_TAG_MSG, msg_cnt to 0, and
// store the ADDRESSES of _hdr and _ftr (no copy is made, so the referenced
// header and footer must outlive this object).
// NOTE(review): the initializer list continues on a line not shown here.
59 XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer& _ftr, uint64_t _features)
60 : tag(CEPH_MSGR_TAG_MSG), msg_cnt(0), hdr(&_hdr), ftr(&_ftr),
// Constructor from a header/footer pair plus a buffer::ptr.
// Stores the addresses of _hdr and _ftr; the visible body obtains an iterator
// at the start of `bl`.
// NOTE(review): presumably the iterator is then passed to decode() to fill the
// header and footer -- that call is not visible here; confirm.
64 XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer &_ftr, buffer::ptr p)
65 : hdr(&_hdr), ftr(&_ftr)
68 buffer::list::iterator bl_iter = bl.begin();
// Declared only; the definition is not in this view.
72 static size_t get_max_encoded_length();
// Calls encode() on the member buffer list `bl` and returns a const reference
// to it. Not a const member because it writes to `bl`.
// NOTE(review): encode(bl) runs on every call; if encoding appends to the
// list, calling get_bl() more than once would grow `bl` -- confirm callers
// invoke it a single time per object.
74 const buffer::list& get_bl() { encode(bl); return bl; };
// Serializes the XIO-level fields (msg_cnt, peer_type, addr) followed by the
// individual ceph_msg_header fields reached through `hdr`, one ::encode call
// per field, into `bl`. The field order matches the ::decode sequence visible
// in decode_hdr; the two must be kept in step.
// NOTE(review): any statement preceding the msg_cnt line is not visible in
// this view.
76 inline void encode_hdr(ceph::buffer::list& bl) const {
78 ::encode(msg_cnt, bl);
79 ::encode(peer_type, bl);
// addr is the only field encoded with the `features` argument.
80 ::encode(addr, bl, features);
81 ::encode(hdr->seq, bl);
82 ::encode(hdr->tid, bl);
83 ::encode(hdr->type, bl);
84 ::encode(hdr->priority, bl);
85 ::encode(hdr->version, bl);
86 ::encode(hdr->front_len, bl);
87 ::encode(hdr->middle_len, bl);
88 ::encode(hdr->data_len, bl);
89 ::encode(hdr->data_off, bl);
// The source entity name is written as its two components (type, num).
90 ::encode(hdr->src.type, bl);
91 ::encode(hdr->src.num, bl);
92 ::encode(hdr->compat_version, bl);
93 ::encode(hdr->crc, bl);
// Serializes the ceph_msg_footer fields reached through `ftr` into `bl`:
// the three CRCs, then sig, then flags. decode_ftr reads them in this order.
96 inline void encode_ftr(buffer::list& bl) const {
97 ::encode(ftr->front_crc, bl);
98 ::encode(ftr->middle_crc, bl);
99 ::encode(ftr->data_crc, bl);
100 ::encode(ftr->sig, bl);
101 ::encode(ftr->flags, bl);
// Top-level encoder for the whole XioMsgHdr; this is what get_bl() calls.
// NOTE(review): the body is not visible in this view -- presumably it calls
// encode_hdr(bl) followed by encode_ftr(bl); confirm.
104 inline void encode(buffer::list& bl) const {
// Reads the header fields back from the iterator `bl`, one ::decode call per
// field, writing the ceph_msg_header fields through `hdr`. The visible reads
// follow the same field order as encode_hdr.
// NOTE(review): encode_hdr writes `addr` between peer_type and hdr->seq; the
// matching read is not visible in this view (listing line 113 is absent) --
// confirm it is present in the real file, otherwise every later field would
// be decoded from the wrong offset.
109 inline void decode_hdr(buffer::list::iterator& bl) {
111 ::decode(msg_cnt, bl);
112 ::decode(peer_type, bl);
114 ::decode(hdr->seq, bl);
115 ::decode(hdr->tid, bl);
116 ::decode(hdr->type, bl);
117 ::decode(hdr->priority, bl);
118 ::decode(hdr->version, bl);
119 ::decode(hdr->front_len, bl);
120 ::decode(hdr->middle_len, bl);
121 ::decode(hdr->data_len, bl);
122 ::decode(hdr->data_off, bl);
// The source entity name is read as its two components (type, num).
123 ::decode(hdr->src.type, bl);
124 ::decode(hdr->src.num, bl);
125 ::decode(hdr->compat_version, bl);
126 ::decode(hdr->crc, bl);
// Reads the footer fields back from the iterator `bl` and stores them through
// `ftr`, in the same order encode_ftr writes them: the three CRCs, sig, flags.
129 inline void decode_ftr(buffer::list::iterator& bl) {
130 ::decode(ftr->front_crc, bl);
131 ::decode(ftr->middle_crc, bl);
132 ::decode(ftr->data_crc, bl);
133 ::decode(ftr->sig, bl);
134 ::decode(ftr->flags, bl);
// Top-level decoder for the whole XioMsgHdr.
// NOTE(review): the body is not visible in this view -- presumably it calls
// decode_hdr(bl) followed by decode_ftr(bl); confirm.
137 inline void decode(buffer::list::iterator& bl) {
// Hooks XioMsgHdr into the project's encode/decode machinery via the
// WRITE_CLASS_ENCODER macro (macro definition is outside this file view).
146 WRITE_CLASS_ENCODER(XioMsgHdr);
148 extern struct xio_mempool *xio_msgr_noreg_mpool;
150 #define XIO_MSGR_IOVLEN 16
// Scatter/gather storage owned by this wrapper: XIO_MSGR_IOVLEN entries.
155 struct xio_iovec_ex iovs[XIO_MSGR_IOVLEN];
// Constructor: initializes the embedded `msg` field by field and records
// `user_context` in msg.user_context. Both the "in" and the "out" side are
// set to XIO_SGL_TYPE_IOV_PTR with zero entries and a capacity of
// XIO_MSGR_IOVLEN.
157 explicit xio_msg_ex(void* user_context) {
158 // go in structure order
159 msg.in.header.iov_len = 0;
160 msg.in.header.iov_base = NULL; /* XXX Accelio requires this currently */
162 msg.in.sgl_type = XIO_SGL_TYPE_IOV_PTR;
163 msg.in.pdata_iov.max_nents = XIO_MSGR_IOVLEN;
164 msg.in.pdata_iov.nents = 0;
// Note: the in-side sglist points at the same `iovs` array as the out-side
// sglist assigned further down.
165 msg.in.pdata_iov.sglist = iovs;
167 // minimal zero "out" side
168 msg.out.header.iov_len = 0;
169 msg.out.header.iov_base = NULL; /* XXX Accelio requires this currently,
171 // out (some members adjusted later)
172 msg.out.sgl_type = XIO_SGL_TYPE_IOV_PTR;
173 msg.out.pdata_iov.max_nents = XIO_MSGR_IOVLEN;
174 msg.out.pdata_iov.nents = 0;
175 msg.out.pdata_iov.sglist = iovs;
177 // minimal initialize an "out" msg
// Messages default to the one-way type.
179 msg.type = XIO_MSG_TYPE_ONE_WAY;
180 // for now, we DO NEED receipts for every msg
// Stores the caller-supplied pointer so it can be retrieved from the xio_msg.
182 msg.user_context = user_context;
184 // minimal zero "in" side
// Reference-counted outgoing submission that carries one embedded request
// (req_0). Derives from XioSubmit; XioCommand and XioMsg derive from it.
188 class XioSend : public XioSubmit
// Debug hook; the base implementation does nothing.
191 virtual void print_debug(CephContext *cct, const char *tag) const {};
// Accessors for the xio_msg embedded in req_0 (const and non-const).
192 const struct xio_msg * get_xio_msg() const {return &req_0.msg;}
193 struct xio_msg * get_xio_msg() {return &req_0.msg;}
// Number of xio messages this submission represents; 1 in the base class.
194 virtual size_t get_msg_count() const {return 1;}
// Constructor: registers as XioSubmit::OUTGOING_MSG on _xcon, constructs
// req_0 with `this` as its argument, copies _mp into mp_this, and starts the
// reference count at _ex_cnt+1.
// NOTE(review): req_0's declaration is not visible here; presumably it is an
// xio_msg_ex, whose constructor stores the pointer as user_context -- confirm.
196 XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt=0) :
197 XioSubmit(XioSubmit::OUTGOING_MSG, _xcon),
198 req_0(this), mp_this(_mp), nrefs(_ex_cnt+1)
// Takes one additional reference and returns this object.
204 XioSend* get() { nrefs++; return this; };
// Fragment of the reference-release routine: subtracts n from nrefs (the
// unsigned atomic result is stored in an int), then the visible lines release
// the object's storage through xpool_free using the address of mp_this.
// NOTE(review): the enclosing signature and the condition guarding the free
// are not visible in this view -- confirm before relying on this.
207 int refs = nrefs -= n;
209 struct xio_reg_mem *mp = &this->mp_this;
211 xpool_free(sizeof(XioSend), mp);
// Drops as many references as get_msg_count() reports.
219 void put_msg_refs() {
220 put(get_msg_count());
// Memory registration record copied from the constructor argument.
230 struct xio_reg_mem mp_this;
// Reference count; the default of 0 is overwritten by the constructor.
231 std::atomic<unsigned> nrefs = { 0 };
// An XioSend that additionally exposes a buffer list through get_bl_ref().
234 class XioCommand : public XioSend
// Forwards both arguments to XioSend, leaving _ex_cnt at its default of 0.
237 XioCommand(XioConnection *_xcon, struct xio_reg_mem& _mp):XioSend(_xcon, _mp) {
// Returns a mutable reference to the member buffer list `bl`.
240 buffer::list& get_bl_ref() { return bl; };
// Outgoing submission wrapping a Ceph Message. Holds the Message pointer `m`,
// an XioMsgHdr built over the message's own header and footer, and an
// optional array of extra request structures (req_arr).
246 struct XioMsg : public XioSend
// Constructor: passes _ex_cnt to XioSend (so the reference count starts at
// _ex_cnt+1), then fills the XIO header from the local messenger identity.
// NOTE(review): the initializer list continues on a line not shown here.
254 XioMsg(Message *_m, XioConnection *_xcon, struct xio_reg_mem& _mp,
255 int _ex_cnt, uint64_t _features) :
256 XioSend(_xcon, _mp, _ex_cnt),
257 m(_m), hdr(m->get_header(), m->get_footer(), _features),
260 const entity_inst_t &inst = xcon->get_messenger()->get_myinst();
261 hdr.peer_type = inst.name.type();
262 hdr.addr = xcon->get_messenger()->get_myaddr();
// These two writes go through hdr.hdr, which the XioMsgHdr constructor set to
// the address of the header passed in, i.e. they modify the Message's header.
263 hdr.hdr->src.type = inst.name.type();
264 hdr.hdr->src.num = inst.name.num();
// Total message count: the base request plus _ex_cnt extras.
265 hdr.msg_cnt = _ex_cnt+1;
// Extra request structures are allocated only when extras were requested.
267 if (unlikely(_ex_cnt > 0)) {
268 alloc_trailers(_ex_cnt);
272 void print_debug(CephContext *cct, const char *tag) const override;
// Overrides the base count of 1; body not visible in this view.
273 size_t get_msg_count() const override {
// Allocates raw storage for `cnt` xio_msg_ex objects with malloc and
// constructs each in place with `this` as its constructor argument.
// NOTE(review): the malloc result is not checked for null in the visible
// lines -- confirm whether a check exists on a line not shown.
277 void alloc_trailers(int cnt) {
278 req_arr = static_cast<xio_msg_ex*>(malloc(cnt * sizeof(xio_msg_ex)));
279 for (int ix = 0; ix < cnt; ++ix) {
280 xio_msg_ex* xreq = &(req_arr[ix]);
281 new (xreq) xio_msg_ex(this);
// Returns the wrapped Message pointer.
285 Message *get_message() { return m; }
// Fragment: when req_arr is set, walks get_msg_count()-1 entries of it.
// NOTE(review): the enclosing function and the loop body are not visible in
// this view; presumably this is teardown of the entries built by
// alloc_trailers -- confirm.
289 if (unlikely(!!req_arr)) {
290 for (unsigned int ix = 0; ix < get_msg_count()-1; ++ix) {
291 xio_msg_ex* xreq = &(req_arr[ix]);
297 /* testing only! server's ready, resubmit request (not reached on
298 * PASSIVE/server side) */
// If the message magic carries MSG_MAGIC_REDUPE and the connection reports
// connected, the same Message is sent again.
299 if (unlikely(m->get_magic() & MSG_MAGIC_REDUPE)) {
300 if (likely(xcon->is_connected())) {
301 xcon->send_message(m);
307 /* the normal case: done with message */
// Completion hook attached to a received Message. Reference counted; it holds
// the incoming xio message sequence (msg_seq) and a pool (rsp_pool).
313 class XioDispatchHook : public Message::CompletionHook
// Starts with one reference.
319 std::atomic<unsigned> nrefs { 1 };
321 friend class XioConnection;
322 friend class XioMessenger;
// Memory registration record used when the hook's storage is released.
324 struct xio_reg_mem mp_this;
// Constructor: the visible initializer builds rsp_pool from the global
// xio_msgr_noreg_mpool, and the body increments the connection's n_reqs.
// NOTE(review): the remaining initializers are on lines not shown here.
326 XioDispatchHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq,
327 struct xio_reg_mem& _mp) :
331 rsp_pool(xio_msgr_noreg_mpool),
335 ++xcon->n_reqs; // atomicity by portal thread
// CompletionHook entry points; bodies not visible in this view.
339 virtual void finish(int r) {
343 virtual void complete(int r) {
// Takes one additional reference and returns this object.
349 XioDispatchHook* get() {
350 nrefs++; return this;
// Drops n references (default 1). The visible lines test
// !cl_flag && release_msgs(), then run the destructor explicitly and release
// the storage through xpool_free using the address of mp_this.
// NOTE(review): the zero-reference check and the branch structure around
// these statements are not visible in this view -- confirm.
353 void put(int n = 1) {
354 int refs = nrefs -= n;
356 /* in Marcus' new system, refs reaches 0 twice: once in
357 * Message lifecycle, and again after xio_release_msg.
359 if (!cl_flag && release_msgs())
361 struct xio_reg_mem *mp = &this->mp_this;
362 this->~XioDispatchHook();
363 xpool_free(sizeof(XioDispatchHook), mp);
// Accessors returning references to the message sequence and the pool.
367 XioInSeq& get_seq() { return msg_seq; }
369 XioPool& get_pool() { return rsp_pool; }
// Error-path cleanup; the visible statement decrements the connection's
// n_reqs, mirroring the increment in the constructor.
371 void on_err_finalize(XioConnection *xcon) {
372 /* can't decode message; even with one-way must free
373 * xio_msg structures, and then xiopool
379 --xcon->n_reqs; // atomicity by portal thread
385 /* A sender-side CompletionHook that relies on the on_msg_delivered
386 * to complete a pending mark down. */
387 class XioMarkDownHook : public Message::CompletionHook
// Memory registration record used when the hook's storage is released.
393 struct xio_reg_mem mp_this;
// Constructor: takes a reference on the connection via _xcon->get() and
// copies _mp into mp_this.
396 XioConnection* _xcon, Message *_m, struct xio_reg_mem& _mp) :
397 CompletionHook(_m), xcon(_xcon->get()), mp_this(_mp)
// Intentionally empty.
400 virtual void claim(int r) {}
// Runs the destructor explicitly and then releases the storage described by
// mp_this.
// NOTE(review): XioSend and XioDispatchHook release through
// xpool_free(sizeof(T), mp) whereas this calls xio_mempool_free(mp)
// directly -- confirm the difference is intentional.
402 virtual void finish(int r) {
404 struct xio_reg_mem *mp = &this->mp_this;
405 this->~XioMarkDownHook();
406 xio_mempool_free(mp);
// Marks the connection down with no operation flags.
// NOTE(review): statements following the _mark_down call are not visible here.
409 virtual void complete(int r) {
410 xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
// Submission of kind INCOMING_MSG_RELEASE that carries the dispatch hook
// whose received xio messages are to be handed back.
415 struct XioCompletion : public XioSubmit
417 XioDispatchHook *xhook;
// Constructor: takes a reference on the hook via _xhook->get() and keeps the
// returned pointer.
419 XioCompletion(XioConnection *_xcon, XioDispatchHook *_xhook)
420 : XioSubmit(XioSubmit::INCOMING_MSG_RELEASE, _xcon /* not xcon! */),
421 xhook(_xhook->get()) {
// Pops the next xio_msg from the hook's incoming sequence.
426 struct xio_msg* dequeue() {
427 return xhook->get_seq().dequeue();
// Returns the hook pointer stored by the constructor.
430 XioDispatchHook* get_xhook() { return xhook; }
// Debug printing helpers; declared here, defined outside this file view.
// `tag` is a caller-supplied label string.
438 void print_xio_msg_hdr(CephContext *cct, const char *tag,
439 const XioMsgHdr &hdr, const struct xio_msg *msg);
440 void print_ceph_msg(CephContext *cct, const char *tag, Message *m);
442 #endif /* XIO_MSG_H */