Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / xio / XioConnection.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  * Portions Copyright (C) 2013 CohortFS, LLC
8  *
9  * This is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software
12  * Foundation.  See file COPYING.
13  *
14  */
15
16 #include "XioMsg.h"
17 #include "XioConnection.h"
18 #include "XioMessenger.h"
19 #include "messages/MDataPing.h"
20 #include "msg/msg_types.h"
21 #include "auth/none/AuthNoneProtocol.h" // XXX
22
23 #include "include/assert.h"
24 #include "common/dout.h"
25
26 extern struct xio_mempool *xio_msgr_mpool;
27 extern struct xio_mempool *xio_msgr_noreg_mpool;
28
29 #define dout_subsys ceph_subsys_xio
30
31 void print_xio_msg_hdr(CephContext *cct, const char *tag,
32                        const XioMsgHdr &hdr, const struct xio_msg *msg)
33 {
34   if (msg) {
35     ldout(cct,4) << tag <<
36       " xio msg:" <<
37       " sn: " << msg->sn <<
38       " timestamp: " << msg->timestamp <<
39       dendl;
40   }
41
42   ldout(cct,4) << tag <<
43     " ceph header: " <<
44     " front_len: " << hdr.hdr->front_len <<
45     " seq: " << hdr.hdr->seq <<
46     " tid: " << hdr.hdr->tid <<
47     " type: " << hdr.hdr->type <<
48     " prio: " << hdr.hdr->priority <<
49     " name type: " << (int) hdr.hdr->src.type <<
50     " name num: " << (int) hdr.hdr->src.num <<
51     " version: " << hdr.hdr->version <<
52     " compat_version: " << hdr.hdr->compat_version <<
53     " front_len: " << hdr.hdr->front_len <<
54     " middle_len: " << hdr.hdr->middle_len <<
55     " data_len: " << hdr.hdr->data_len <<
56     " xio header: " <<
57     " msg_cnt: " << hdr.msg_cnt <<
58     dendl;
59
60   ldout(cct,4) << tag <<
61     " ceph footer: " <<
62     " front_crc: " << hdr.ftr->front_crc <<
63     " middle_crc: " << hdr.ftr->middle_crc <<
64     " data_crc: " << hdr.ftr->data_crc <<
65     " sig: " << hdr.ftr->sig <<
66     " flags: " << (uint32_t) hdr.ftr->flags <<
67     dendl;
68 }
69
70 void print_ceph_msg(CephContext *cct, const char *tag, Message *m)
71 {
72   if (m->get_magic() & (MSG_MAGIC_XIO & MSG_MAGIC_TRACE_DTOR)) {
73     ceph_msg_header& header = m->get_header();
74     ldout(cct,4) << tag << " header version " << header.version <<
75       " compat version " << header.compat_version <<
76       dendl;
77   }
78 }
79
80 #undef dout_prefix
81 #define dout_prefix conn_prefix(_dout)
82 ostream& XioConnection::conn_prefix(std::ostream *_dout) {
83   return *_dout << "-- " << get_messenger()->get_myinst().addr << " >> " << peer_addr
84                 << " peer=" << peer.name.type_str()
85                 << " conn=" << conn << " sess=" << session << " ";
86 }
87
88 XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type,
89                              const entity_inst_t& _peer) :
90   Connection(m->cct, m),
91   xio_conn_type(_type),
92   portal(m->get_portal()),
93   connected(false),
94   peer(_peer),
95   session(NULL),
96   conn(NULL),
97   magic(m->get_magic()),
98   scount(0),
99   send_ctr(0),
100   in_seq(),
101   cstate(this)
102 {
103   pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
104   set_peer_type(peer.name.type());
105   set_peer_addr(peer.addr);
106
107   Messenger::Policy policy;
108   int64_t max_msgs = 0, max_bytes = 0, bytes_opt = 0;
109   int xopt;
110
111   policy = m->get_policy(peer_type);
112
113   if (policy.throttler_messages) {
114     max_msgs = policy.throttler_messages->get_max();
115     ldout(m->cct,4) << "XioMessenger throttle_msgs: " << max_msgs << dendl;
116   }
117
118   xopt = m->cct->_conf->xio_queue_depth;
119   if (max_msgs > xopt)
120     xopt = max_msgs;
121
122   /* set high mark for send, reserved 20% for credits */
123   q_high_mark = xopt * 4 / 5;
124   q_low_mark = q_high_mark/2;
125
126   /* set send & receive msgs queue depth */
127   xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
128              &xopt, sizeof(xopt));
129   xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
130              &xopt, sizeof(xopt));
131
132   if (policy.throttler_bytes) {
133     max_bytes = policy.throttler_bytes->get_max();
134     ldout(m->cct,4) << "XioMessenger throttle_bytes: " << max_bytes << dendl;
135   }
136
137   bytes_opt = (2 << 28); /* default: 512 MB */
138   if (max_bytes > bytes_opt)
139     bytes_opt = max_bytes;
140
141   /* set send & receive total bytes throttle */
142   xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_BYTES,
143              &bytes_opt, sizeof(bytes_opt));
144   xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES,
145              &bytes_opt, sizeof(bytes_opt));
146
147   ldout(m->cct,4) << "throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl;
148
149   /* XXXX fake features, aieee! */
150   set_features(XIO_ALL_FEATURES);
151 }
152
153 int XioConnection::send_message(Message *m)
154 {
155   XioMessenger *ms = static_cast<XioMessenger*>(get_messenger());
156   return ms->_send_message(m, this);
157 }
158
159 void XioConnection::send_keepalive_or_ack(bool ack, const utime_t *tp)
160 {
161   /* If con is not in READY state, we need to queue the request */
162   if (cstate.session_state.read() != XioConnection::UP) {
163     pthread_spin_lock(&sp);
164     if (cstate.session_state.read() != XioConnection::UP) {
165       if (ack) {
166         outgoing.ack = true;
167         outgoing.ack_time = *tp;
168       }
169       else {
170         outgoing.keepalive = true;
171       }
172       pthread_spin_unlock(&sp);
173       return;
174     }
175     pthread_spin_unlock(&sp);
176   }
177
178   send_keepalive_or_ack_internal(ack, tp);
179 }
180
181 void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp)
182 {
183   XioCommand *xcmd = pool_alloc_xio_command(this);
184   if (! xcmd) {
185     /* could happen if Accelio has been shutdown */
186     return;
187   }
188
189   struct ceph_timespec ts;
190   if (ack) {
191     assert(tp);
192     tp->encode_timeval(&ts);
193     xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
194     xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
195   } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
196     utime_t t = ceph_clock_now();
197     t.encode_timeval(&ts);
198     xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2);
199     xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
200   } else {
201     xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE);
202   }
203
204   const std::list<buffer::ptr>& header = xcmd->get_bl_ref().buffers();
205   assert(header.size() == 1);  /* accelio header must be without scatter gather */
206   list<bufferptr>::const_iterator pb = header.begin();
207   assert(pb->length() < XioMsgHdr::get_max_encoded_length());
208   struct xio_msg * msg = xcmd->get_xio_msg();
209   msg->out.header.iov_base = (char*) pb->c_str();
210   msg->out.header.iov_len = pb->length();
211
212   ldout(msgr->cct,8) << __func__ << " sending command with tag " << (int)(*(char*)msg->out.header.iov_base)
213        << " len " << msg->out.header.iov_len << dendl;
214
215   portal->enqueue(this, xcmd);
216 }
217
218
219 int XioConnection::passive_setup()
220 {
221   /* XXX passive setup is a placeholder for (potentially active-side
222      initiated) feature and auth* negotiation */
223   static bufferlist authorizer_reply; /* static because fake */
224   static CryptoKey session_key; /* ditto */
225   bool authorizer_valid;
226
227   XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
228
229   // fake an auth buffer
230   EntityName name;
231   name.set_type(peer.name.type());
232
233   AuthNoneAuthorizer auth;
234   auth.build_authorizer(name, peer.name.num());
235
236   /* XXX fake authorizer! */
237   msgr->ms_deliver_verify_authorizer(
238     this, peer_type, CEPH_AUTH_NONE,
239     auth.bl,
240     authorizer_reply,
241     authorizer_valid,
242     session_key);
243
244   /* notify hook */
245   msgr->ms_deliver_handle_accept(this);
246   msgr->ms_deliver_handle_fast_accept(this);
247
248   /* try to insert in conns_entity_map */
249   msgr->try_insert(this);
250   return (0);
251 }
252
253 static inline XioDispatchHook* pool_alloc_xio_dispatch_hook(
254   XioConnection *xcon, Message *m, XioInSeq& msg_seq)
255 {
256   struct xio_reg_mem mp_mem;
257   int e = xpool_alloc(xio_msgr_noreg_mpool,
258                       sizeof(XioDispatchHook), &mp_mem);
259   if (!!e)
260     return NULL;
261   XioDispatchHook *xhook = static_cast<XioDispatchHook*>(mp_mem.addr);
262   new (xhook) XioDispatchHook(xcon, m, msg_seq, mp_mem);
263   return xhook;
264 }
265
266 int XioConnection::handle_data_msg(struct xio_session *session,
267                               struct xio_msg *msg,
268                               int more_in_batch,
269                               void *cb_user_context)
270 {
271   struct xio_msg *tmsg = msg;
272
273   /* XXX Accelio guarantees message ordering at
274    * xio_session */
275
276   if (! in_seq.p()) {
277     if (!tmsg->in.header.iov_len) {
278         ldout(msgr->cct,0) << __func__ << " empty header: packet out of sequence?" << dendl;
279         xio_release_msg(msg);
280         return 0;
281     }
282     const size_t sizeof_tag = 1;
283     XioMsgCnt msg_cnt(
284       buffer::create_static(tmsg->in.header.iov_len-sizeof_tag,
285                             ((char*) tmsg->in.header.iov_base)+sizeof_tag));
286     ldout(msgr->cct,10) << __func__ << " receive msg " << "tmsg " << tmsg
287       << " msg_cnt " << msg_cnt.msg_cnt
288       << " iov_base " << tmsg->in.header.iov_base
289       << " iov_len " << (int) tmsg->in.header.iov_len
290       << " nents " << tmsg->in.pdata_iov.nents
291       << " sn " << tmsg->sn << dendl;
292     assert(session == this->session);
293     in_seq.set_count(msg_cnt.msg_cnt);
294   } else {
295     /* XXX major sequence error */
296     assert(! tmsg->in.header.iov_len);
297   }
298
299   in_seq.append(msg);
300   if (in_seq.count() > 0) {
301     return 0;
302   }
303
304   XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
305   XioDispatchHook *m_hook =
306     pool_alloc_xio_dispatch_hook(this, NULL /* msg */, in_seq);
307   XioInSeq& msg_seq = m_hook->msg_seq;
308   in_seq.clear();
309
310   ceph_msg_header header;
311   ceph_msg_footer footer;
312   buffer::list payload, middle, data;
313
314   const utime_t recv_stamp = ceph_clock_now();
315
316   ldout(msgr->cct,4) << __func__ << " " << "msg_seq.size()="  << msg_seq.size() <<
317     dendl;
318
319   struct xio_msg* msg_iter = msg_seq.begin();
320   tmsg = msg_iter;
321   XioMsgHdr hdr(header, footer,
322                 buffer::create_static(tmsg->in.header.iov_len,
323                                       (char*) tmsg->in.header.iov_base));
324
325   if (magic & (MSG_MAGIC_TRACE_XCON)) {
326     if (hdr.hdr->type == 43) {
327       print_xio_msg_hdr(msgr->cct, "on_msg", hdr, NULL);
328     }
329   }
330
331   unsigned int ix, blen, iov_len;
332   struct xio_iovec_ex *msg_iov, *iovs;
333   uint32_t take_len, left_len = 0;
334   char *left_base = NULL;
335
336   ix = 0;
337   blen = header.front_len;
338
339   while (blen && (msg_iter != msg_seq.end())) {
340     tmsg = msg_iter;
341     iov_len = vmsg_sglist_nents(&tmsg->in);
342     iovs = vmsg_sglist(&tmsg->in);
343     for (; blen && (ix < iov_len); ++ix) {
344       msg_iov = &iovs[ix];
345
346       /* XXX need to detect any buffer which needs to be
347        * split due to coalescing of a segment (front, middle,
348        * data) boundary */
349
350       take_len = MIN(blen, msg_iov->iov_len);
351       payload.append(
352         buffer::create_msg(
353           take_len, (char*) msg_iov->iov_base, m_hook));
354       blen -= take_len;
355       if (! blen) {
356         left_len = msg_iov->iov_len - take_len;
357         if (left_len) {
358           left_base = ((char*) msg_iov->iov_base) + take_len;
359         }
360       }
361     }
362     /* XXX as above, if a buffer is split, then we needed to track
363      * the new start (carry) and not advance */
364     if (ix == iov_len) {
365       msg_seq.next(&msg_iter);
366       ix = 0;
367     }
368   }
369
370   if (magic & (MSG_MAGIC_TRACE_XCON)) {
371     if (hdr.hdr->type == 43) {
372       ldout(msgr->cct,4) << "front (payload) dump:";
373       payload.hexdump( *_dout );
374       *_dout << dendl;
375     }
376   }
377
378   blen = header.middle_len;
379
380   if (blen && left_len) {
381     middle.append(
382       buffer::create_msg(left_len, left_base, m_hook));
383     left_len = 0;
384   }
385
386   while (blen && (msg_iter != msg_seq.end())) {
387     tmsg = msg_iter;
388     iov_len = vmsg_sglist_nents(&tmsg->in);
389     iovs = vmsg_sglist(&tmsg->in);
390     for (; blen && (ix < iov_len); ++ix) {
391       msg_iov = &iovs[ix];
392       take_len = MIN(blen, msg_iov->iov_len);
393       middle.append(
394         buffer::create_msg(
395           take_len, (char*) msg_iov->iov_base, m_hook));
396       blen -= take_len;
397       if (! blen) {
398         left_len = msg_iov->iov_len - take_len;
399         if (left_len) {
400           left_base = ((char*) msg_iov->iov_base) + take_len;
401         }
402       }
403     }
404     if (ix == iov_len) {
405       msg_seq.next(&msg_iter);
406       ix = 0;
407     }
408   }
409
410   blen = header.data_len;
411
412   if (blen && left_len) {
413     data.append(
414       buffer::create_msg(left_len, left_base, m_hook));
415     left_len = 0;
416   }
417
418   while (blen && (msg_iter != msg_seq.end())) {
419     tmsg = msg_iter;
420     iov_len = vmsg_sglist_nents(&tmsg->in);
421     iovs = vmsg_sglist(&tmsg->in);
422     for (; blen && (ix < iov_len); ++ix) {
423       msg_iov = &iovs[ix];
424       data.append(
425         buffer::create_msg(
426           msg_iov->iov_len, (char*) msg_iov->iov_base, m_hook));
427       blen -= msg_iov->iov_len;
428     }
429     if (ix == iov_len) {
430       msg_seq.next(&msg_iter);
431       ix = 0;
432     }
433   }
434
435   /* update connection timestamp */
436   recv = tmsg->timestamp;
437
438   Message *m = decode_message(msgr->cct, msgr->crcflags, header, footer,
439                               payload, middle, data, this);
440
441   if (m) {
442     /* completion */
443     m->set_connection(this);
444
445     /* reply hook */
446     m_hook->set_message(m);
447     m->set_completion_hook(m_hook);
448
449     /* trace flag */
450     m->set_magic(magic);
451
452     /* update timestamps */
453     m->set_recv_stamp(recv_stamp);
454     m->set_recv_complete_stamp(ceph_clock_now());
455     m->set_seq(header.seq);
456
457     /* MP-SAFE */
458     state.set_in_seq(header.seq);
459
460     /* XXXX validate peer type */
461     if (peer_type != (int) hdr.peer_type) { /* XXX isn't peer_type -1? */
462       peer_type = hdr.peer_type;
463       peer_addr = hdr.addr;
464       peer.addr = peer_addr;
465       peer.name = entity_name_t(hdr.hdr->src);
466       if (xio_conn_type == XioConnection::PASSIVE) {
467         /* XXX kick off feature/authn/authz negotiation
468          * nb:  very possibly the active side should initiate this, but
469          * for now, call a passive hook so OSD and friends can create
470          * sessions without actually negotiating
471          */
472         passive_setup();
473       }
474     }
475
476     if (magic & (MSG_MAGIC_TRACE_XCON)) {
477       ldout(msgr->cct,4) << "decode m is " << m->get_type() << dendl;
478     }
479
480     /* dispatch it */
481     msgr->ds_dispatch(m);
482   } else {
483     /* responds for undecoded messages and frees hook */
484     ldout(msgr->cct,4) << "decode m failed" << dendl;
485     m_hook->on_err_finalize(this);
486   }
487
488   return 0;
489 }
490
491 int XioConnection::on_msg(struct xio_session *session,
492                               struct xio_msg *msg,
493                               int more_in_batch,
494                               void *cb_user_context)
495 {
496   char tag = CEPH_MSGR_TAG_MSG;
497   if (msg->in.header.iov_len)
498     tag = *(char*)msg->in.header.iov_base;
499
500   ldout(msgr->cct,8) << __func__ << " receive msg with iov_len "
501     << (int) msg->in.header.iov_len << " tag " << (int)tag << dendl;
502
503   //header_len_without_tag is only meaningful in case we have tag
504   size_t header_len_without_tag = msg->in.header.iov_len - sizeof(tag);
505
506   switch(tag) {
507   case CEPH_MSGR_TAG_MSG:
508     ldout(msgr->cct, 20) << __func__ << " got data message" << dendl;
509     return handle_data_msg(session, msg, more_in_batch, cb_user_context);
510
511   case CEPH_MSGR_TAG_KEEPALIVE:
512     ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
513     set_last_keepalive(ceph_clock_now());
514     break;
515
516   case CEPH_MSGR_TAG_KEEPALIVE2:
517     if (header_len_without_tag < sizeof(ceph_timespec)) {
518       lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2: got " << header_len_without_tag <<
519          " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
520     }
521     else {
522       ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
523       utime_t kp_t = utime_t(*t);
524       ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2 with timestamp" << kp_t << dendl;
525       send_keepalive_or_ack(true, &kp_t);
526       set_last_keepalive(ceph_clock_now());
527     }
528
529     break;
530
531   case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
532     if (header_len_without_tag < sizeof(ceph_timespec)) {
533       lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2_ACK: got " << header_len_without_tag <<
534          " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
535     }
536     else {
537       ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
538       utime_t kp_t(*t);
539       ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2_ACK with timestamp" << kp_t << dendl;
540       set_last_keepalive_ack(kp_t);
541     }
542     break;
543
544   default:
545     lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl;
546     assert(! "unsupported message tag");
547   }
548
549   xio_release_msg(msg);
550   return 0;
551 }
552
553
554 int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
555                                            struct xio_msg *req,
556                                            void *conn_user_context)
557 {
558   /* requester send complete (one-way) */
559   uint64_t rc = ++scount;
560
561   XioSend* xsend = static_cast<XioSend*>(req->user_context);
562   if (unlikely(magic & MSG_MAGIC_TRACE_CTR)) {
563     if (unlikely((rc % 1000000) == 0)) {
564       std::cout << "xio finished " << rc << " " << time(0) << std::endl;
565     }
566   } /* trace ctr */
567
568   ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xsend->xcon <<
569     " msg: " << req << " sn: " << req->sn << dendl;
570
571   XioMsg *xmsg = dynamic_cast<XioMsg*>(xsend);
572   if (xmsg) {
573     ldout(msgr->cct,11) << "on_msg_delivered xcon: " <<
574       " type: " << xmsg->m->get_type() << " tid: " << xmsg->m->get_tid() <<
575       " seq: " << xmsg->m->get_seq() << dendl;
576   }
577
578   --send_ctr; /* atomic, because portal thread */
579
580   /* unblock flow-controlled connections, avoid oscillation */
581   if (unlikely(cstate.session_state.read() ==
582                XioConnection::FLOW_CONTROLLED)) {
583     if ((send_ctr <= uint32_t(xio_qdepth_low_mark())) &&
584         (1 /* XXX memory <= memory low-water mark */))  {
585       cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
586       ldout(msgr->cct,2) << "on_msg_delivered xcon: " << xsend->xcon
587         << " up_ready from flow_controlled" << dendl;
588     }
589   }
590
591   xsend->put();
592
593   return 0;
594 }  /* on_msg_delivered */
595
596 void XioConnection::msg_send_fail(XioSend *xsend, int code)
597 {
598   ldout(msgr->cct,2) << "xio_send_msg FAILED xcon: " << this <<
599     " msg: " << xsend->get_xio_msg() << " code=" << code <<
600     " (" << xio_strerror(code) << ")" << dendl;
601   /* return refs taken for each xio_msg */
602   xsend->put_msg_refs();
603 } /* msg_send_fail */
604
605 void XioConnection::msg_release_fail(struct xio_msg *msg, int code)
606 {
607   ldout(msgr->cct,2) << "xio_release_msg FAILED xcon: " << this <<
608     " msg: " << msg <<  "code=" << code <<
609     " (" << xio_strerror(code) << ")" << dendl;
610 } /* msg_release_fail */
611
612 int XioConnection::flush_out_queues(uint32_t flags) {
613   XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
614   if (! (flags & CState::OP_FLAG_LOCKED))
615     pthread_spin_lock(&sp);
616
617   if (outgoing.keepalive) {
618     outgoing.keepalive = false;
619     send_keepalive_or_ack_internal();
620   }
621
622   if (outgoing.ack) {
623     outgoing.ack = false;
624     send_keepalive_or_ack_internal(true, &outgoing.ack_time);
625   }
626
627   // send deferred 1 (direct backpresssure)
628   if (outgoing.requeue.size() > 0)
629     portal->requeue(this, outgoing.requeue);
630
631   // send deferred 2 (sent while deferred)
632   int ix, q_size = outgoing.mqueue.size();
633   for (ix = 0; ix < q_size; ++ix) {
634     Message::Queue::iterator q_iter = outgoing.mqueue.begin();
635     Message* m = &(*q_iter);
636     outgoing.mqueue.erase(q_iter);
637     msgr->_send_message_impl(m, this);
638   }
639   if (! (flags & CState::OP_FLAG_LOCKED))
640     pthread_spin_unlock(&sp);
641   return 0;
642 }
643
644 int XioConnection::discard_out_queues(uint32_t flags)
645 {
646   Message::Queue disc_q;
647   XioSubmit::Queue deferred_q;
648
649   if (! (flags & CState::OP_FLAG_LOCKED))
650     pthread_spin_lock(&sp);
651
652   /* the two send queues contain different objects:
653    * - anything on the mqueue is a Message
654    * - anything on the requeue is an XioSend
655    */
656   Message::Queue::const_iterator i1 = disc_q.end();
657   disc_q.splice(i1, outgoing.mqueue);
658
659   XioSubmit::Queue::const_iterator i2 = deferred_q.end();
660   deferred_q.splice(i2, outgoing.requeue);
661
662   outgoing.keepalive = outgoing.ack = false;
663
664   if (! (flags & CState::OP_FLAG_LOCKED))
665     pthread_spin_unlock(&sp);
666
667   // mqueue
668   while (!disc_q.empty()) {
669     Message::Queue::iterator q_iter = disc_q.begin();
670     Message* m = &(*q_iter);
671     disc_q.erase(q_iter);
672     m->put();
673   }
674
675   // requeue
676   while (!deferred_q.empty()) {
677     XioSubmit::Queue::iterator q_iter = deferred_q.begin();
678     XioSubmit* xs = &(*q_iter);
679     XioSend* xsend;
680     switch (xs->type) {
681       case XioSubmit::OUTGOING_MSG:
682         xsend = static_cast<XioSend*>(xs);
683         deferred_q.erase(q_iter);
684         // release once for each chained xio_msg
685         xsend->put(xsend->get_msg_count());
686         break;
687       case XioSubmit::INCOMING_MSG_RELEASE:
688         deferred_q.erase(q_iter);
689         portal->release_xio_msg(static_cast<XioCompletion*>(xs));
690         break;
691       default:
692         ldout(msgr->cct,0) << __func__ << ": Unknown Msg type " << xs->type << dendl;
693         break;
694     }
695   }
696
697   return 0;
698 }
699
700 int XioConnection::adjust_clru(uint32_t flags)
701 {
702   if (flags & CState::OP_FLAG_LOCKED)
703     pthread_spin_unlock(&sp);
704
705   XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
706   msgr->conns_sp.lock();
707   pthread_spin_lock(&sp);
708
709   if (cstate.flags & CState::FLAG_MAPPED) {
710     XioConnection::ConnList::iterator citer =
711       XioConnection::ConnList::s_iterator_to(*this);
712     msgr->conns_list.erase(citer);
713     msgr->conns_list.push_front(*this); // LRU
714   }
715
716   msgr->conns_sp.unlock();
717
718   if (! (flags & CState::OP_FLAG_LOCKED))
719     pthread_spin_unlock(&sp);
720
721   return 0;
722 }
723
724 int XioConnection::on_msg_error(struct xio_session *session,
725                                 enum xio_status error,
726                                 struct xio_msg  *msg,
727                                 void *conn_user_context)
728 {
729   XioSend *xsend = static_cast<XioSend*>(msg->user_context);
730   if (xsend)
731     xsend->put();
732
733   --send_ctr; /* atomic, because portal thread */
734   return 0;
735 } /* on_msg_error */
736
737 void XioConnection::mark_down()
738 {
739   _mark_down(XioConnection::CState::OP_FLAG_NONE);
740 }
741
742 int XioConnection::_mark_down(uint32_t flags)
743 {
744   if (! (flags & CState::OP_FLAG_LOCKED))
745     pthread_spin_lock(&sp);
746
747   // per interface comment, we only stage a remote reset if the
748   // current policy required it
749   if (cstate.policy.resetcheck)
750     cstate.flags |= CState::FLAG_RESET;
751
752   disconnect();
753
754   /* XXX this will almost certainly be called again from
755    * on_disconnect_event() */
756   discard_out_queues(flags|CState::OP_FLAG_LOCKED);
757
758   if (! (flags & CState::OP_FLAG_LOCKED))
759     pthread_spin_unlock(&sp);
760
761   return 0;
762 }
763
764 void XioConnection::mark_disposable()
765 {
766   _mark_disposable(XioConnection::CState::OP_FLAG_NONE);
767 }
768
769 int XioConnection::_mark_disposable(uint32_t flags)
770 {
771   if (! (flags & CState::OP_FLAG_LOCKED))
772     pthread_spin_lock(&sp);
773
774   cstate.policy.lossy = true;
775
776   if (! (flags & CState::OP_FLAG_LOCKED))
777     pthread_spin_unlock(&sp);
778
779   return 0;
780 }
781
782 int XioConnection::CState::state_up_ready(uint32_t flags)
783 {
784   if (! (flags & CState::OP_FLAG_LOCKED))
785     pthread_spin_lock(&xcon->sp);
786
787   xcon->flush_out_queues(flags|CState::OP_FLAG_LOCKED);
788
789   session_state = session_states::UP;
790   startup_state = session_startup_states::READY;
791
792   if (! (flags & CState::OP_FLAG_LOCKED))
793     pthread_spin_unlock(&xcon->sp);
794
795   return (0);
796 }
797
798 int XioConnection::CState::state_discon()
799 {
800   session_state = session_states::DISCONNECTED;
801   startup_state = session_startup_states::IDLE;
802
803   return 0;
804 }
805
806 int XioConnection::CState::state_flow_controlled(uint32_t flags)
807 {
808   if (! (flags & OP_FLAG_LOCKED))
809     pthread_spin_lock(&xcon->sp);
810
811   session_state = session_states::FLOW_CONTROLLED;
812
813   if (! (flags & OP_FLAG_LOCKED))
814     pthread_spin_unlock(&xcon->sp);
815
816   return (0);
817 }
818
819 int XioConnection::CState::state_fail(Message* m, uint32_t flags)
820 {
821   if (! (flags & OP_FLAG_LOCKED))
822     pthread_spin_lock(&xcon->sp);
823
824   // advance to state FAIL, drop queued, msgs, adjust LRU
825   session_state = session_states::DISCONNECTED);
826   startup_state = session_startup_states::FAIL);
827
828   xcon->discard_out_queues(flags|OP_FLAG_LOCKED);
829   xcon->adjust_clru(flags|OP_FLAG_LOCKED|OP_FLAG_LRU);
830
831   xcon->disconnect();
832
833   if (! (flags & OP_FLAG_LOCKED))
834     pthread_spin_unlock(&xcon->sp);
835
836   // notify ULP
837   XioMessenger* msgr = static_cast<XioMessenger*>(xcon->get_messenger());
838   msgr->ms_deliver_handle_reset(xcon);
839   m->put();
840
841   return 0;
842 }
843
844
845 int XioLoopbackConnection::send_message(Message *m)
846 {
847   XioMessenger *ms = static_cast<XioMessenger*>(get_messenger());
848   m->set_connection(this);
849   m->set_seq(next_seq());
850   m->set_src(ms->get_myinst().name);
851   ms->ds_dispatch(m);
852   return 0;
853 }
854
855 void XioLoopbackConnection::send_keepalive()
856 {
857   utime_t t = ceph_clock_now();
858   set_last_keepalive(t);
859   set_last_keepalive_ack(t);
860 }