1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #include <unistd.h>
18
19 #include "include/Context.h"
20 #include "common/errno.h"
21 #include "AsyncMessenger.h"
22 #include "AsyncConnection.h"
23
24 #include "messages/MOSDOp.h"
25 #include "messages/MOSDOpReply.h"
26 #include "common/EventTrace.h"
27
28 // Constant to limit starting sequence number to 2^31.  Nothing special about it, just a big number.  PLR
29 #define SEQ_MASK  0x7fffffff 
30
31 #define dout_subsys ceph_subsys_ms
32 #undef dout_prefix
33 #define dout_prefix _conn_prefix(_dout)
34 ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
35   return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this
36                 << " :" << port
37                 << " s=" << get_state_name(state)
38                 << " pgs=" << peer_global_seq
39                 << " cs=" << connect_seq
40                 << " l=" << policy.lossy
41                 << ").";
42 }
43
// Notes:
// 1. Don't dispatch any event when closed! It may keep the AsyncConnection alive even after the AsyncMessenger is destroyed

const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
const int ASYNC_COALESCE_THRESHOLD = 256;

class C_time_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(int fd_or_id) override {
    conn->wakeup_from(fd_or_id);
  }
};

class C_handle_read : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
  void do_request(int fd_or_id) override {
    conn->process();
  }
};

class C_handle_write : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
  void do_request(int fd) override {
    conn->handle_write();
  }
};

class C_clean_handler : public EventCallback {
  AsyncConnectionRef conn;
 public:
  explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
  void do_request(int id) override {
    conn->cleanup();
    delete this;
  }
};

class C_tick_wakeup : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
  void do_request(int fd_or_id) override {
    conn->tick(fd_or_id);
  }
};

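// Worked example (assuming CEPH_PAGE_SIZE is 4096): for len=10000 and
// off=300, the head buffer gets 4096-300=3796 bytes so the split point
// lands on a page boundary, the middle gets one page-aligned 4096-byte
// buffer, and the tail buffer gets the remaining 2108 bytes.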
static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
{
  // create a buffer to read into that matches the data alignment
  unsigned left = len;
  if (off & ~CEPH_PAGE_MASK) {
    // head
    unsigned head = 0;
    head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
    data.push_back(buffer::create(head));
    left -= head;
  }
  unsigned middle = left & CEPH_PAGE_MASK;
  if (middle > 0) {
    data.push_back(buffer::create_page_aligned(middle));
    left -= middle;
  }
  if (left) {
    data.push_back(buffer::create(left));
  }
}

AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
                                 Worker *w)
  : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
    logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
    state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
    dispatch_queue(q), can_write(WriteStatus::NOWRITE),
    keepalive(false), recv_buf(NULL),
    recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
    recv_start(0), recv_end(0),
    last_active(ceph::coarse_mono_clock::now()),
    inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
    got_bad_auth(false), authorizer(NULL), replacing(false),
    is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0),
    worker(w), center(&w->center)
{
  read_handler = new C_handle_read(this);
  write_handler = new C_handle_write(this);
  wakeup_handler = new C_time_wakeup(this);
  tick_handler = new C_tick_wakeup(this);
  memset(msgvec, 0, sizeof(msgvec));
  // recv_buf is double recv_max_prefetch; see "read_until"
  recv_buf = new char[2*recv_max_prefetch];
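  // state_buffer holds every fixed-size protocol read (tags, headers,
  // footers, connect replies); 4096 bytes bounds them all -- see the
  // authorizer_len assert in _process_connection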
  state_buffer = new char[4096];
  logger->inc(l_msgr_created_connections);
}

AsyncConnection::~AsyncConnection()
{
  assert(out_q.empty());
  assert(sent.empty());
  delete authorizer;
  if (recv_buf)
    delete[] recv_buf;
  if (state_buffer)
    delete[] state_buffer;
  assert(!delay_state);
}

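// Set up the delayed-delivery queue only when the ms_inject_delay_type
// config option names this peer's entity type (e.g. "osd"); this is a
// debugging knob used to simulate slow delivery.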
void AsyncConnection::maybe_start_delay_thread()
{
  if (!delay_state) {
    auto pos = async_msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(peer_type));
    if (pos != string::npos) {
      ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl;
      delay_state = new DelayedDelivery(async_msgr, center, dispatch_queue, conn_id);
    }
  }
}

/* return -1 means `fd` hit an error or was closed by the peer, so it
 * should be closed locally as well
 * return 0 means EAGAIN or EINTR */
ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
{
  ssize_t nread;
 again:
  nread = cs.read(buf, len);
  if (nread < 0) {
    if (nread == -EAGAIN) {
      nread = 0;
    } else if (nread == -EINTR) {
      goto again;
    } else {
      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
                          << " : " << cpp_strerror(nread) << dendl;
      return -1;
    }
  } else if (nread == 0) {
    ldout(async_msgr->cct, 1) << __func__ << " peer closed file descriptor "
                              << cs.fd() << dendl;
    return -1;
  }
  return nread;
}

// return the number of bytes still queued in outcoming_bl (which may be
// more than this call tried to send)
// return < 0 on error
ssize_t AsyncConnection::_try_send(bool more)
{
  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  assert(center->in_thread());
  ssize_t r = cs.send(outcoming_bl, more);
  if (r < 0) {
    ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
    return r;
  }

  ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
                             << " remaining bytes " << outcoming_bl.length() << dendl;

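  // register for EVENT_WRITABLE only while data remains queued, and
  // deregister once the queue drains so the event loop doesn't spin on an
  // always-writable socket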
  if (!open_write && is_queued()) {
    center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
    open_write = true;
  }

  if (open_write && !is_queued()) {
    center->delete_file_event(cs.fd(), EVENT_WRITABLE);
    open_write = false;
    if (state_after_send != STATE_NONE)
      center->dispatch_event_external(read_handler);
  }

  return outcoming_bl.length();
}

// Because this function may be called multiple times to populate
// the needed buffer, the passed-in bufferptr must stay the same
// across calls.  Normally only "read_message" passes an existing
// bufferptr in.
//
// It uses readahead to reduce the overhead of small reads;
// "recv_buf" stores the prefetched data.
//
// return the remaining bytes; 0 means this buffer is finished,
// and < 0 means error
ssize_t AsyncConnection::read_until(unsigned len, char *p)
{
  ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
                             << state_offset << dendl;

  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
      cs.shutdown();
    }
  }

  ssize_t r = 0;
  uint64_t left = len - state_offset;
  if (recv_end > recv_start) {
    uint64_t to_read = MIN(recv_end - recv_start, left);
    memcpy(p, recv_buf+recv_start, to_read);
    recv_start += to_read;
    left -= to_read;
    ldout(async_msgr->cct, 25) << __func__ << " got " << to_read << " in buffer "
                               << " left is " << left << " buffer still has "
                               << recv_end - recv_start << dendl;
    if (left == 0) {
      return 0;
    }
    state_offset += to_read;
  }

  recv_end = recv_start = 0;
  /* nothing left in the prefetch buffer */
  if (len > recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    do {
      r = read_bulk(p+state_offset, left);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      } else if (r == static_cast<int>(left)) {
        state_offset = 0;
        return 0;
      }
      state_offset += r;
      left -= r;
    } while (r > 0);
  } else {
    do {
      r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
      ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
                                 << " left is " << left << " got " << r << dendl;
      if (r < 0) {
        ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
        return -1;
      }
      recv_end += r;
      if (r >= static_cast<int>(left)) {
        recv_start = len - state_offset;
        memcpy(p+state_offset, recv_buf, recv_start);
        state_offset = 0;
        return 0;
      }
      left -= r;
    } while (r > 0);
    memcpy(p+state_offset, recv_buf, recv_end-recv_start);
    state_offset += (recv_end - recv_start);
    recv_end = recv_start = 0;
  }
  ldout(async_msgr->cct, 25) << __func__ << " need len " << len << " remaining "
                             << len - state_offset << " bytes" << dendl;
  return len - state_offset;
}

void AsyncConnection::inject_delay() {
  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
    ldout(async_msgr->cct, 10) << __func__ << " sleep for " <<
      async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
    utime_t t;
    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }
}

void AsyncConnection::process()
{
  ssize_t r = 0;
  int prev_state = state;
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
  utime_t ltt_recv_stamp = ceph_clock_now();
#endif
  bool need_dispatch_writer = false;
  std::lock_guard<std::mutex> l(lock);
  last_active = ceph::coarse_mono_clock::now();
  auto recv_start_time = ceph::mono_clock::now();
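  // run the state machine until the state stops changing; within each case,
  // read_until returning > 0 means a partial read (wait for more data) and
  // < 0 means a failure that tears the connection down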
  do {
    ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl;
    prev_state = state;
    switch (state) {
      case STATE_OPEN:
        {
          char tag = -1;
          r = read_until(sizeof(tag), &tag);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read tag failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
            ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
            set_last_keepalive(ceph_clock_now());
          } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
            state = STATE_OPEN_KEEPALIVE2;
          } else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
            state = STATE_OPEN_KEEPALIVE2_ACK;
          } else if (tag == CEPH_MSGR_TAG_ACK) {
            state = STATE_OPEN_TAG_ACK;
          } else if (tag == CEPH_MSGR_TAG_MSG) {
            state = STATE_OPEN_MESSAGE_HEADER;
          } else if (tag == CEPH_MSGR_TAG_CLOSE) {
            state = STATE_OPEN_TAG_CLOSE;
          } else {
            ldout(async_msgr->cct, 0) << __func__ << " bad tag " << (int)tag << dendl;
            goto fail;
          }

          break;
        }

      case STATE_OPEN_KEEPALIVE2:
        {
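          // KEEPALIVE2 carries the sender's timestamp; queue a
          // KEEPALIVE2_ACK echoing it so the peer can gauge liveness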
          ceph_timespec *t;
          r = read_until(sizeof(*t), state_buffer);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read keepalive timespec failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
          t = (ceph_timespec*)state_buffer;
          utime_t kp_t = utime_t(*t);
          write_lock.lock();
          _append_keepalive_or_ack(true, &kp_t);
          write_lock.unlock();
          ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
          set_last_keepalive(ceph_clock_now());
          need_dispatch_writer = true;
          state = STATE_OPEN;
          break;
        }

      case STATE_OPEN_KEEPALIVE2_ACK:
        {
          ceph_timespec *t;
          r = read_until(sizeof(*t), state_buffer);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read keepalive timespec failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          t = (ceph_timespec*)state_buffer;
          set_last_keepalive_ack(utime_t(*t));
          ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
          state = STATE_OPEN;
          break;
        }

      case STATE_OPEN_TAG_ACK:
        {
          ceph_le64 *seq;
          r = read_until(sizeof(*seq), state_buffer);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          seq = (ceph_le64*)state_buffer;
          ldout(async_msgr->cct, 20) << __func__ << " got ACK" << dendl;
          handle_ack(*seq);
          state = STATE_OPEN;
          break;
        }

      case STATE_OPEN_MESSAGE_HEADER:
        {
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
          ltt_recv_stamp = ceph_clock_now();
#endif
          recv_stamp = ceph_clock_now();
          ldout(async_msgr->cct, 20) << __func__ << " begin MSG" << dendl;
          ceph_msg_header header;
          ceph_msg_header_old oldheader;
          __u32 header_crc = 0;
          unsigned len;
          if (has_feature(CEPH_FEATURE_NOSRCADDR))
            len = sizeof(header);
          else
            len = sizeof(oldheader);

          r = read_until(len, state_buffer);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read message header failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          ldout(async_msgr->cct, 20) << __func__ << " got MSG header" << dendl;

          if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
            header = *((ceph_msg_header*)state_buffer);
            if (msgr->crcflags & MSG_CRC_HEADER)
              header_crc = ceph_crc32c(0, (unsigned char *)&header,
                                       sizeof(header) - sizeof(header.crc));
          } else {
            oldheader = *((ceph_msg_header_old*)state_buffer);
            // this is fugly
            memcpy(&header, &oldheader, sizeof(header));
            header.src = oldheader.src.name;
            header.reserved = oldheader.reserved;
            if (msgr->crcflags & MSG_CRC_HEADER) {
              header.crc = oldheader.crc;
              header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
            }
          }

          ldout(async_msgr->cct, 20) << __func__ << " got envelope type=" << header.type
                              << " src " << entity_name_t(header.src)
                              << " front=" << header.front_len
                              << " data=" << header.data_len
                              << " off " << header.data_off << dendl;

          // verify header crc
          if (msgr->crcflags & MSG_CRC_HEADER && header_crc != header.crc) {
            ldout(async_msgr->cct,0) << __func__ << " got bad header crc "
                                     << header_crc << " != " << header.crc << dendl;
            goto fail;
          }

          // Reset state
          data_buf.clear();
          front.clear();
          middle.clear();
          data.clear();
          current_header = header;
          state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE;
          break;
        }

      case STATE_OPEN_MESSAGE_THROTTLE_MESSAGE:
        {
          if (policy.throttler_messages) {
            ldout(async_msgr->cct, 10) << __func__ << " wants " << 1 << " message from policy throttler "
                                       << policy.throttler_messages->get_current() << "/"
                                       << policy.throttler_messages->get_max() << dendl;
            if (!policy.throttler_messages->get_or_fail()) {
              ldout(async_msgr->cct, 10) << __func__ << " wants 1 message from policy throttle "
                                         << policy.throttler_messages->get_current() << "/"
                                         << policy.throttler_messages->get_max() << " failed, just wait." << dendl;
              // the dispatch thread pool may take a while to drain the full
              // message queue, so wait a millisecond and retry
              if (register_time_events.empty())
                register_time_events.insert(center->create_time_event(1000, wakeup_handler));
              break;
            }
          }

          state = STATE_OPEN_MESSAGE_THROTTLE_BYTES;
          break;
        }

      case STATE_OPEN_MESSAGE_THROTTLE_BYTES:
        {
          cur_msg_size = current_header.front_len + current_header.middle_len + current_header.data_len;
          if (cur_msg_size) {
            if (policy.throttler_bytes) {
              ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler "
                                         << policy.throttler_bytes->get_current() << "/"
                                         << policy.throttler_bytes->get_max() << dendl;
              if (!policy.throttler_bytes->get_or_fail(cur_msg_size)) {
                ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from policy throttler "
                                           << policy.throttler_bytes->get_current() << "/"
                                           << policy.throttler_bytes->get_max() << " failed, just wait." << dendl;
                // the dispatch thread pool may take a while to drain the full
                // message queue, so wait a millisecond and retry
                if (register_time_events.empty())
                  register_time_events.insert(center->create_time_event(1000, wakeup_handler));
                break;
              }
            }
          }

          state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE;
          break;
        }

      case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE:
        {
          if (cur_msg_size) {
            if (!dispatch_queue->dispatch_throttler.get_or_fail(cur_msg_size)) {
              ldout(async_msgr->cct, 10) << __func__ << " wants " << cur_msg_size << " bytes from dispatch throttle "
                                         << dispatch_queue->dispatch_throttler.get_current() << "/"
                                         << dispatch_queue->dispatch_throttler.get_max() << " failed, just wait." << dendl;
              // the dispatch thread pool may take a while to drain the full
              // message queue, so wait a millisecond and retry
              if (register_time_events.empty())
                register_time_events.insert(center->create_time_event(1000, wakeup_handler));
              break;
            }
          }

          throttle_stamp = ceph_clock_now();
          state = STATE_OPEN_MESSAGE_READ_FRONT;
          break;
        }

      case STATE_OPEN_MESSAGE_READ_FRONT:
        {
          // read front
          unsigned front_len = current_header.front_len;
          if (front_len) {
            if (!front.length())
              front.push_back(buffer::create(front_len));

            r = read_until(front_len, front.c_str());
            if (r < 0) {
              ldout(async_msgr->cct, 1) << __func__ << " read message front failed" << dendl;
              goto fail;
            } else if (r > 0) {
              break;
            }

            ldout(async_msgr->cct, 20) << __func__ << " got front " << front.length() << dendl;
          }
          state = STATE_OPEN_MESSAGE_READ_MIDDLE;
        }

      case STATE_OPEN_MESSAGE_READ_MIDDLE:
        {
          // read middle
          unsigned middle_len = current_header.middle_len;
          if (middle_len) {
            if (!middle.length())
              middle.push_back(buffer::create(middle_len));

            r = read_until(middle_len, middle.c_str());
            if (r < 0) {
              ldout(async_msgr->cct, 1) << __func__ << " read message middle failed" << dendl;
              goto fail;
            } else if (r > 0) {
              break;
            }
            ldout(async_msgr->cct, 20) << __func__ << " got middle " << middle.length() << dendl;
          }

          state = STATE_OPEN_MESSAGE_READ_DATA_PREPARE;
        }

      case STATE_OPEN_MESSAGE_READ_DATA_PREPARE:
        {
          // read data
          unsigned data_len = le32_to_cpu(current_header.data_len);
          unsigned data_off = le32_to_cpu(current_header.data_off);
          if (data_len) {
            // get a buffer
            map<ceph_tid_t,pair<bufferlist,int> >::iterator p = rx_buffers.find(current_header.tid);
            if (p != rx_buffers.end()) {
              ldout(async_msgr->cct,10) << __func__ << " selecting rx buffer v " << p->second.second
                                  << " at offset " << data_off
                                  << " len " << p->second.first.length() << dendl;
              data_buf = p->second.first;
              // make sure it's big enough
              if (data_buf.length() < data_len)
                data_buf.push_back(buffer::create(data_len - data_buf.length()));
              data_blp = data_buf.begin();
            } else {
              ldout(async_msgr->cct,20) << __func__ << " allocating new rx buffer at offset " << data_off << dendl;
              alloc_aligned_buffer(data_buf, data_len, data_off);
              data_blp = data_buf.begin();
            }
          }

          msg_left = data_len;
          state = STATE_OPEN_MESSAGE_READ_DATA;
        }

      case STATE_OPEN_MESSAGE_READ_DATA:
        {
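          // read the payload directly into the buffers prepared in
          // READ_DATA_PREPARE, advancing data_blp as each chunk completes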
          while (msg_left > 0) {
            bufferptr bp = data_blp.get_current_ptr();
            unsigned read = MIN(bp.length(), msg_left);
            r = read_until(read, bp.c_str());
            if (r < 0) {
              ldout(async_msgr->cct, 1) << __func__ << " read data error " << dendl;
              goto fail;
            } else if (r > 0) {
              break;
            }

            data_blp.advance(read);
            data.append(bp, 0, read);
            msg_left -= read;
          }

          if (msg_left > 0)
            break;

          state = STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH;
        }

      case STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH:
        {
          ceph_msg_footer footer;
          ceph_msg_footer_old old_footer;
          unsigned len;
          // footer
          if (has_feature(CEPH_FEATURE_MSG_AUTH))
            len = sizeof(footer);
          else
            len = sizeof(old_footer);

          r = read_until(len, state_buffer);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read footer data error " << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
            footer = *((ceph_msg_footer*)state_buffer);
          } else {
            old_footer = *((ceph_msg_footer_old*)state_buffer);
            footer.front_crc = old_footer.front_crc;
            footer.middle_crc = old_footer.middle_crc;
            footer.data_crc = old_footer.data_crc;
            footer.sig = 0;
            footer.flags = old_footer.flags;
          }
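          // a cleared COMPLETE flag means the sender aborted this message mid-send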
          int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
          ldout(async_msgr->cct, 10) << __func__ << " aborted = " << aborted << dendl;
          if (aborted) {
            ldout(async_msgr->cct, 0) << __func__ << " got " << front.length() << " + " << middle.length() << " + " << data.length()
                                << " byte message.. ABORTED" << dendl;
            goto fail;
          }

          ldout(async_msgr->cct, 20) << __func__ << " got " << front.length() << " + " << middle.length()
                              << " + " << data.length() << " byte message" << dendl;
          Message *message = decode_message(async_msgr->cct, async_msgr->crcflags, current_header, footer,
                                            front, middle, data, this);
          if (!message) {
            ldout(async_msgr->cct, 1) << __func__ << " decode message failed " << dendl;
            goto fail;
          }

          //
          //  Check the signature if one should be present.  A zero return indicates success. PLR
          //

          if (session_security.get() == NULL) {
            ldout(async_msgr->cct, 10) << __func__ << " no session security set" << dendl;
          } else {
            if (session_security->check_message_signature(message)) {
              ldout(async_msgr->cct, 0) << __func__ << " Signature check failed" << dendl;
              message->put();
              goto fail;
            }
          }
          message->set_byte_throttler(policy.throttler_bytes);
          message->set_message_throttler(policy.throttler_messages);

          // store reservation size in message, so we don't get confused
          // by messages entering the dispatch queue through other paths.
          message->set_dispatch_throttle_size(cur_msg_size);

          message->set_recv_stamp(recv_stamp);
          message->set_throttle_stamp(throttle_stamp);
          message->set_recv_complete_stamp(ceph_clock_now());

          // check received seq#.  if it is old, drop the message.
          // note that incoming messages may skip ahead.  this is convenient for the client
          // side queueing because messages can't be renumbered, but the (kernel) client will
          // occasionally pull a message out of the sent queue to send elsewhere.  in that case
          // it doesn't matter if we "got" it or not.
          uint64_t cur_seq = in_seq;
          if (message->get_seq() <= cur_seq) {
            ldout(async_msgr->cct,0) << __func__ << " got old message "
                    << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message
                    << ", discarding" << dendl;
            message->put();
            if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message)
              assert(0 == "old msgs despite reconnect_seq feature");
            break;
          }
          if (message->get_seq() > cur_seq + 1) {
            ldout(async_msgr->cct, 0) << __func__ << " missed message?  skipped from seq "
                                      << cur_seq << " to " << message->get_seq() << dendl;
            if (async_msgr->cct->_conf->ms_die_on_skipped_message)
              assert(0 == "skipped incoming seq");
          }

          message->set_connection(this);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
          if (message->get_type() == CEPH_MSG_OSD_OP || message->get_type() == CEPH_MSG_OSD_OPREPLY) {
            utime_t ltt_processed_stamp = ceph_clock_now();
            double usecs_elapsed = (ltt_processed_stamp.to_nsec()-ltt_recv_stamp.to_nsec())/1000;
            ostringstream buf;
            if (message->get_type() == CEPH_MSG_OSD_OP)
              OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP", false);
            else
              OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY", false);
          }
#endif

          // note last received message.
          in_seq = message->get_seq();
          ldout(async_msgr->cct, 5) << " rx " << message->get_source() << " seq "
                                    << message->get_seq() << " " << message
                                    << " " << *message << dendl;

          if (!policy.lossy) {
            ack_left++;
            need_dispatch_writer = true;
          }
          state = STATE_OPEN;

          logger->inc(l_msgr_recv_messages);
          logger->inc(l_msgr_recv_bytes, cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));

          async_msgr->ms_fast_preprocess(message);
          auto fast_dispatch_time = ceph::mono_clock::now();
          logger->tinc(l_msgr_running_recv_time, fast_dispatch_time - recv_start_time);
          if (delay_state) {
            utime_t release = message->get_recv_stamp();
            double delay_period = 0;
            if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
              delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
              release += delay_period;
              ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on "
                                        << message << " " << *message << dendl;
            }
            delay_state->queue(delay_period, release, message);
          } else if (async_msgr->ms_can_fast_dispatch(message)) {
            lock.unlock();
            dispatch_queue->fast_dispatch(message);
            recv_start_time = ceph::mono_clock::now();
            logger->tinc(l_msgr_running_fast_dispatch_time,
                         recv_start_time - fast_dispatch_time);
            lock.lock();
          } else {
            dispatch_queue->enqueue(message, message->get_priority(), conn_id);
          }

          break;
        }

      case STATE_OPEN_TAG_CLOSE:
        {
          ldout(async_msgr->cct, 20) << __func__ << " got CLOSE" << dendl;
          _stop();
          return;
        }

      case STATE_STANDBY:
        {
          ldout(async_msgr->cct, 20) << __func__ << " enter STANDBY" << dendl;

          break;
        }

      case STATE_NONE:
        {
          ldout(async_msgr->cct, 20) << __func__ << " enter none state" << dendl;
          break;
        }

      case STATE_CLOSED:
        {
          ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
          break;
        }

      case STATE_WAIT:
        {
          ldout(async_msgr->cct, 1) << __func__ << " enter wait state, failing" << dendl;
          goto fail;
        }

      default:
        {
          if (_process_connection() < 0)
            goto fail;
          break;
        }
    }
  } while (prev_state != state);

  if (need_dispatch_writer && is_connected())
    center->dispatch_event_external(write_handler);

  logger->tinc(l_msgr_running_recv_time, ceph::mono_clock::now() - recv_start_time);
  return;

 fail:
  fault();
}

ssize_t AsyncConnection::_process_connection()
{
  ssize_t r = 0;

  switch(state) {
    case STATE_WAIT_SEND:
      {
        std::lock_guard<std::mutex> l(write_lock);
        if (!outcoming_bl.length()) {
          assert(state_after_send);
          state = state_after_send;
          state_after_send = STATE_NONE;
        }
        break;
      }

    case STATE_CONNECTING:
      {
        assert(!policy.server);

        // reset connect state variables
        got_bad_auth = false;
        delete authorizer;
        authorizer = NULL;
        authorizer_buf.clear();
        memset(&connect_msg, 0, sizeof(connect_msg));
        memset(&connect_reply, 0, sizeof(connect_reply));

        global_seq = async_msgr->get_global_seq();
        // close old socket.  this is safe because we stopped the reader thread above.
        if (cs) {
          center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
          cs.close();
        }

        SocketOptions opts;
        opts.priority = async_msgr->get_socket_priority();
        opts.connect_bind_addr = msgr->get_myaddr();
        r = worker->connect(get_peer_addr(), opts, &cs);
        if (r < 0)
          goto fail;

        center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
        state = STATE_CONNECTING_RE;
        break;
      }

    case STATE_CONNECTING_RE:
      {
        r = cs.is_connected();
        if (r < 0) {
          ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl;
          if (r == -ECONNREFUSED) {
            ldout(async_msgr->cct, 2) << __func__ << " connection refused!" << dendl;
            dispatch_queue->queue_refused(this);
          }
          goto fail;
        } else if (r == 0) {
          ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl;
          if (async_msgr->get_stack()->nonblock_connect_need_writable_event())
            center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
          break;
        }

        center->delete_file_event(cs.fd(), EVENT_WRITABLE);
        ldout(async_msgr->cct, 10) << __func__ << " connected successfully, ready to send banner" << dendl;

        bufferlist bl;
        bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
        r = try_send(bl);
        if (r == 0) {
          state = STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY;
          ldout(async_msgr->cct, 10) << __func__ << " connect write banner done: "
                                     << get_peer_addr() << dendl;
        } else if (r > 0) {
          state = STATE_WAIT_SEND;
          state_after_send = STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY;
          ldout(async_msgr->cct, 10) << __func__ << " connect wait for write banner: "
                               << get_peer_addr() << dendl;
        } else {
          goto fail;
        }

        break;
      }

    case STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY:
      {
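        // the server replies with its banner followed by two encoded addresses:
        // its own (paddr) and its view of our address (peer_addr_for_me)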
        entity_addr_t paddr, peer_addr_for_me;
        bufferlist myaddrbl;
        unsigned banner_len = strlen(CEPH_BANNER);
        unsigned need_len = banner_len + sizeof(ceph_entity_addr)*2;
        r = read_until(need_len, state_buffer);
        if (r < 0) {
          ldout(async_msgr->cct, 1) << __func__ << " read banner and identify addresses failed" << dendl;
          goto fail;
        } else if (r > 0) {
          break;
        }

        if (memcmp(state_buffer, CEPH_BANNER, banner_len)) {
          ldout(async_msgr->cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
                                    << get_peer_addr() << dendl;
          goto fail;
        }

        bufferlist bl;
        bl.append(state_buffer+banner_len, sizeof(ceph_entity_addr)*2);
        bufferlist::iterator p = bl.begin();
        try {
          ::decode(paddr, p);
          ::decode(peer_addr_for_me, p);
        } catch (const buffer::error& e) {
          lderr(async_msgr->cct) << __func__ <<  " decode peer addr failed " << dendl;
          goto fail;
        }
        ldout(async_msgr->cct, 20) << __func__ <<  " connect read peer addr "
                             << paddr << " on socket " << cs.fd() << dendl;
        if (peer_addr != paddr) {
          if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
              peer_addr.get_nonce() == paddr.get_nonce()) {
            ldout(async_msgr->cct, 0) << __func__ <<  " connect claims to be " << paddr
                                << " not " << peer_addr
                                << " - presumably this is the same node!" << dendl;
          } else {
            ldout(async_msgr->cct, 10) << __func__ << " connect claims to be "
                                       << paddr << " not " << peer_addr << dendl;
            goto fail;
          }
        }

        ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl;
        lock.unlock();
        async_msgr->learned_addr(peer_addr_for_me);
        // guard against a modulo-by-zero when ms_inject_socket_failures is 0
        if (async_msgr->cct->_conf->ms_inject_internal_delays &&
            async_msgr->cct->_conf->ms_inject_socket_failures) {
          if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
            ldout(msgr->cct, 10) << __func__ << " sleep for "
                                 << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
            utime_t t;
            t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
            t.sleep();
          }
        }

        lock.lock();
        if (state != STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY) {
          ldout(async_msgr->cct, 1) << __func__ << " state changed while in learned_addr;"
                                    << " mark_down or replacing must have happened just now" << dendl;
          return 0;
        }

        ::encode(async_msgr->get_myaddr(), myaddrbl, 0); // legacy
        r = try_send(myaddrbl);
        if (r == 0) {
          state = STATE_CONNECTING_SEND_CONNECT_MSG;
          ldout(async_msgr->cct, 10) << __func__ << " connect sent my addr "
              << async_msgr->get_myaddr() << dendl;
        } else if (r > 0) {
          state = STATE_WAIT_SEND;
          state_after_send = STATE_CONNECTING_SEND_CONNECT_MSG;
          ldout(async_msgr->cct, 10) << __func__ << " connect send my addr done: "
              << async_msgr->get_myaddr() << dendl;
        } else {
          ldout(async_msgr->cct, 2) << __func__ << " connect couldn't write my addr, "
              << cpp_strerror(r) << dendl;
          goto fail;
        }

        break;
      }

    case STATE_CONNECTING_SEND_CONNECT_MSG:
      {
        if (!got_bad_auth) {
          delete authorizer;
          authorizer = async_msgr->get_authorizer(peer_type, false);
        }
        bufferlist bl;

        connect_msg.features = policy.features_supported;
        connect_msg.host_type = async_msgr->get_myinst().name.type();
        connect_msg.global_seq = global_seq;
        connect_msg.connect_seq = connect_seq;
        connect_msg.protocol_version = async_msgr->get_proto_version(peer_type, true);
        connect_msg.authorizer_protocol = authorizer ? authorizer->protocol : 0;
        connect_msg.authorizer_len = authorizer ? authorizer->bl.length() : 0;
        if (authorizer)
          ldout(async_msgr->cct, 10) << __func__ <<  " connect_msg.authorizer_len="
                                     << connect_msg.authorizer_len << " protocol="
                                     << connect_msg.authorizer_protocol << dendl;
        connect_msg.flags = 0;
        if (policy.lossy)
          connect_msg.flags |= CEPH_MSG_CONNECT_LOSSY;  // this is fyi, actually, server decides!
        bl.append((char*)&connect_msg, sizeof(connect_msg));
        if (authorizer) {
          bl.append(authorizer->bl.c_str(), authorizer->bl.length());
        }
        ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq="
            << connect_seq << " proto=" << connect_msg.protocol_version << dendl;

        r = try_send(bl);
        if (r == 0) {
          state = STATE_CONNECTING_WAIT_CONNECT_REPLY;
          ldout(async_msgr->cct,20) << __func__ << " connect wrote (self +) cseq, waiting for reply" << dendl;
        } else if (r > 0) {
          state = STATE_WAIT_SEND;
          state_after_send = STATE_CONNECTING_WAIT_CONNECT_REPLY;
          ldout(async_msgr->cct, 10) << __func__ << " continue send reply " << dendl;
        } else {
          ldout(async_msgr->cct, 2) << __func__ << " connect couldn't send reply "
              << cpp_strerror(r) << dendl;
          goto fail;
        }

        break;
      }

    case STATE_CONNECTING_WAIT_CONNECT_REPLY:
      {
        r = read_until(sizeof(connect_reply), state_buffer);
        if (r < 0) {
          ldout(async_msgr->cct, 1) << __func__ << " read connect reply failed" << dendl;
          goto fail;
        } else if (r > 0) {
          break;
        }

        connect_reply = *((ceph_msg_connect_reply*)state_buffer);

        ldout(async_msgr->cct, 20) << __func__ << " connect got reply tag " << (int)connect_reply.tag
                             << " connect_seq " << connect_reply.connect_seq << " global_seq "
                             << connect_reply.global_seq << " proto " << connect_reply.protocol_version
                             << " flags " << (int)connect_reply.flags << " features "
                             << connect_reply.features << dendl;
        state = STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH;

        break;
      }

    case STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH:
      {
        bufferlist authorizer_reply;
        if (connect_reply.authorizer_len) {
          ldout(async_msgr->cct, 10) << __func__ << " reply.authorizer_len=" << connect_reply.authorizer_len << dendl;
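          // the authorizer reply must fit in state_buffer (4096 bytes,
          // allocated in the constructor)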
          assert(connect_reply.authorizer_len < 4096);
          r = read_until(connect_reply.authorizer_len, state_buffer);
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read connect reply authorizer failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }

          authorizer_reply.append(state_buffer, connect_reply.authorizer_len);
          bufferlist::iterator iter = authorizer_reply.begin();
          if (authorizer && !authorizer->verify_reply(iter)) {
            ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
            goto fail;
          }
        }
        r = handle_connect_reply(connect_msg, connect_reply);
        if (r < 0)
          goto fail;

        // state must be changed!
        assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH);
        break;
      }

    case STATE_CONNECTING_WAIT_ACK_SEQ:
      {
        uint64_t newly_acked_seq = 0;

        r = read_until(sizeof(newly_acked_seq), state_buffer);
        if (r < 0) {
          ldout(async_msgr->cct, 1) << __func__ << " read connect ack seq failed" << dendl;
          goto fail;
        } else if (r > 0) {
          break;
        }

        newly_acked_seq = *((uint64_t*)state_buffer);
        ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
                            << " vs out_seq " << out_seq << dendl;
        discard_requeued_up_to(newly_acked_seq);
        //while (newly_acked_seq > out_seq.read()) {
        //  Message *m = _get_next_outgoing(NULL);
        //  assert(m);
        //  ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq()
        //                      << " " << *m << dendl;
        //  assert(m->get_seq() <= newly_acked_seq);
        //  m->put();
        //  out_seq.inc();
        //}

        bufferlist bl;
        uint64_t s = in_seq;
        bl.append((char*)&s, sizeof(s));
        r = try_send(bl);
        if (r == 0) {
          state = STATE_CONNECTING_READY;
          ldout(async_msgr->cct, 10) << __func__ << " send in_seq done " << dendl;
        } else if (r > 0) {
          state_after_send = STATE_CONNECTING_READY;
          state = STATE_WAIT_SEND;
          ldout(async_msgr->cct, 10) << __func__ << " continue send in_seq " << dendl;
        } else {
          goto fail;
        }
        break;
      }

    case STATE_CONNECTING_READY:
      {
        // hooray!
        peer_global_seq = connect_reply.global_seq;
        policy.lossy = connect_reply.flags & CEPH_MSG_CONNECT_LOSSY;
        state = STATE_OPEN;
        once_ready = true;
        connect_seq += 1;
        assert(connect_seq == connect_reply.connect_seq);
        backoff = utime_t();
        set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features);
        ldout(async_msgr->cct, 10) << __func__ << " connect success " << connect_seq
                                   << ", lossy = " << policy.lossy << ", features "
                                   << get_features() << dendl;

        // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
        // connection.  PLR
        if (authorizer != NULL) {
          session_security.reset(
              get_auth_session_handler(async_msgr->cct,
                                       authorizer->protocol,
                                       authorizer->session_key,
                                       get_features()));
        } else {
          // We have no authorizer, so we shouldn't be applying security to messages in this AsyncConnection.  PLR
          session_security.reset();
        }

        if (delay_state)
          assert(delay_state->ready());
        dispatch_queue->queue_connect(this);
        async_msgr->ms_deliver_handle_fast_connect(this);

        // make sure no pending tick timer
        if (last_tick_id)
          center->delete_time_event(last_tick_id);
        last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);

        // messages may have been queued between the last _try_send and the
        // connection becoming ready; the write event may already have fired,
        // so force the scheduler to run the write handler again
        write_lock.lock();
        can_write = WriteStatus::CANWRITE;
        if (is_queued())
          center->dispatch_event_external(write_handler);
        write_lock.unlock();
        maybe_start_delay_thread();
        break;
      }

    case STATE_ACCEPTING:
      {
        bufferlist bl;
        center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);

        bl.append(CEPH_BANNER, strlen(CEPH_BANNER));

        ::encode(async_msgr->get_myaddr(), bl, 0); // legacy
        port = async_msgr->get_myaddr().get_port();
        ::encode(socket_addr, bl, 0); // legacy
        ldout(async_msgr->cct, 1) << __func__ << " sd=" << cs.fd() << " " << socket_addr << dendl;

        r = try_send(bl);
        if (r == 0) {
          state = STATE_ACCEPTING_WAIT_BANNER_ADDR;
          ldout(async_msgr->cct, 10) << __func__ << " write banner and addr done: "
            << get_peer_addr() << dendl;
        } else if (r > 0) {
          state = STATE_WAIT_SEND;
          state_after_send = STATE_ACCEPTING_WAIT_BANNER_ADDR;
          ldout(async_msgr->cct, 10) << __func__ << " wait for write banner and addr: "
                              << get_peer_addr() << dendl;
        } else {
          goto fail;
        }

        break;
      }
    case STATE_ACCEPTING_WAIT_BANNER_ADDR:
      {
        bufferlist addr_bl;
        entity_addr_t peer_addr;

        r = read_until(strlen(CEPH_BANNER) + sizeof(ceph_entity_addr), state_buffer);
        if (r < 0) {
          ldout(async_msgr->cct, 1) << __func__ << " read peer banner and addr failed" << dendl;
          goto fail;
        } else if (r > 0) {
          break;
        }

        if (memcmp(state_buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
          ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer
                                    << "' (should be '" << CEPH_BANNER << "')" << dendl;
          goto fail;
        }

        addr_bl.append(state_buffer+strlen(CEPH_BANNER), sizeof(ceph_entity_addr));
        {
          bufferlist::iterator ti = addr_bl.begin();
          ::decode(peer_addr, ti);
        }

        ldout(async_msgr->cct, 10) << __func__ << " accept peer addr is " << peer_addr << dendl;
        if (peer_addr.is_blank_ip()) {
          // peer apparently doesn't know what ip they have; figure it out for them.
          int port = peer_addr.get_port();
          peer_addr.u = socket_addr.u;
          peer_addr.set_port(port);
          ldout(async_msgr->cct, 0) << __func__ << " accept peer addr is really " << peer_addr
                             << " (socket is " << socket_addr << ")" << dendl;
        }
        set_peer_addr(peer_addr);  // so that connection_state gets set up
        state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
        break;
      }

    case STATE_ACCEPTING_WAIT_CONNECT_MSG:
      {
        r = read_until(sizeof(connect_msg), state_buffer);
        if (r < 0) {
          ldout(async_msgr->cct, 1) << __func__ << " read connect msg failed" << dendl;
          goto fail;
        } else if (r > 0) {
          break;
        }

        connect_msg = *((ceph_msg_connect*)state_buffer);
        state = STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH;
        break;
      }

    case STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH:
      {
        bufferlist authorizer_reply;

        if (connect_msg.authorizer_len) {
          if (!authorizer_buf.length())
            authorizer_buf.push_back(buffer::create(connect_msg.authorizer_len));

          r = read_until(connect_msg.authorizer_len, authorizer_buf.c_str());
          if (r < 0) {
            ldout(async_msgr->cct, 1) << __func__ << " read connect authorizer failed" << dendl;
            goto fail;
          } else if (r > 0) {
            break;
          }
        }

        ldout(async_msgr->cct, 20) << __func__ << " accept got peer connect_seq "
                             << connect_msg.connect_seq << " global_seq "
                             << connect_msg.global_seq << dendl;
        set_peer_type(connect_msg.host_type);
        policy = async_msgr->get_policy(connect_msg.host_type);
        ldout(async_msgr->cct, 10) << __func__ << " accept of host_type " << connect_msg.host_type
                                   << ", policy.lossy=" << policy.lossy << " policy.server="
                                   << policy.server << " policy.standby=" << policy.standby
                                   << " policy.resetcheck=" << policy.resetcheck << dendl;

        r = handle_connect_msg(connect_msg, authorizer_buf, authorizer_reply);
        if (r < 0)
          goto fail;

        // state is changed by "handle_connect_msg"
        assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH);
        break;
      }

    case STATE_ACCEPTING_WAIT_SEQ:
      {
        uint64_t newly_acked_seq;
        r = read_until(sizeof(newly_acked_seq), state_buffer);
        if (r < 0) {
          ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl;
          goto fail_registered;
        } else if (r > 0) {
          break;
        }

        newly_acked_seq = *((uint64_t*)state_buffer);
        ldout(async_msgr->cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq << dendl;
        discard_requeued_up_to(newly_acked_seq);
        state = STATE_ACCEPTING_READY;
        break;
      }

    case STATE_ACCEPTING_READY:
      {
        ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
        state = STATE_OPEN;
        memset(&connect_msg, 0, sizeof(connect_msg));

        if (delay_state)
          assert(delay_state->ready());
        // make sure no pending tick timer
        if (last_tick_id)
          center->delete_time_event(last_tick_id);
        last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);

        write_lock.lock();
        can_write = WriteStatus::CANWRITE;
        if (is_queued())
          center->dispatch_event_external(write_handler);
        write_lock.unlock();
        maybe_start_delay_thread();
        break;
      }

    default:
      {
        lderr(async_msgr->cct) << __func__ << " bad state: " << state << dendl;
        ceph_abort();
      }
  }

  return 0;

fail_registered:
  ldout(async_msgr->cct, 10) << "accept fault after register" << dendl;
  inject_delay();

fail:
  return -1;
}

1391 int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &reply)
1392 {
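  // Inspect the server's reply tag: hard failures (FEATURES, BADPROTOVER),
  // retries that loop back to STATE_CONNECTING_SEND_CONNECT_MSG with adjusted
  // sequence numbers, WAIT for a lost connection race, or progression to the
  // SEQ/READY states below.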
1393   uint64_t feat_missing;
1394   if (reply.tag == CEPH_MSGR_TAG_FEATURES) {
1395     ldout(async_msgr->cct, 0) << __func__ << " connect protocol feature mismatch, my "
1396                         << std::hex << connect.features << " < peer "
1397                         << reply.features << " missing "
1398                         << (reply.features & ~policy.features_supported)
1399                         << std::dec << dendl;
1400     goto fail;
1401   }
1402
1403   if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1404     ldout(async_msgr->cct, 0) << __func__ << " connect protocol version mismatch, my "
1405                         << connect.protocol_version << " != " << reply.protocol_version
1406                         << dendl;
1407     goto fail;
1408   }
1409
1410   if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1411     ldout(async_msgr->cct,0) << __func__ << " connect got BADAUTHORIZER" << dendl;
1412     if (got_bad_auth)
1413       goto fail;
1414     got_bad_auth = true;
1415     delete authorizer;
1416     authorizer = async_msgr->get_authorizer(peer_type, true);  // try harder
1417     state = STATE_CONNECTING_SEND_CONNECT_MSG;
1418   }
1419   if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1420     ldout(async_msgr->cct, 0) << __func__ << " connect got RESETSESSION" << dendl;
1421     was_session_reset();
1422     // see was_session_reset
1423     outcoming_bl.clear();
1424     state = STATE_CONNECTING_SEND_CONNECT_MSG;
1425   }
1426   if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1427     global_seq = async_msgr->get_global_seq(reply.global_seq);
1428     ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_GLOBAL "
1429                               << reply.global_seq << " chose new "
1430                               << global_seq << dendl;
1431     state = STATE_CONNECTING_SEND_CONNECT_MSG;
1432   }
1433   if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1434     assert(reply.connect_seq > connect_seq);
1435     ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_SESSION "
1436                               << connect_seq << " -> "
1437                               << reply.connect_seq << dendl;
1438     connect_seq = reply.connect_seq;
1439     state = STATE_CONNECTING_SEND_CONNECT_MSG;
1440   }
1441   if (reply.tag == CEPH_MSGR_TAG_WAIT) {
1442     ldout(async_msgr->cct, 1) << __func__ << " connect got WAIT (connection race)" << dendl;
1443     state = STATE_WAIT;
1444   }
1445
1446   feat_missing = policy.features_required & ~(uint64_t)reply.features;
1447   if (feat_missing) {
1448     ldout(async_msgr->cct, 1) << __func__ << " missing required features " << std::hex
1449                               << feat_missing << std::dec << dendl;
1450     goto fail;
1451   }
1452
1453   if (reply.tag == CEPH_MSGR_TAG_SEQ) {
1454     ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
1455     state = STATE_CONNECTING_WAIT_ACK_SEQ;
1456   }
1457   if (reply.tag == CEPH_MSGR_TAG_READY) {
1458     ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
1459     state = STATE_CONNECTING_READY;
1460   }
1461
1462   return 0;
1463
1464  fail:
1465   return -1;
1466 }
1467
1468 ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
1469                                             bufferlist &authorizer_reply)
1470 {
1471   ssize_t r = 0;
1472   ceph_msg_connect_reply reply;
1473   bufferlist reply_bl;
1474
1475   memset(&reply, 0, sizeof(reply));
1476   reply.protocol_version = async_msgr->get_proto_version(peer_type, false);
1477
1478   // mismatch?
1479   ldout(async_msgr->cct, 10) << __func__ << " accept my proto " << reply.protocol_version
1480                       << ", their proto " << connect.protocol_version << dendl;
1481   if (connect.protocol_version != reply.protocol_version) {
1482     return _reply_accept(CEPH_MSGR_TAG_BADPROTOVER, connect, reply, authorizer_reply);
1483   }
1484   // require signatures for cephx?
1485   if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
1486     if (peer_type == CEPH_ENTITY_TYPE_OSD ||
1487         peer_type == CEPH_ENTITY_TYPE_MDS) {
1488       if (async_msgr->cct->_conf->cephx_require_signatures ||
1489           async_msgr->cct->_conf->cephx_cluster_require_signatures) {
1490         ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
1491         policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1492       }
1493     } else {
1494       if (async_msgr->cct->_conf->cephx_require_signatures ||
1495           async_msgr->cct->_conf->cephx_service_require_signatures) {
1496         ldout(async_msgr->cct, 10) << __func__ << " using cephx, requiring MSG_AUTH feature bit for service" << dendl;
1497         policy.features_required |= CEPH_FEATURE_MSG_AUTH;
1498       }
1499     }
1500   }
1501   uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features;
1502   if (feat_missing) {
1503     ldout(async_msgr->cct, 1) << __func__ << " peer missing required features "
1504                         << std::hex << feat_missing << std::dec << dendl;
1505     return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply);
1506   }
1507
1508   lock.unlock();
1509
1510   bool authorizer_valid;
1511   if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl,
1512                                authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) {
1513     lock.lock();
1514     ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
1515     session_security.reset();
1516     return _reply_accept(CEPH_MSGR_TAG_BADAUTHORIZER, connect, reply, authorizer_reply);
1517   }
1518
1519   // We've verified the authorizer for this AsyncConnection, so set up the session security structure.  PLR
1520   ldout(async_msgr->cct, 10) << __func__ << " accept setting up session_security." << dendl;
1521
1522   // existing?
1523   AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
1524
1525   inject_delay();
1526
1527   lock.lock();
1528   if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
1529     ldout(async_msgr->cct, 1) << __func__ << " state changed while accepting, must have been mark_down" << dendl;
1530     assert(state == STATE_CLOSED);
1531     goto fail;
1532   }
1533
1534   if (existing == this)
1535     existing = NULL;
1536   if (existing) {
1537     // It is not possible for the existing connection to acquire this
1538     // connection's lock
1539     existing->lock.lock();  // skip lockdep check (we are locking a second AsyncConnection here)
1540
1541     if (existing->state == STATE_CLOSED) {
1542       ldout(async_msgr->cct, 1) << __func__ << " existing already closed." << dendl;
1543       existing->lock.unlock();
1544       existing = NULL;
1545       goto open;
1546     }
1547
1548     if (existing->replacing) {
1549       ldout(async_msgr->cct, 1) << __func__ << " another racing replace happened while we were replacing."
1550                                 << " existing_state=" << get_state_name(existing->state) << dendl;
1551       reply.global_seq = existing->peer_global_seq;
1552       r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
1553       existing->lock.unlock();
1554       if (r < 0)
1555         goto fail;
1556       return 0;
1557     }
1558
1559     if (connect.global_seq < existing->peer_global_seq) {
1560       ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
1561                            << ".gseq " << existing->peer_global_seq << " > "
1562                            << connect.global_seq << ", RETRY_GLOBAL" << dendl;
1563       reply.global_seq = existing->peer_global_seq;  // so we can send it below..
1564       existing->lock.unlock();
1565       return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
1566     } else {
1567       ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
1568                            << ".gseq " << existing->peer_global_seq
1569                            << " <= " << connect.global_seq << ", looks ok" << dendl;
1570     }
1571
1572     if (existing->policy.lossy) {
1573       ldout(async_msgr->cct, 0) << __func__ << " accept replacing existing (lossy) channel (new one lossy="
1574                           << policy.lossy << ")" << dendl;
1575       existing->was_session_reset();
1576       goto replace;
1577     }
1578
1579     ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq
1580                               << " vs existing csq=" << existing->connect_seq << " existing_state="
1581                               << get_state_name(existing->state) << dendl;
1582
1583     if (connect.connect_seq == 0 && existing->connect_seq > 0) {
1584       ldout(async_msgr->cct,0) << __func__ << " accept peer reset, then tried to connect to us, replacing" << dendl;
1585       // this is a hard reset from peer
1586       is_reset_from_peer = true;
1587       if (policy.resetcheck)
1588         existing->was_session_reset(); // this resets the out_queue and the message/connect sequence numbers
1589       goto replace;
1590     }
1591
1592     if (connect.connect_seq < existing->connect_seq) {
1593       // old attempt, or we sent READY but they didn't get it.
1594       ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing << ".cseq "
1595                            << existing->connect_seq << " > " << connect.connect_seq
1596                            << ", RETRY_SESSION" << dendl;
1597       reply.connect_seq = existing->connect_seq + 1;
1598       existing->lock.unlock();
1599       return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
1600     }
1601
1602     if (connect.connect_seq == existing->connect_seq) {
1603       // if the existing connection successfully opened, and/or
1604       // subsequently went to standby, then the peer should bump
1605       // their connect_seq and retry: this is not a connection race
1606       // we need to resolve here.
1607       if (existing->state == STATE_OPEN ||
1608           existing->state == STATE_STANDBY) {
1609         ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing
1610                              << ".cseq " << existing->connect_seq << " == "
1611                              << connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
1612         reply.connect_seq = existing->connect_seq + 1;
1613         existing->lock.unlock();
1614         return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
1615       }
1616
1617       // connection race?
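      // Both sides connected simultaneously with equal cseq. Break the tie
      // deterministically so both peers pick the same survivor: the attempt
      // initiated by the lower address (or accepted by a policy.server peer)
      // wins.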
1618       if (peer_addr < async_msgr->get_myaddr() || existing->policy.server) {
1619         // incoming wins
1620         ldout(async_msgr->cct, 10) << __func__ << " accept connection race, existing " << existing
1621                              << ".cseq " << existing->connect_seq << " == " << connect.connect_seq
1622                              << ", or we are server, replacing my attempt" << dendl;
1623         goto replace;
1624       } else {
1625         // our existing outgoing wins
1626         ldout(async_msgr->cct,10) << __func__ << " accept connection race, existing "
1627                             << existing << ".cseq " << existing->connect_seq
1628                             << " == " << connect.connect_seq << ", sending WAIT" << dendl;
1629         assert(peer_addr > async_msgr->get_myaddr());
1630         existing->lock.unlock();
1631         return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply);
1632       }
1633     }
1634
1635     assert(connect.connect_seq > existing->connect_seq);
1636     assert(connect.global_seq >= existing->peer_global_seq);
1637     if (policy.resetcheck &&   // RESETSESSION only used by servers; peers do not reset each other
1638         existing->connect_seq == 0) {
1639       ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
1640                           << connect.connect_seq << ", " << existing << ".cseq = "
1641                           << existing->connect_seq << "), sending RESETSESSION" << dendl;
1642       existing->lock.unlock();
1643       return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
1644     }
1645
1646     // reconnect
1647     ldout(async_msgr->cct, 10) << __func__ << " accept peer sent cseq " << connect.connect_seq
1648                          << " > " << existing->connect_seq << dendl;
1649     goto replace;
1650   } // existing
1651   else if (!replacing && connect.connect_seq > 0) {
1652     // we reset, and they are opening a new session
1653     ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
1654                         << connect.connect_seq << "), sending RESETSESSION" << dendl;
1655     return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
1656   } else {
1657     // new session
1658     ldout(async_msgr->cct, 10) << __func__ << " accept new session" << dendl;
1659     existing = NULL;
1660     goto open;
1661   }
1662   ceph_abort();
1663
1664  replace:
1665   ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
1666
1667   inject_delay();
1668   if (existing->policy.lossy) {
1669     // disconnect from the Connection
1670     ldout(async_msgr->cct, 1) << __func__ << " replacing on lossy channel, failing existing" << dendl;
1671     existing->_stop();
1672     existing->dispatch_queue->queue_reset(existing.get());
1673   } else {
1674     assert(can_write == WriteStatus::NOWRITE);
1675     existing->write_lock.lock();
1676
1677     // reset the in_seq if this is a hard reset from peer,
1678     // otherwise we respect our original connection's value
1679     if (is_reset_from_peer) {
1680       existing->is_reset_from_peer = true;
1681     }
1682
1683     center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
1684
1685     if (existing->delay_state) {
1686       existing->delay_state->flush();
1687       assert(!delay_state);
1688     }
1689     existing->reset_recv_state();
1690
1691     auto temp_cs = std::move(cs);
1692     EventCenter *new_center = center;
1693     Worker *new_worker = worker;
1694     // cs was moved into temp_cs above so _stop() won't shut down the socket we hand over;
1695     // queue a reset on this new connection, which we're dumping for the old
1696     _stop();
1697
1698     dispatch_queue->queue_reset(this);
1699     ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
1700     existing->can_write = WriteStatus::REPLACING;
1701     existing->replacing = true;
1702     existing->state_offset = 0;
1703     // prevent the previous thread from modifying events
1704     existing->state = STATE_NONE;
1705     // Discard existing prefetch buffer in `recv_buf`
1706     existing->recv_start = existing->recv_end = 0;
1707     // there shouldn't be any buffered data left
1708     assert(recv_start == recv_end);
1709
1710     auto deactivate_existing = std::bind(
1711         [existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable {
1712       // we need to delete the time event in the original thread
1713       {
1714         std::lock_guard<std::mutex> l(existing->lock);
1715         existing->write_lock.lock();
1716         existing->requeue_sent();
1717         existing->outcoming_bl.clear();
1718         existing->open_write = false;
1719         existing->write_lock.unlock();
1720         if (existing->state == STATE_NONE) {
1721           existing->shutdown_socket();
1722           existing->cs = std::move(cs);
1723           existing->worker->references--;
1724           new_worker->references++;
1725           existing->logger = new_worker->get_perf_counter();
1726           existing->worker = new_worker;
1727           existing->center = new_center;
1728           if (existing->delay_state)
1729             existing->delay_state->set_center(new_center);
1730         } else if (existing->state == STATE_CLOSED) {
1731           auto back_to_close = std::bind(
1732             [](ConnectedSocket &cs) mutable { cs.close(); }, std::move(cs));
1733           new_center->submit_to(
1734               new_center->get_id(), std::move(back_to_close), true);
1735           return ;
1736         } else {
1737           ceph_abort();
1738         }
1739       }
1740
1741       // Before existing->center is changed, there may already be events queued in
1742       // the old center. If `existing` is then marked down, cleanup runs in another
1743       // thread and those stale events would cause a segmentation fault.
1744       auto transfer_existing = [existing, connect, reply, authorizer_reply]() mutable {
1745         std::lock_guard<std::mutex> l(existing->lock);
1746         if (existing->state == STATE_CLOSED)
1747           return ;
1748         assert(existing->state == STATE_NONE);
1749   
1750         existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
1751         existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler);
1752         reply.global_seq = existing->peer_global_seq;
1753         if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
1754           // handle error
1755           existing->fault();
1756         }
1757       };
1758       if (existing->center->in_thread())
1759         transfer_existing();
1760       else
1761         existing->center->submit_to(
1762             existing->center->get_id(), std::move(transfer_existing), true);
1763     }, std::move(temp_cs));
1764
1765     existing->center->submit_to(
1766         existing->center->get_id(), std::move(deactivate_existing), true);
1767     existing->write_lock.unlock();
1768     existing->lock.unlock();
1769     return 0;
1770   }
1771   existing->lock.unlock();
1772
1773  open:
1774   connect_seq = connect.connect_seq + 1;
1775   peer_global_seq = connect.global_seq;
1776   ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
1777                              << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl;
1778
1779   int next_state;
1780
1781   // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
1782   if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
1783     reply.tag = CEPH_MSGR_TAG_SEQ;
1784     next_state = STATE_ACCEPTING_WAIT_SEQ;
1785   } else {
1786     reply.tag = CEPH_MSGR_TAG_READY;
1787     next_state = STATE_ACCEPTING_READY;
1788     discard_requeued_up_to(0);
1789     is_reset_from_peer = false;
1790     in_seq = 0;
1791   }
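  // With TAG_SEQ both sides exchange sequence numbers (we read the peer's
  // acked seq and append our in_seq below) so already-received messages are
  // not retransmitted; TAG_READY skips that round trip.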
1792
1793   // build and send the reply (READY or SEQ)
1794   reply.features = policy.features_supported;
1795   reply.global_seq = async_msgr->get_global_seq();
1796   reply.connect_seq = connect_seq;
1797   reply.flags = 0;
1798   reply.authorizer_len = authorizer_reply.length();
1799   if (policy.lossy)
1800     reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
1801
1802   set_features((uint64_t)reply.features & (uint64_t)connect.features);
1803   ldout(async_msgr->cct, 10) << __func__ << " accept features " << get_features() << dendl;
1804
1805   session_security.reset(
1806       get_auth_session_handler(async_msgr->cct, connect.authorizer_protocol,
1807                                session_key, get_features()));
1808
1809   reply_bl.append((char*)&reply, sizeof(reply));
1810
1811   if (reply.authorizer_len)
1812     reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
1813
1814   if (reply.tag == CEPH_MSGR_TAG_SEQ) {
1815     uint64_t s = in_seq;
1816     reply_bl.append((char*)&s, sizeof(s));
1817   }
1818
1819   lock.unlock();
1820   // Because "replacing" prevents other connections from preempting this addr,
1821   // it's safe to call accept_conn() here without holding the Connection's lock
1822   r = async_msgr->accept_conn(this);
1823
1824   inject_delay();
1825   
1826   lock.lock();
1827   replacing = false;
1828   if (r < 0) {
1829     ldout(async_msgr->cct, 1) << __func__ << " another connection won the race to replace addr=" << peer_addr
1830                               << ", failing the later one (this)" << dendl;
1831     goto fail_registered;
1832   }
1833   if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
1834     ldout(async_msgr->cct, 1) << __func__ << " state changed during accept_conn, must have been mark_down" << dendl;
1835     assert(state == STATE_CLOSED);
1836     goto fail_registered;
1837   }
1838
1839   r = try_send(reply_bl);
1840   if (r < 0)
1841     goto fail_registered;
1842
1843   // notify
1844   dispatch_queue->queue_accept(this);
1845   async_msgr->ms_deliver_handle_fast_accept(this);
1846   once_ready = true;
1847
1848   if (r == 0) {
1849     state = next_state;
1850     ldout(async_msgr->cct, 2) << __func__ << " accept write reply msg done" << dendl;
1851   } else {
1852     state = STATE_WAIT_SEND;
1853     state_after_send = next_state;
1854   }
1855
1856   return 0;
1857
1858  fail_registered:
1859   ldout(async_msgr->cct, 10) << __func__ << " accept fault after register" << dendl;
1860   inject_delay();
1861
1862  fail:
1863   ldout(async_msgr->cct, 10) << __func__ << " failed to accept." << dendl;
1864   return -1;
1865 }
1866
1867 void AsyncConnection::_connect()
1868 {
1869   ldout(async_msgr->cct, 10) << __func__ << " csq=" << connect_seq << dendl;
1870
1871   state = STATE_CONNECTING;
1872   // reschedule the connection on the event center to avoid lock dependencies,
1873   // since this may be called by an external thread (send_message)
1874   center->dispatch_event_external(read_handler);
1875 }
1876
1877 void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
1878 {
1879   ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl;
1880   assert(socket.fd() >= 0);
1881
1882   std::lock_guard<std::mutex> l(lock);
1883   cs = std::move(socket);
1884   socket_addr = addr;
1885   state = STATE_ACCEPTING;
1886   // reschedule the connection on the event center to avoid lock dependencies
1887   center->dispatch_event_external(read_handler);
1888 }
1889
1890 int AsyncConnection::send_message(Message *m)
1891 {
1892   FUNCTRACE();
1893   lgeneric_subdout(async_msgr->cct, ms,
1894                    1) << "-- " << async_msgr->get_myaddr() << " --> "
1895                       << get_peer_addr() << " -- "
1896                       << *m << " -- " << m << " con "
1897                       << m->get_connection().get()
1898                       << dendl;
1899
1900   // optimistically assume it's ok to encode now (the encoding may actually be invalid by send time)
1901   if (!m->get_priority())
1902     m->set_priority(async_msgr->get_default_send_priority());
1903
1904   m->get_header().src = async_msgr->get_myname();
1905   m->set_connection(this);
1906
1907   if (m->get_type() == CEPH_MSG_OSD_OP)
1908     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
1909   else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
1910     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
1911
1912   if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
1913     ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
1914     std::lock_guard<std::mutex> l(write_lock);
1915     if (can_write != WriteStatus::CLOSED) {
1916       dispatch_queue->local_delivery(m, m->get_priority());
1917     } else {
1918       ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
1919                                  << " Drop message " << m << dendl;
1920       m->put();
1921     }
1922     return 0;
1923   }
1924
1925   last_active = ceph::coarse_mono_clock::now();
1926   // local (loopback) messages are not counted here; they are too lightweight
1927   // and would skew the metric for users
1928   logger->inc(l_msgr_send_messages);
1929
1930   bufferlist bl;
1931   uint64_t f = get_features();
1932
1933   // TODO: not all messages support re-encoding (e.g. MOSDMap), so only
1934   // messages that can be fast-dispatched are prepared (encoded) here
1935   bool can_fast_prepare = async_msgr->ms_can_fast_dispatch(m);
1936   if (can_fast_prepare)
1937     prepare_send_message(f, m, bl);
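  // encoding outside write_lock is an optimization; the check below drops the
  // encoded buffer if the connection can't write yet or the negotiated
  // features changed in the meantime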
1938
1939   std::lock_guard<std::mutex> l(write_lock);
1940   // "features" changes will change the payload encoding
1941   if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) {
1942     // ensure the correctness of message encoding
1943     bl.clear();
1944     m->get_payload().clear();
1945     ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer previous "
1946                               << f << " != " << get_features() << dendl;
1947   }
1948   if (can_write == WriteStatus::CLOSED) {
1949     ldout(async_msgr->cct, 10) << __func__ << " connection closed."
1950                                << " Drop message " << m << dendl;
1951     m->put();
1952   } else {
1953     m->trace.event("async enqueueing message");
1954     out_q[m->get_priority()].emplace_back(std::move(bl), m);
1955     ldout(async_msgr->cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl;
1956     if (can_write != WriteStatus::REPLACING)
1957       center->dispatch_event_external(write_handler);
1958   }
1959   return 0;
1960 }
1961
1962 void AsyncConnection::requeue_sent()
1963 {
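  // Move not-yet-acked messages from `sent` back to the front of the
  // highest-priority queue (preserving order) and roll out_seq back so they
  // get the same sequence numbers when retransmitted.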
1964   if (sent.empty())
1965     return;
1966
1967   list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1968   while (!sent.empty()) {
1969     Message* m = sent.back();
1970     sent.pop_back();
1971     ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend ("
1972                                << m->get_seq() << ")" << dendl;
1973     rq.push_front(make_pair(bufferlist(), m));
1974     out_seq--;
1975   }
1976 }
1977
1978 void AsyncConnection::discard_requeued_up_to(uint64_t seq)
1979 {
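  // Drop requeued messages the peer has already acknowledged (seq <= `seq`);
  // stop at the first message that is unsent (seq == 0) or newer.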
1980   ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl;
1981   std::lock_guard<std::mutex> l(write_lock);
1982   if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
1983     return;
1984   list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1985   while (!rq.empty()) {
1986     pair<bufferlist, Message*> p = rq.front();
1987     if (p.second->get_seq() == 0 || p.second->get_seq() > seq)
1988       break;
1989     ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq " << p.second->get_seq()
1990                          << " <= " << seq << ", discarding" << dendl;
1991     p.second->put();
1992     rq.pop_front();
1993     out_seq++;
1994   }
1995   if (rq.empty())
1996     out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1997 }
1998
1999 /*
2000  * Tears down the AsyncConnection's message queues, and removes them from the DispatchQueue
2001  * Must hold write_lock prior to calling.
2002  */
2003 void AsyncConnection::discard_out_queue()
2004 {
2005   ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
2006
2007   for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
2008     ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl;
2009     (*p)->put();
2010   }
2011   sent.clear();
2012   for (map<int, list<pair<bufferlist, Message*> > >::iterator p = out_q.begin(); p != out_q.end(); ++p)
2013     for (list<pair<bufferlist, Message*> >::iterator r = p->second.begin(); r != p->second.end(); ++r) {
2014       ldout(async_msgr->cct, 20) << __func__ << " discard " << r->second << dendl;
2015       r->second->put();
2016     }
2017   out_q.clear();
2018 }
2019
2020 int AsyncConnection::randomize_out_seq()
2021 {
2022   if (get_features() & CEPH_FEATURE_MSG_AUTH) {
2023     // Set out_seq to a random value, so CRC won't be predictable.   Don't bother checking seq_error
2024     // here.  We'll check it on the call.  PLR
2025     uint64_t rand_seq;
2026     int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq));
2027     rand_seq &= SEQ_MASK;
2028     lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
2029     out_seq = rand_seq;
2030     return seq_error;
2031   } else {
2032     // previously, seq #'s always started at 0.
2033     out_seq = 0;
2034     return 0;
2035   }
2036 }
2037
2038 void AsyncConnection::fault()
2039 {
2040   if (state == STATE_CLOSED || state == STATE_NONE) {
2041     ldout(async_msgr->cct, 10) << __func__ << " connection is already closed" << dendl;
2042     return ;
2043   }
2044
2045   if (policy.lossy && !(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) {
2046     ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl;
2047     _stop();
2048     dispatch_queue->queue_reset(this);
2049     return ;
2050   }
2051
2052   write_lock.lock();
2053   can_write = WriteStatus::NOWRITE;
2054   shutdown_socket();
2055   open_write = false;
2056
2057   // queue delayed items immediately
2058   if (delay_state)
2059     delay_state->flush();
2060   // requeue sent items
2061   requeue_sent();
2062   recv_start = recv_end = 0;
2063   state_offset = 0;
2064   replacing = false;
2065   is_reset_from_peer = false;
2066   outcoming_bl.clear();
2067   if (!once_ready && !is_queued() &&
2068       state >= STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
2069     ldout(async_msgr->cct, 10) << __func__ << " with nothing to send and still in a"
2070                                << " half-accept state, just closing" << dendl;
2071     write_lock.unlock();
2072     _stop();
2073     dispatch_queue->queue_reset(this);
2074     return ;
2075   }
2076   reset_recv_state();
2077   if (policy.standby && !is_queued() && state != STATE_WAIT) {
2078     ldout(async_msgr->cct, 10) << __func__ << " with nothing to send, going to standby" << dendl;
2079     state = STATE_STANDBY;
2080     write_lock.unlock();
2081     return;
2082   }
2083
2084   write_lock.unlock();
2085   if (!(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY) &&
2086       state != STATE_WAIT) { // STATE_WAIT is coming from STATE_CONNECTING_*
2087     // policy may be empty while the connection is still in an accept state
2088     if (policy.server) {
2089       ldout(async_msgr->cct, 0) << __func__ << " server, going to standby" << dendl;
2090       state = STATE_STANDBY;
2091     } else {
2092       ldout(async_msgr->cct, 0) << __func__ << " initiating reconnect" << dendl;
2093       connect_seq++;
2094       state = STATE_CONNECTING;
2095     }
2096     backoff = utime_t();
2097     center->dispatch_event_external(read_handler);
2098   } else {
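    // reconnect with exponential backoff: start at ms_initial_backoff, double
    // on each successive fault, and cap at ms_max_backoff (STATE_WAIT jumps
    // straight to the cap)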
2099     if (state == STATE_WAIT) {
2100       backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
2101     } else if (backoff == utime_t()) {
2102       backoff.set_from_double(async_msgr->cct->_conf->ms_initial_backoff);
2103     } else {
2104       backoff += backoff;
2105       if (backoff > async_msgr->cct->_conf->ms_max_backoff)
2106         backoff.set_from_double(async_msgr->cct->_conf->ms_max_backoff);
2107     }
2108
2109     state = STATE_CONNECTING;
2110     ldout(async_msgr->cct, 10) << __func__ << " waiting " << backoff << dendl;
2111     // schedule a wakeup after the backoff
2112     register_time_events.insert(center->create_time_event(
2113             backoff.to_nsec()/1000, wakeup_handler));
2114   }
2115 }
2116
2117 void AsyncConnection::was_session_reset()
2118 {
2119   ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
2120   std::lock_guard<std::mutex> l(write_lock);
2121   if (delay_state)
2122     delay_state->discard();
2123   dispatch_queue->discard_queue(conn_id);
2124   discard_out_queue();
2125   // note: we need to clear outcoming_bl here, but was_session_reset may be
2126   // called by other thread, so let caller clear this itself!
2127   // outcoming_bl.clear();
2128
2129   dispatch_queue->queue_remote_reset(this);
2130
2131   if (randomize_out_seq()) {
2132     ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
2133   }
2134
2135   in_seq = 0;
2136   connect_seq = 0;
2137   // it's safe to set this to 0 directly; we hold both locks
2138   ack_left = 0;
2139   once_ready = false;
2140   can_write = WriteStatus::NOWRITE;
2141 }
2142
2143 void AsyncConnection::_stop()
2144 {
2145   if (state == STATE_CLOSED)
2146     return ;
2147
2148   if (delay_state)
2149     delay_state->flush();
2150
2151   ldout(async_msgr->cct, 2) << __func__ << dendl;
2152   std::lock_guard<std::mutex> l(write_lock);
2153
2154   reset_recv_state();
2155   dispatch_queue->discard_queue(conn_id);
2156   discard_out_queue();
2157   async_msgr->unregister_conn(this);
2158   worker->release_worker();
2159
2160   state = STATE_CLOSED;
2161   open_write = false;
2162   can_write = WriteStatus::CLOSED;
2163   state_offset = 0;
2164   // Make sure in-queue events will be processed
2165   center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
2166 }
2167
2168 void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
2169 {
2170   ldout(async_msgr->cct, 20) << __func__ << " m " << *m << dendl;
2171
2172   // associate message with Connection (for benefit of encode_payload)
2173   if (m->empty_payload())
2174     ldout(async_msgr->cct, 20) << __func__ << " encoding features "
2175                                << features << " " << m << " " << *m << dendl;
2176   else
2177     ldout(async_msgr->cct, 20) << __func__ << " half-reencoding features "
2178                                << features << " " << m << " " << *m << dendl;
2179
2180   // encode and copy out of *m
2181   m->encode(features, msgr->crcflags);
2182
2183   bl.append(m->get_payload());
2184   bl.append(m->get_middle());
2185   bl.append(m->get_data());
2186 }
2187
2188 ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
2189 {
2190   FUNCTRACE();
2191   assert(center->in_thread());
2192   m->set_seq(++out_seq);
2193
2194   if (msgr->crcflags & MSG_CRC_HEADER)
2195     m->calc_header_crc();
2196
2197   ceph_msg_header& header = m->get_header();
2198   ceph_msg_footer& footer = m->get_footer();
2199
2200   // TODO: make sign_message reentrant?
2201   // Now that we have all the crcs calculated, handle the
2202   // digital signature for the message, if the AsyncConnection has session
2203   // security set up.  Some session security options do not
2204   // actually calculate and check the signature, but they should
2205   // handle the calls to sign_message and check_signature.  PLR
2206   if (session_security.get() == NULL) {
2207     ldout(async_msgr->cct, 20) << __func__ << " no session security" << dendl;
2208   } else {
2209     if (session_security->sign_message(m)) {
2210       ldout(async_msgr->cct, 20) << __func__ << " failed to sign m="
2211                                  << m << ": sig = " << footer.sig << dendl;
2212     } else {
2213       ldout(async_msgr->cct, 20) << __func__ << " signed m=" << m
2214                                  << ": sig = " << footer.sig << dendl;
2215     }
2216   }
2217   
2218   unsigned original_bl_len = outcoming_bl.length();
2219
2220   outcoming_bl.append(CEPH_MSGR_TAG_MSG);
2221
2222   if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
2223     outcoming_bl.append((char*)&header, sizeof(header));
2224   } else {
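    // peer lacks CEPH_FEATURE_NOSRCADDR: rebuild the legacy header layout
    // with the embedded source entity/address and recompute its crc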
2225     ceph_msg_header_old oldheader;
2226     memcpy(&oldheader, &header, sizeof(header));
2227     oldheader.src.name = header.src;
2228     oldheader.src.addr = get_peer_addr();
2229     oldheader.orig_src = oldheader.src;
2230     oldheader.reserved = header.reserved;
2231     oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
2232                                 sizeof(oldheader) - sizeof(oldheader.crc));
2233     outcoming_bl.append((char*)&oldheader, sizeof(oldheader));
2234   }
2235
2236   ldout(async_msgr->cct, 20) << __func__ << " sending message type=" << header.type
2237                              << " src " << entity_name_t(header.src)
2238                              << " front=" << header.front_len
2239                              << " data=" << header.data_len
2240                              << " off " << header.data_off << dendl;
2241
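  // small fragmented payloads are copied flat into outcoming_bl to avoid many
  // tiny buffer segments; larger ones are claimed without copying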
2242   if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
2243     for (const auto &pb : bl.buffers()) {
2244       outcoming_bl.append((char*)pb.c_str(), pb.length());
2245     }
2246   } else {
2247     outcoming_bl.claim_append(bl);  
2248   }
2249
2250   // send footer; if receiver doesn't support signatures, use the old footer format
2251   ceph_msg_footer_old old_footer;
2252   if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
2253     outcoming_bl.append((char*)&footer, sizeof(footer));
2254   } else {
2255     if (msgr->crcflags & MSG_CRC_HEADER) {
2256       old_footer.front_crc = footer.front_crc;
2257       old_footer.middle_crc = footer.middle_crc;
2258       old_footer.data_crc = footer.data_crc;
2259     } else {
2260       old_footer.front_crc = old_footer.middle_crc = 0;
2261     }
2262     old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
2263     old_footer.flags = footer.flags;
2264     outcoming_bl.append((char*)&old_footer, sizeof(old_footer));
2265   }
2266
2267   m->trace.event("async writing message");
2268   ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
2269                              << " " << m << dendl;
2270   ssize_t total_send_size = outcoming_bl.length();
2271   ssize_t rc = _try_send(more);
2272   if (rc < 0) {
2273     ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
2274                               << cpp_strerror(rc) << dendl;
2275   } else if (rc == 0) {
2276     logger->inc(l_msgr_send_bytes, total_send_size - original_bl_len);
2277     ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
2278   } else {
2279     logger->inc(l_msgr_send_bytes, total_send_size - outcoming_bl.length());
2280     ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " in progress; will continue." << dendl;
2281   }
2282   if (m->get_type() == CEPH_MSG_OSD_OP)
2283     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
2284   else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
2285     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
2286   m->put();
2287
2288   return rc;
2289 }
2290
2291 void AsyncConnection::reset_recv_state()
2292 {
2293   // release connect-time state and any throttle held by a partially read message
2294   if (state >= STATE_CONNECTING_SEND_CONNECT_MSG &&
2295       state <= STATE_CONNECTING_READY) {
2296     delete authorizer;
2297     authorizer = NULL;
2298     got_bad_auth = false;
2299   }
2300
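  // if we faulted mid-message, return whatever throttle the partially read
  // message had acquired (policy message/byte throttlers and the dispatch
  // queue throttler) so other connections are not starved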
2301   if (state > STATE_OPEN_MESSAGE_THROTTLE_MESSAGE &&
2302       state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
2303       && policy.throttler_messages) {
2304     ldout(async_msgr->cct, 10) << __func__ << " releasing " << 1
2305                                << " message to policy throttler "
2306                                << policy.throttler_messages->get_current() << "/"
2307                                << policy.throttler_messages->get_max() << dendl;
2308     policy.throttler_messages->put();
2309   }
2310   if (state > STATE_OPEN_MESSAGE_THROTTLE_BYTES &&
2311       state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
2312     if (policy.throttler_bytes) {
2313       ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
2314                                  << " bytes to policy throttler "
2315                                  << policy.throttler_bytes->get_current() << "/"
2316                                  << policy.throttler_bytes->get_max() << dendl;
2317       policy.throttler_bytes->put(cur_msg_size);
2318     }
2319   }
2320   if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE &&
2321       state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
2322     ldout(async_msgr->cct, 10) << __func__ << " releasing " << cur_msg_size
2323                                << " bytes to dispatch_queue throttler "
2324                                << dispatch_queue->dispatch_throttler.get_current() << "/"
2325                                << dispatch_queue->dispatch_throttler.get_max() << dendl;
2326     dispatch_queue->dispatch_throttle_release(cur_msg_size);
2327   }
2328 }
2329
2330 void AsyncConnection::handle_ack(uint64_t seq)
2331 {
2332   ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;
2333   // trim sent list
2334   std::lock_guard<std::mutex> l(write_lock);
2335   while (!sent.empty() && sent.front()->get_seq() <= seq) {
2336     Message* m = sent.front();
2337     sent.pop_front();
2338     ldout(async_msgr->cct, 10) << __func__ << " got ack seq "
2339                                << seq << " >= " << m->get_seq() << " on "
2340                                << m << " " << *m << dendl;
2341     m->put();
2342   }
2343 }
2344
2345 void AsyncConnection::DelayedDelivery::do_request(int id)
2346 {
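  // deliver one artificially delayed message: sleep until its release time
  // (subject to the ms_inject_delay_msg_type filter), then dispatch it on the
  // fast or regular path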
2347   Message *m = nullptr;
2348   {
2349     std::lock_guard<std::mutex> l(delay_lock);
2350     register_time_events.erase(id);
2351     if (stop_dispatch)
2352       return ;
2353     if (delay_queue.empty())
2354       return ;
2355     utime_t release = delay_queue.front().first;
2356     m = delay_queue.front().second;
2357     string delay_msg_type = msgr->cct->_conf->ms_inject_delay_msg_type;
2358     utime_t now = ceph_clock_now();
2359     if ((release > now &&
2360         (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
2361       utime_t t = release - now;
2362       t.sleep();
2363     }
2364     delay_queue.pop_front();
2365   }
2366   if (msgr->ms_can_fast_dispatch(m)) {
2367     dispatch_queue->fast_dispatch(m);
2368   } else {
2369     dispatch_queue->enqueue(m, m->get_priority(), conn_id);
2370   }
2371 }
2372
2373 void AsyncConnection::DelayedDelivery::flush() {
2374   stop_dispatch = true;
2375   center->submit_to(
2376       center->get_id(), [this] () mutable {
2377     std::lock_guard<std::mutex> l(delay_lock);
2378     while (!delay_queue.empty()) {
2379       Message *m = delay_queue.front().second;
2380       if (msgr->ms_can_fast_dispatch(m)) {
2381         dispatch_queue->fast_dispatch(m);
2382       } else {
2383         dispatch_queue->enqueue(m, m->get_priority(), conn_id);
2384       }
2385       delay_queue.pop_front();
2386     }
2387     for (auto i : register_time_events)
2388       center->delete_time_event(i);
2389     register_time_events.clear();
2390     stop_dispatch = false;
2391   }, true);
2392 }
2393
2394 void AsyncConnection::send_keepalive()
2395 {
2396   ldout(async_msgr->cct, 10) << __func__ << dendl;
2397   std::lock_guard<std::mutex> l(write_lock);
2398   if (can_write != WriteStatus::CLOSED) {
2399     keepalive = true;
2400     center->dispatch_event_external(write_handler);
2401   }
2402 }
2403
2404 void AsyncConnection::mark_down()
2405 {
2406   ldout(async_msgr->cct, 1) << __func__ << dendl;
2407   std::lock_guard<std::mutex> l(lock);
2408   _stop();
2409 }
2410
2411 void AsyncConnection::_append_keepalive_or_ack(bool ack, utime_t *tp)
2412 {
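  // three wire variants: KEEPALIVE2_ACK echoes the peer's timestamp,
  // KEEPALIVE2 sends our current time for the peer to ack, and the legacy
  // KEEPALIVE tag carries no payload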
2413   ldout(async_msgr->cct, 10) << __func__ << dendl;
2414   if (ack) {
2415     assert(tp);
2416     struct ceph_timespec ts;
2417     tp->encode_timeval(&ts);
2418     outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
2419     outcoming_bl.append((char*)&ts, sizeof(ts));
2420   } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
2421     struct ceph_timespec ts;
2422     utime_t t = ceph_clock_now();
2423     t.encode_timeval(&ts);
2424     outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
2425     outcoming_bl.append((char*)&ts, sizeof(ts));
2426   } else {
2427     outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
2428   }
2429 }
2430
2431 void AsyncConnection::handle_write()
2432 {
2433   ldout(async_msgr->cct, 10) << __func__ << dendl;
2434   ssize_t r = 0;
2435
2436   write_lock.lock();
2437   if (can_write == WriteStatus::CANWRITE) {
2438     if (keepalive) {
2439       _append_keepalive_or_ack();
2440       keepalive = false;
2441     }
2442
2443     auto start = ceph::mono_clock::now();
2444     bool more;
2445     do {
2446       bufferlist data;
2447       Message *m = _get_next_outgoing(&data);
2448       if (!m)
2449         break;
2450
2451       if (!policy.lossy) {
2452         // put on sent list
2453         sent.push_back(m);
2454         m->get();
2455       }
2456       more = _has_next_outgoing();
2457       write_lock.unlock();
2458
2459       // send_message or requeue may not have encoded the message yet
2460       if (!data.length())
2461         prepare_send_message(get_features(), m, data);
2462
2463       r = write_message(m, data, more);
2464       if (r < 0) {
2465         ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
2466         goto fail;
2467       }
2468       write_lock.lock();
2469       if (r > 0)
2470         break;
2471     } while (can_write == WriteStatus::CANWRITE);
2472     write_lock.unlock();
2473
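    // piggyback a single TAG_ACK carrying in_seq to cover every pending
    // acknowledgement counted in ack_left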
2474     uint64_t left = ack_left;
2475     if (left) {
2476       ceph_le64 s;
2477       s = in_seq;
2478       outcoming_bl.append(CEPH_MSGR_TAG_ACK);
2479       outcoming_bl.append((char*)&s, sizeof(s));
2480       ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
2481       ack_left -= left;
2482       left = ack_left;
2483       r = _try_send(left);
2484     } else if (is_queued()) {
2485       r = _try_send();
2486     }
2487
2488     logger->tinc(l_msgr_running_send_time, ceph::mono_clock::now() - start);
2489     if (r < 0) {
2490       ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
2491       goto fail;
2492     }
2493   } else {
2494     write_lock.unlock();
2495     lock.lock();
2496     write_lock.lock();
2497     if (state == STATE_STANDBY && !policy.server && is_queued()) {
2498       ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
2499       _connect();
2500     } else if (cs && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
2501       r = _try_send();
2502       if (r < 0) {
2503         ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
2504         write_lock.unlock();
2505         fault();
2506         lock.unlock();
2507         return ;
2508       }
2509     }
2510     write_lock.unlock();
2511     lock.unlock();
2512   }
2513
2514   return ;
2515
2516  fail:
2517   lock.lock();
2518   fault();
2519   lock.unlock();
2520 }
2521
2522 void AsyncConnection::wakeup_from(uint64_t id)
2523 {
2524   lock.lock();
2525   register_time_events.erase(id);
2526   lock.unlock();
2527   process();
2528 }
2529
2530 void AsyncConnection::tick(uint64_t id)
2531 {
2532   auto now = ceph::coarse_mono_clock::now();
2533   ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
2534                              << " last_active=" << last_active << dendl;
2535   std::lock_guard<std::mutex> l(lock);
2536   last_tick_id = 0;
2537   auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
2538   if (inactive_timeout_us < (uint64_t)idle_period) {
2539     ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
2540                               << inactive_timeout_us
2541                               << " us, mark self fault." << dendl;
2542     fault();
2543   } else if (is_connected()) {
2544     last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
2545   }
2546 }