stor4nfv.git: src/ceph/src/msg/simple/Pipe.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
18 #include <netinet/ip.h>
19 #include <netinet/tcp.h>
20 #include <sys/uio.h>
21 #include <limits.h>
22 #include <poll.h>
23
24 #include "msg/Message.h"
25 #include "Pipe.h"
26 #include "SimpleMessenger.h"
27
28 #include "common/debug.h"
29 #include "common/errno.h"
30 #include "common/valgrind.h"
31
32 // Included below to get encode_encrypt(); that should probably live in Crypto.h instead.
33
34 #include "auth/Crypto.h"
35 #include "auth/cephx/CephxProtocol.h"
36 #include "auth/AuthSessionHandler.h"
37
38 #include "include/sock_compat.h"
39
40 // Constant to limit starting sequence number to 2^31.  Nothing special about it, just a big number.  PLR
41 #define SEQ_MASK  0x7fffffff 
42 #define dout_subsys ceph_subsys_ms
43
44 #undef dout_prefix
45 #define dout_prefix *_dout << *this
46 ostream& Pipe::_pipe_prefix(std::ostream &out) const {
47   return out << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this
48              << " sd=" << sd << " :" << port
49              << " s=" << state
50              << " pgs=" << peer_global_seq
51              << " cs=" << connect_seq
52              << " l=" << policy.lossy
53              << " c=" << connection_state
54              << ").";
55 }
56
57 ostream& operator<<(ostream &out, const Pipe &pipe) {
58   return pipe._pipe_prefix(out);
59 }
60
61 /**
62  * DelayedDelivery injects delays into Message delivery coming off the
63  * socket. It is only enabled if delays are requested via config; when
64  * enabled, it pulls Messages off its delay_queue and puts them into the
65  * in_q (SimpleMessenger::dispatch_queue) once their release time has passed.
66  * Note that this probably still has issues with Pipe shutdown and
67  * replacement semantics; they are handled on a best-effort basis.
68  */
69 class Pipe::DelayedDelivery: public Thread {
70   Pipe *pipe;
71   std::deque< pair<utime_t,Message*> > delay_queue;
72   Mutex delay_lock;
73   Cond delay_cond;
74   int flush_count;
75   bool active_flush;
76   bool stop_delayed_delivery;
77   bool delay_dispatching; // we are in fast dispatch now
78   bool stop_fast_dispatching_flag; // we need to stop fast dispatching
79
80 public:
81   explicit DelayedDelivery(Pipe *p)
82     : pipe(p),
83       delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
84       active_flush(false),
85       stop_delayed_delivery(false),
86       delay_dispatching(false),
87       stop_fast_dispatching_flag(false) { }
88   ~DelayedDelivery() override {
89     discard();
90   }
91   void *entry() override;
92   void queue(utime_t release, Message *m) {
93     Mutex::Locker l(delay_lock);
94     delay_queue.push_back(make_pair(release, m));
95     delay_cond.Signal();
96   }
97   void discard();
98   void flush();
99   bool is_flushing() {
100     Mutex::Locker l(delay_lock);
101     return flush_count > 0 || active_flush;
102   }
103   void wait_for_flush() {
104     Mutex::Locker l(delay_lock);
105     while (flush_count > 0 || active_flush)
106       delay_cond.Wait(delay_lock);
107   }
108   void stop() {
109     delay_lock.Lock();
110     stop_delayed_delivery = true;
111     delay_cond.Signal();
112     delay_lock.Unlock();
113   }
114   void steal_for_pipe(Pipe *new_owner) {
115     Mutex::Locker l(delay_lock);
116     pipe = new_owner;
117   }
118   /**
119    * We need to stop fast dispatching before we stop putting
120    * normal messages into the DispatchQueue.
121    */
122   void stop_fast_dispatching();
123 };
124
125 /**************************************
126  * Pipe
127  */
128
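// Pipe constructor: attach to the supplied PipeConnection (used on accept)
// or create a fresh one, seed out_seq via randomize_out_seq(), set the
// messenger's tcp read timeout from ms_tcp_read_timeout, and size the
// receive prefetch buffer from ms_tcp_prefetch_max_size.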
129 Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con)
130   : RefCountedObject(r->cct),
131     reader_thread(this),
132     writer_thread(this),
133     delay_thread(NULL),
134     msgr(r),
135     conn_id(r->dispatch_queue.get_id()),
136     recv_ofs(0),
137     recv_len(0),
138     sd(-1), port(0),
139     peer_type(-1),
140     pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
141     state(st),
142     connection_state(NULL),
143     reader_running(false), reader_needs_join(false),
144     reader_dispatching(false), notify_on_dispatch_done(false),
145     writer_running(false),
146     in_q(&(r->dispatch_queue)),
147     send_keepalive(false),
148     send_keepalive_ack(false),
149     connect_seq(0), peer_global_seq(0),
150     out_seq(0), in_seq(0), in_seq_acked(0) {
151   ANNOTATE_BENIGN_RACE_SIZED(&sd, sizeof(sd), "Pipe socket");
152   ANNOTATE_BENIGN_RACE_SIZED(&state, sizeof(state), "Pipe state");
153   ANNOTATE_BENIGN_RACE_SIZED(&recv_len, sizeof(recv_len), "Pipe recv_len");
154   ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs, sizeof(recv_ofs), "Pipe recv_ofs");
155   if (con) {
156     connection_state = con;
157     connection_state->reset_pipe(this);
158   } else {
159     connection_state = new PipeConnection(msgr->cct, msgr);
160     connection_state->pipe = get();
161   }
162
163   if (randomize_out_seq()) {
164     lsubdout(msgr->cct,ms,15) << "Pipe(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
165   }
166     
167
168   msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
169   if (msgr->timeout == 0)
170     msgr->timeout = -1;
171
172   recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size;
173   recv_buf = new char[recv_max_prefetch];
174 }
175
176 Pipe::~Pipe()
177 {
178   assert(out_q.empty());
179   assert(sent.empty());
180   delete delay_thread;
181   delete[] recv_buf;
182 }
183
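// Trim the 'sent' list: drop (and put()) every message the peer has
// acknowledged, i.e. those whose seq is <= the acked seq.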
184 void Pipe::handle_ack(uint64_t seq)
185 {
186   lsubdout(msgr->cct, ms, 15) << "reader got ack seq " << seq << dendl;
187   // trim sent list
188   while (!sent.empty() &&
189          sent.front()->get_seq() <= seq) {
190     Message *m = sent.front();
191     sent.pop_front();
192     lsubdout(msgr->cct, ms, 10) << "reader got ack seq "
193                                 << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
194     m->put();
195   }
196 }
197
198 void Pipe::start_reader()
199 {
200   assert(pipe_lock.is_locked());
201   assert(!reader_running);
202   if (reader_needs_join) {
203     reader_thread.join();
204     reader_needs_join = false;
205   }
206   reader_running = true;
207   reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
208 }
209
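// Start the DelayedDelivery thread, but only if it is not already running
// and ms_inject_delay_type mentions this peer's entity type.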
210 void Pipe::maybe_start_delay_thread()
211 {
212   if (!delay_thread) {
213     auto pos = msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state->peer_type));
214     if (pos != string::npos) {
215       lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
216       delay_thread = new DelayedDelivery(this);
217       delay_thread->create("ms_pipe_delay");
218     }
219   }
220 }
221
222 void Pipe::start_writer()
223 {
224   assert(pipe_lock.is_locked());
225   assert(!writer_running);
226   writer_running = true;
227   writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
228 }
229
230 void Pipe::join_reader()
231 {
232   if (!reader_running)
233     return;
234   cond.Signal();
235   pipe_lock.Unlock();
236   reader_thread.join();
237   pipe_lock.Lock();
238   reader_needs_join = false;
239 }
240
241 void Pipe::DelayedDelivery::discard()
242 {
243   lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::discard" << dendl;
244   Mutex::Locker l(delay_lock);
245   while (!delay_queue.empty()) {
246     Message *m = delay_queue.front().second;
247     pipe->in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
248     m->put();
249     delay_queue.pop_front();
250   }
251 }
252
253 void Pipe::DelayedDelivery::flush()
254 {
255   lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::flush" << dendl;
256   Mutex::Locker l(delay_lock);
257   flush_count = delay_queue.size();
258   delay_cond.Signal();
259 }
260
261 void *Pipe::DelayedDelivery::entry()
262 {
263   Mutex::Locker locker(delay_lock);
264   lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry start" << dendl;
265
266   while (!stop_delayed_delivery) {
267     if (delay_queue.empty()) {
268       lgeneric_subdout(pipe->msgr->cct, ms, 30) << *pipe << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl;
269       delay_cond.Wait(delay_lock);
270       continue;
271     }
272     utime_t release = delay_queue.front().first;
273     Message *m = delay_queue.front().second;
274     string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type;
275     if (!flush_count &&
276         (release > ceph_clock_now() &&
277          (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
278       lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
279       delay_cond.WaitUntil(delay_lock, release);
280       continue;
281     }
282     lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
283     delay_queue.pop_front();
284     if (flush_count > 0) {
285       --flush_count;
286       active_flush = true;
287     }
288     if (pipe->in_q->can_fast_dispatch(m)) {
289       if (!stop_fast_dispatching_flag) {
290         delay_dispatching = true;
291         delay_lock.Unlock();
292         pipe->in_q->fast_dispatch(m);
293         delay_lock.Lock();
294         delay_dispatching = false;
295         if (stop_fast_dispatching_flag) {
296           // we need to let the stopping thread proceed
297           delay_cond.Signal();
298           delay_lock.Unlock();
299           delay_lock.Lock();
300         }
301       }
302     } else {
303       pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
304     }
305     active_flush = false;
306   }
307   lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry stop" << dendl;
308   return NULL;
309 }
310
311 void Pipe::DelayedDelivery::stop_fast_dispatching() {
312   Mutex::Locker l(delay_lock);
313   stop_fast_dispatching_flag = true;
314   while (delay_dispatching)
315     delay_cond.Wait(delay_lock);
316 }
317
318
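// Server side of the connection handshake for an incoming socket:
// exchange banners and addresses, read the peer's ceph_msg_connect,
// verify its authorizer, resolve any race with an existing Pipe to the
// same peer (replace it, or answer RETRY_GLOBAL / RETRY_SESSION / WAIT /
// RESETSESSION), and finally send READY (or SEQ) and move to STATE_OPEN.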
319 int Pipe::accept()
320 {
321   ldout(msgr->cct,10) << "accept" << dendl;
322   assert(pipe_lock.is_locked());
323   assert(state == STATE_ACCEPTING);
324
325   pipe_lock.Unlock();
326
327   // vars
328   bufferlist addrs;
329   entity_addr_t socket_addr;
330   socklen_t len;
331   int r;
332   char banner[strlen(CEPH_BANNER)+1];
333   bufferlist addrbl;
334   ceph_msg_connect connect;
335   ceph_msg_connect_reply reply;
336   Pipe *existing = 0;
337   bufferptr bp;
338   bufferlist authorizer, authorizer_reply;
339   bool authorizer_valid;
340   uint64_t feat_missing;
341   bool replaced = false;
342   // denotes whether the connection attempt from the peer is a hard reset:
343   // true if there is an existing connection and the connection sequence
344   // (connect_seq) from the peer is equal to zero
345   bool is_reset_from_peer = false;
346   CryptoKey session_key;
347   int removed; // single-use down below
348
349   // this should roughly mirror pseudocode at
350   //  http://ceph.com/wiki/Messaging_protocol
351   int reply_tag = 0;
352   uint64_t existing_seq = -1;
353
354   // used for reading in the remote acked seq on connect
355   uint64_t newly_acked_seq = 0;
356
357   recv_reset();
358
359   set_socket_options();
360
361   // announce myself.
362   r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
363   if (r < 0) {
364     ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
365     goto fail_unlocked;
366   }
367
368   // and my addr
369   ::encode(msgr->my_inst.addr, addrs, 0);  // legacy
370
371   port = msgr->my_inst.addr.get_port();
372
373   // and peer's socket addr (they might not know their ip)
374   sockaddr_storage ss;
375   len = sizeof(ss);
376   r = ::getpeername(sd, (sockaddr*)&ss, &len);
377   if (r < 0) {
378     ldout(msgr->cct,0) << "accept failed to getpeername " << cpp_strerror(errno) << dendl;
379     goto fail_unlocked;
380   }
381   socket_addr.set_sockaddr((sockaddr*)&ss);
382   ::encode(socket_addr, addrs, 0);  // legacy
383
384   r = tcp_write(addrs.c_str(), addrs.length());
385   if (r < 0) {
386     ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
387     goto fail_unlocked;
388   }
389
390   ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
391   
392   // identify peer
393   if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
394     ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
395     goto fail_unlocked;
396   }
397   if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
398     banner[strlen(CEPH_BANNER)] = 0;
399     ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
400     goto fail_unlocked;
401   }
402   {
403     bufferptr tp(sizeof(ceph_entity_addr));
404     addrbl.push_back(std::move(tp));
405   }
406   if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
407     ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
408     goto fail_unlocked;
409   }
410   {
411     bufferlist::iterator ti = addrbl.begin();
412     ::decode(peer_addr, ti);
413   }
414
415   ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl;
416   if (peer_addr.is_blank_ip()) {
417     // peer apparently doesn't know what ip they have; figure it out for them.
418     int port = peer_addr.get_port();
419     peer_addr.u = socket_addr.u;
420     peer_addr.set_port(port);
421     ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr
422             << " (socket is " << socket_addr << ")" << dendl;
423   }
424   set_peer_addr(peer_addr);  // so that connection_state gets set up
425   
426   while (1) {
427     if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
428       ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
429       goto fail_unlocked;
430     }
431
432     authorizer.clear();
433     if (connect.authorizer_len) {
434       bp = buffer::create(connect.authorizer_len);
435       if (tcp_read(bp.c_str(), connect.authorizer_len) < 0) {
436         ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl;
437         goto fail_unlocked;
438       }
439       authorizer.push_back(std::move(bp));
440       authorizer_reply.clear();
441     }
442
443     ldout(msgr->cct,20) << "accept got peer connect_seq " << connect.connect_seq
444              << " global_seq " << connect.global_seq
445              << dendl;
446     
447     msgr->lock.Lock();   // FIXME
448     pipe_lock.Lock();
449     if (msgr->dispatch_queue.stop)
450       goto shutting_down;
451     if (state != STATE_ACCEPTING) {
452       goto shutting_down;
453     }
454
455     // note peer's type, flags
456     set_peer_type(connect.host_type);
457     policy = msgr->get_policy(connect.host_type);
458     ldout(msgr->cct,10) << "accept of host_type " << connect.host_type
459                         << ", policy.lossy=" << policy.lossy
460                         << " policy.server=" << policy.server
461                         << " policy.standby=" << policy.standby
462                         << " policy.resetcheck=" << policy.resetcheck
463                         << dendl;
464
465     memset(&reply, 0, sizeof(reply));
466     reply.protocol_version = msgr->get_proto_version(peer_type, false);
467     msgr->lock.Unlock();
468
469     // mismatch?
470     ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version
471              << ", their proto " << connect.protocol_version << dendl;
472     if (connect.protocol_version != reply.protocol_version) {
473       reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
474       goto reply;
475     }
476
477     // require signatures for cephx?
478     if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
479       if (peer_type == CEPH_ENTITY_TYPE_OSD ||
480           peer_type == CEPH_ENTITY_TYPE_MDS) {
481         if (msgr->cct->_conf->cephx_require_signatures ||
482             msgr->cct->_conf->cephx_cluster_require_signatures) {
483           ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
484           policy.features_required |= CEPH_FEATURE_MSG_AUTH;
485         }
486       } else {
487         if (msgr->cct->_conf->cephx_require_signatures ||
488             msgr->cct->_conf->cephx_service_require_signatures) {
489           ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl;
490           policy.features_required |= CEPH_FEATURE_MSG_AUTH;
491         }
492       }
493     }
494
495     feat_missing = policy.features_required & ~(uint64_t)connect.features;
496     if (feat_missing) {
497       ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl;
498       reply.tag = CEPH_MSGR_TAG_FEATURES;
499       goto reply;
500     }
501     
502     // Check the authorizer.  If not good, bail out.
503
504     pipe_lock.Unlock();
505
506     if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer,
507                                  authorizer_reply, authorizer_valid, session_key) ||
508         !authorizer_valid) {
509       ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
510       pipe_lock.Lock();
511       if (state != STATE_ACCEPTING)
512         goto shutting_down_msgr_unlocked;
513       reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER;
514       session_security.reset();
515       goto reply;
516     } 
517
518     // We've verified the authorizer for this pipe, so set up the session security structure.  PLR
519
520     ldout(msgr->cct,10) << "accept:  setting up session_security." << dendl;
521
522   retry_existing_lookup:
523     msgr->lock.Lock();
524     pipe_lock.Lock();
525     if (msgr->dispatch_queue.stop)
526       goto shutting_down;
527     if (state != STATE_ACCEPTING)
528       goto shutting_down;
529     
530     // existing?
531     existing = msgr->_lookup_pipe(peer_addr);
532     if (existing) {
533       existing->pipe_lock.Lock(true);  // skip lockdep check (we are locking a second Pipe here)
534       if (existing->reader_dispatching) {
535         /** we need to wait, or we can deadlock if downstream
536          *  fast_dispatchers are (naughtily!) waiting on resources
537          *  held by somebody trying to make use of the SimpleMessenger lock.
538          *  So drop locks, wait, and retry. It just looks like a slow network
539          *  to everybody else.
540          *
541          *  We take a ref to existing here since it might get reaped before we
542          *  wake up (see bug #15870).  We can be confident that it lived until
543          *  we locked it, since we held the msgr lock from _lookup_pipe through to
544          *  locking existing->lock and checking reader_dispatching.
545          */
546         existing->get();
547         pipe_lock.Unlock();
548         msgr->lock.Unlock();
549         existing->notify_on_dispatch_done = true;
550         while (existing->reader_dispatching)
551           existing->cond.Wait(existing->pipe_lock);
552         existing->pipe_lock.Unlock();
553         existing->put();
554         existing = nullptr;
555         goto retry_existing_lookup;
556       }
557
558       if (connect.global_seq < existing->peer_global_seq) {
559         ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
560                  << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
561         reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
562         reply.global_seq = existing->peer_global_seq;  // so we can send it below..
563         existing->pipe_lock.Unlock();
564         msgr->lock.Unlock();
565         goto reply;
566       } else {
567         ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
568                  << " <= " << connect.global_seq << ", looks ok" << dendl;
569       }
570       
571       if (existing->policy.lossy) {
572         ldout(msgr->cct,0) << "accept replacing existing (lossy) channel (new one lossy="
573                 << policy.lossy << ")" << dendl;
574         existing->was_session_reset();
575         goto replace;
576       }
577
578       ldout(msgr->cct,0) << "accept connect_seq " << connect.connect_seq
579                          << " vs existing " << existing->connect_seq
580                          << " state " << existing->get_state_name() << dendl;
581
582       if (connect.connect_seq == 0 && existing->connect_seq > 0) {
583         ldout(msgr->cct,0) << "accept peer reset, then tried to connect to us, replacing" << dendl;
584         // this is a hard reset from peer
585         is_reset_from_peer = true;
586         if (policy.resetcheck)
587           existing->was_session_reset(); // this resets the out queue, message seq #'s, and connect_seq
588         goto replace;
589       }
590
591       if (connect.connect_seq < existing->connect_seq) {
592         // old attempt, or we sent READY but they didn't get it.
593         ldout(msgr->cct,10) << "accept existing " << existing << ".cseq " << existing->connect_seq
594                             << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
595         goto retry_session;
596       }
597
598       if (connect.connect_seq == existing->connect_seq) {
599         // if the existing connection successfully opened, and/or
600         // subsequently went to standby, then the peer should bump
601         // their connect_seq and retry: this is not a connection race
602         // we need to resolve here.
603         if (existing->state == STATE_OPEN ||
604             existing->state == STATE_STANDBY) {
605           ldout(msgr->cct,10) << "accept connection race, existing " << existing
606                               << ".cseq " << existing->connect_seq
607                               << " == " << connect.connect_seq
608                               << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
609           goto retry_session;
610         }
611
612         // connection race?
613         if (peer_addr < msgr->my_inst.addr ||
614             existing->policy.server) {
615           // incoming wins
616           ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
617                    << " == " << connect.connect_seq << ", or we are server, replacing my attempt" << dendl;
618           if (!(existing->state == STATE_CONNECTING ||
619                 existing->state == STATE_WAIT))
620             lderr(msgr->cct) << "accept race bad state, would replace, existing="
621                              << existing->get_state_name()
622                              << " " << existing << ".cseq=" << existing->connect_seq
623                              << " == " << connect.connect_seq
624                              << dendl;
625           assert(existing->state == STATE_CONNECTING ||
626                  existing->state == STATE_WAIT);
627           goto replace;
628         } else {
629           // our existing outgoing wins
630           ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
631                    << " == " << connect.connect_seq << ", sending WAIT" << dendl;
632           assert(peer_addr > msgr->my_inst.addr);
633           if (!(existing->state == STATE_CONNECTING))
634             lderr(msgr->cct) << "accept race bad state, would send wait, existing="
635                              << existing->get_state_name()
636                              << " " << existing << ".cseq=" << existing->connect_seq
637                              << " == " << connect.connect_seq
638                              << dendl;
639           assert(existing->state == STATE_CONNECTING);
640           // make sure our outgoing connection will follow through
641           existing->_send_keepalive();
642           reply.tag = CEPH_MSGR_TAG_WAIT;
643           existing->pipe_lock.Unlock();
644           msgr->lock.Unlock();
645           goto reply;
646         }
647       }
648
649       assert(connect.connect_seq > existing->connect_seq);
650       assert(connect.global_seq >= existing->peer_global_seq);
651       if (policy.resetcheck &&   // RESETSESSION only used by servers; peers do not reset each other
652           existing->connect_seq == 0) {
653         ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq 
654                  << ", " << existing << ".cseq = " << existing->connect_seq
655                  << "), sending RESETSESSION" << dendl;
656         reply.tag = CEPH_MSGR_TAG_RESETSESSION;
657         msgr->lock.Unlock();
658         existing->pipe_lock.Unlock();
659         goto reply;
660       }
661
662       // reconnect
663       ldout(msgr->cct,10) << "accept peer sent cseq " << connect.connect_seq
664                << " > " << existing->connect_seq << dendl;
665       goto replace;
666     } // existing
667     else if (connect.connect_seq > 0) {
668       // we reset, and they are opening a new session
669       ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
670       msgr->lock.Unlock();
671       reply.tag = CEPH_MSGR_TAG_RESETSESSION;
672       goto reply;
673     } else {
674       // new session
675       ldout(msgr->cct,10) << "accept new session" << dendl;
676       existing = NULL;
677       goto open;
678     }
679     ceph_abort();
680
681   retry_session:
682     assert(existing->pipe_lock.is_locked());
683     assert(pipe_lock.is_locked());
684     reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
685     reply.connect_seq = existing->connect_seq + 1;
686     existing->pipe_lock.Unlock();
687     msgr->lock.Unlock();
688     goto reply;    
689
690   reply:
691     assert(pipe_lock.is_locked());
692     reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
693     reply.authorizer_len = authorizer_reply.length();
694     pipe_lock.Unlock();
695     r = tcp_write((char*)&reply, sizeof(reply));
696     if (r < 0)
697       goto fail_unlocked;
698     if (reply.authorizer_len) {
699       r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
700       if (r < 0)
701         goto fail_unlocked;
702     }
703   }
704   
705  replace:
706   assert(existing->pipe_lock.is_locked());
707   assert(pipe_lock.is_locked());
708   // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
709   if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
710     reply_tag = CEPH_MSGR_TAG_SEQ;
711     existing_seq = existing->in_seq;
712   }
713   ldout(msgr->cct,10) << "accept replacing " << existing << dendl;
714   existing->stop();
715   existing->unregister_pipe();
716   replaced = true;
717
718   if (existing->policy.lossy) {
719     // disconnect from the Connection
720     assert(existing->connection_state);
721     if (existing->connection_state->clear_pipe(existing))
722       msgr->dispatch_queue.queue_reset(existing->connection_state.get());
723   } else {
724     // queue a reset on the new connection, which we're dumping for the old
725     msgr->dispatch_queue.queue_reset(connection_state.get());
726
727     // drop my Connection, and take a ref to the existing one. do not
728     // clear existing->connection_state, since read_message and
729     // write_message both dereference it without pipe_lock.
730     connection_state = existing->connection_state;
731
732     // make existing Connection reference us
733     connection_state->reset_pipe(this);
734
735     if (existing->delay_thread) {
736       existing->delay_thread->steal_for_pipe(this);
737       delay_thread = existing->delay_thread;
738       existing->delay_thread = NULL;
739       delay_thread->flush();
740     }
741
742     // steal incoming queue
743     uint64_t replaced_conn_id = conn_id;
744     conn_id = existing->conn_id;
745     existing->conn_id = replaced_conn_id;
746
747     // reset the in_seq if this is a hard reset from peer,
748     // otherwise we respect our original connection's value
749     in_seq = is_reset_from_peer ? 0 : existing->in_seq;
750     in_seq_acked = in_seq;
751
752     // steal outgoing queue and out_seq
753     existing->requeue_sent();
754     out_seq = existing->out_seq;
755     ldout(msgr->cct,10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl;
756     for (map<int, list<Message*> >::iterator p = existing->out_q.begin();
757          p != existing->out_q.end();
758          ++p)
759       out_q[p->first].splice(out_q[p->first].begin(), p->second);
760   }
761   existing->stop_and_wait();
762   existing->pipe_lock.Unlock();
763
764  open:
765   // open
766   assert(pipe_lock.is_locked());
767   connect_seq = connect.connect_seq + 1;
768   peer_global_seq = connect.global_seq;
769   assert(state == STATE_ACCEPTING);
770   state = STATE_OPEN;
771   ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
772
773   // send READY reply
774   reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
775   reply.features = policy.features_supported;
776   reply.global_seq = msgr->get_global_seq();
777   reply.connect_seq = connect_seq;
778   reply.flags = 0;
779   reply.authorizer_len = authorizer_reply.length();
780   if (policy.lossy)
781     reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
782
783   connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
784   ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl;
785
786   session_security.reset(
787       get_auth_session_handler(msgr->cct,
788                                connect.authorizer_protocol,
789                                session_key,
790                                connection_state->get_features()));
791
792   // notify
793   msgr->dispatch_queue.queue_accept(connection_state.get());
794   msgr->ms_deliver_handle_fast_accept(connection_state.get());
795
796   // ok!
797   if (msgr->dispatch_queue.stop)
798     goto shutting_down;
799   removed = msgr->accepting_pipes.erase(this);
800   assert(removed == 1);
801   register_pipe();
802   msgr->lock.Unlock();
803   pipe_lock.Unlock();
804
805   r = tcp_write((char*)&reply, sizeof(reply));
806   if (r < 0) {
807     goto fail_registered;
808   }
809
810   if (reply.authorizer_len) {
811     r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
812     if (r < 0) {
813       goto fail_registered;
814     }
815   }
816
817   if (reply_tag == CEPH_MSGR_TAG_SEQ) {
818     if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
819       ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
820       goto fail_registered;
821     }
822     if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
823       ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
824       goto fail_registered;
825     }
826   }
827
828   pipe_lock.Lock();
829   discard_requeued_up_to(newly_acked_seq);
830   if (state != STATE_CLOSED) {
831     ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl;
832     start_writer();
833   }
834   ldout(msgr->cct,20) << "accept done" << dendl;
835
836   maybe_start_delay_thread();
837
838   return 0;   // success.
839
840  fail_registered:
841   ldout(msgr->cct, 10) << "accept fault after register" << dendl;
842
843   if (msgr->cct->_conf->ms_inject_internal_delays) {
844     ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
845     utime_t t;
846     t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
847     t.sleep();
848   }
849
850  fail_unlocked:
851   pipe_lock.Lock();
852   if (state != STATE_CLOSED) {
853     bool queued = is_queued();
854     ldout(msgr->cct, 10) << "  queued = " << (int)queued << dendl;
855     if (queued) {
856       state = policy.server ? STATE_STANDBY : STATE_CONNECTING;
857     } else if (replaced) {
858       state = STATE_STANDBY;
859     } else {
860       state = STATE_CLOSED;
861       state_closed = true;
862     }
863     fault();
864     if (queued || replaced)
865       start_writer();
866   }
867   return -1;
868
869  shutting_down:
870   msgr->lock.Unlock();
871  shutting_down_msgr_unlocked:
872   assert(pipe_lock.is_locked());
873
874   if (msgr->cct->_conf->ms_inject_internal_delays) {
875     ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
876     utime_t t;
877     t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
878     t.sleep();
879   }
880
881   state = STATE_CLOSED;
882   state_closed = true;
883   fault();
884   return -1;
885 }
886
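// Apply per-socket options: TCP_NODELAY and SO_RCVBUF according to config,
// SO_NOSIGPIPE where available, and (if a socket priority is configured)
// IPTOS_CLASS_CS6 plus SO_PRIORITY.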
887 void Pipe::set_socket_options()
888 {
889   // disable Nagle algorithm?
890   if (msgr->cct->_conf->ms_tcp_nodelay) {
891     int flag = 1;
892     int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
893     if (r < 0) {
894       r = -errno;
895       ldout(msgr->cct,0) << "couldn't set TCP_NODELAY: "
896                          << cpp_strerror(r) << dendl;
897     }
898   }
899   if (msgr->cct->_conf->ms_tcp_rcvbuf) {
900     int size = msgr->cct->_conf->ms_tcp_rcvbuf;
901     int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size));
902     if (r < 0)  {
903       r = -errno;
904       ldout(msgr->cct,0) << "couldn't set SO_RCVBUF to " << size
905                          << ": " << cpp_strerror(r) << dendl;
906     }
907   }
908
909   // block SIGPIPE
910 #ifdef CEPH_USE_SO_NOSIGPIPE
911   int val = 1;
912   int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
913   if (r) {
914     r = -errno;
915     ldout(msgr->cct,0) << "couldn't set SO_NOSIGPIPE: "
916                        << cpp_strerror(r) << dendl;
917   }
918 #endif
919
920 #ifdef SO_PRIORITY
921   int prio = msgr->get_socket_priority();
922   if (prio >= 0) {
923     int r = -1;
924 #ifdef IPTOS_CLASS_CS6
925     int iptos = IPTOS_CLASS_CS6;
926     int addr_family = 0;
927     if (!peer_addr.is_blank_ip()) {
928       addr_family = peer_addr.get_family();
929     } else {
930       addr_family = msgr->get_myaddr().get_family();
931     }
932     switch (addr_family) {
933     case AF_INET:
934       r = ::setsockopt(sd, IPPROTO_IP, IP_TOS, &iptos, sizeof(iptos));
935       break;
936     case AF_INET6:
937       r = ::setsockopt(sd, IPPROTO_IPV6, IPV6_TCLASS, &iptos, sizeof(iptos));
938       break;
939     default:
940       lderr(msgr->cct) << "couldn't set ToS of unknown family ("
941                        << addr_family << ")"
942                        << " to " << iptos << dendl;
943       return;
944     }
945     if (r < 0) {
946       r = -errno;
947       ldout(msgr->cct,0) << "couldn't set TOS to " << iptos
948                          << ": " << cpp_strerror(r) << dendl;
949     }
950 #endif
951     // setsockopt(IPTOS_CLASS_CS6) resets the priority of the socket to 0.
952     // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
953     // We therefore need to call setsockopt(SO_PRIORITY) afterwards.
954     r = ::setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio));
955     if (r < 0) {
956       r = -errno;
957       ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio
958                          << ": " << cpp_strerror(r) << dendl;
959     }
960   }
961 #endif
962 }
963
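// Client side of the connection handshake: (re)create the socket, exchange
// banners and addresses, then loop sending ceph_msg_connect (plus any
// authorizer) and acting on the reply tag -- retrying on RESETSESSION,
// RETRY_GLOBAL and RETRY_SESSION (and once more with a fresh authorizer on
// BADAUTHORIZER), waiting on WAIT, and going to STATE_OPEN on READY/SEQ.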
964 int Pipe::connect()
965 {
966   bool got_bad_auth = false;
967
968   ldout(msgr->cct,10) << "connect " << connect_seq << dendl;
969   assert(pipe_lock.is_locked());
970
971   __u32 cseq = connect_seq;
972   __u32 gseq = msgr->get_global_seq();
973
974   // stop reader thread
975   join_reader();
976
977   pipe_lock.Unlock();
978   
979   char tag = -1;
980   int rc = -1;
981   struct msghdr msg;
982   struct iovec msgvec[2];
983   int msglen;
984   char banner[strlen(CEPH_BANNER) + 1];  // extra byte makes coverity happy
985   entity_addr_t paddr;
986   entity_addr_t peer_addr_for_me, socket_addr;
987   AuthAuthorizer *authorizer = NULL;
988   bufferlist addrbl, myaddrbl;
989   const md_config_t *conf = msgr->cct->_conf;
990
991   // close old socket.  this is safe because we stopped the reader thread above.
992   if (sd >= 0)
993     ::close(sd);
994
995   // create socket?
996   sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0);
997   if (sd < 0) {
998     rc = -errno;
999     lderr(msgr->cct) << "connect couldn't create socket " << cpp_strerror(rc) << dendl;
1000     goto fail;
1001   }
1002
1003   recv_reset();
1004
1005   set_socket_options();
1006
1007   {
1008     entity_addr_t addr2bind = msgr->get_myaddr();
1009     if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) {
1010       addr2bind.set_port(0);
1011       int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len());
1012       if (r < 0) {
1013         ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl;
1014         goto fail;
1015       }
1016     }
1017   }
1018
1019   // connect!
1020   ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
1021   rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
1022   if (rc < 0) {
1023     int stored_errno = errno;
1024     ldout(msgr->cct,2) << "connect error " << peer_addr
1025              << ", " << cpp_strerror(stored_errno) << dendl;
1026     if (stored_errno == ECONNREFUSED) {
1027       ldout(msgr->cct, 2) << "connection refused!" << dendl;
1028       msgr->dispatch_queue.queue_refused(connection_state.get());
1029     }
1030     goto fail;
1031   }
1032
1033   // verify banner
1034   // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
1035   rc = tcp_read((char*)&banner, strlen(CEPH_BANNER));
1036   if (rc < 0) {
1037     ldout(msgr->cct,2) << "connect couldn't read banner, " << cpp_strerror(rc) << dendl;
1038     goto fail;
1039   }
1040   if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
1041     ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << peer_addr << dendl;
1042     goto fail;
1043   }
1044
1045   memset(&msg, 0, sizeof(msg));
1046   msgvec[0].iov_base = banner;
1047   msgvec[0].iov_len = strlen(CEPH_BANNER);
1048   msg.msg_iov = msgvec;
1049   msg.msg_iovlen = 1;
1050   msglen = msgvec[0].iov_len;
1051   rc = do_sendmsg(&msg, msglen);
1052   if (rc < 0) {
1053     ldout(msgr->cct,2) << "connect couldn't write my banner, " << cpp_strerror(rc) << dendl;
1054     goto fail;
1055   }
1056
1057   // identify peer
1058   {
1059 #if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__)
1060     bufferptr p(sizeof(ceph_entity_addr) * 2);
1061 #else
1062     int wirelen = sizeof(__u32) * 2 + sizeof(ceph_sockaddr_storage);
1063     bufferptr p(wirelen * 2);
1064 #endif
1065     addrbl.push_back(std::move(p));
1066   }
1067   rc = tcp_read(addrbl.c_str(), addrbl.length());
1068   if (rc < 0) {
1069     ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc) << dendl;
1070     goto fail;
1071   }
1072   try {
1073     bufferlist::iterator p = addrbl.begin();
1074     ::decode(paddr, p);
1075     ::decode(peer_addr_for_me, p);
1076   }
1077   catch (buffer::error& e) {
1078     ldout(msgr->cct,2) << "connect couldn't decode peer addrs: " << e.what()
1079                        << dendl;
1080     goto fail;
1081   }
1082   port = peer_addr_for_me.get_port();
1083
1084   ldout(msgr->cct,20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
1085   if (peer_addr != paddr) {
1086     if (paddr.is_blank_ip() &&
1087         peer_addr.get_port() == paddr.get_port() &&
1088         peer_addr.get_nonce() == paddr.get_nonce()) {
1089       ldout(msgr->cct,0) << "connect claims to be " 
1090               << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl;
1091     } else {
1092       ldout(msgr->cct,10) << "connect claims to be "
1093                           << paddr << " not " << peer_addr << dendl;
1094       goto fail;
1095     }
1096   }
1097
1098   ldout(msgr->cct,20) << "connect peer addr for me is " << peer_addr_for_me << dendl;
1099
1100   msgr->learned_addr(peer_addr_for_me);
1101
1102   ::encode(msgr->my_inst.addr, myaddrbl, 0);  // legacy
1103
1104   memset(&msg, 0, sizeof(msg));
1105   msgvec[0].iov_base = myaddrbl.c_str();
1106   msgvec[0].iov_len = myaddrbl.length();
1107   msg.msg_iov = msgvec;
1108   msg.msg_iovlen = 1;
1109   msglen = msgvec[0].iov_len;
1110   rc = do_sendmsg(&msg, msglen);
1111   if (rc < 0) {
1112     ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl;
1113     goto fail;
1114   }
1115   ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl;
1116
1117
1118   while (1) {
1119     delete authorizer;
1120     authorizer = msgr->get_authorizer(peer_type, false);
1121     bufferlist authorizer_reply;
1122
1123     ceph_msg_connect connect;
1124     connect.features = policy.features_supported;
1125     connect.host_type = msgr->get_myinst().name.type();
1126     connect.global_seq = gseq;
1127     connect.connect_seq = cseq;
1128     connect.protocol_version = msgr->get_proto_version(peer_type, true);
1129     connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
1130     connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
1131     if (authorizer) 
1132       ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len
1133                << " protocol=" << connect.authorizer_protocol << dendl;
1134     connect.flags = 0;
1135     if (policy.lossy)
1136       connect.flags |= CEPH_MSG_CONNECT_LOSSY;  // this is fyi, actually, server decides!
1137     memset(&msg, 0, sizeof(msg));
1138     msgvec[0].iov_base = (char*)&connect;
1139     msgvec[0].iov_len = sizeof(connect);
1140     msg.msg_iov = msgvec;
1141     msg.msg_iovlen = 1;
1142     msglen = msgvec[0].iov_len;
1143     if (authorizer) {
1144       msgvec[1].iov_base = authorizer->bl.c_str();
1145       msgvec[1].iov_len = authorizer->bl.length();
1146       msg.msg_iovlen++;
1147       msglen += msgvec[1].iov_len;
1148     }
1149
1150     ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq
1151              << " proto=" << connect.protocol_version << dendl;
1152     rc = do_sendmsg(&msg, msglen);
1153     if (rc < 0) {
1154       ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc) << dendl;
1155       goto fail;
1156     }
1157
1158     ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
1159     ceph_msg_connect_reply reply;
1160     rc = tcp_read((char*)&reply, sizeof(reply));
1161     if (rc < 0) {
1162       ldout(msgr->cct,2) << "connect read reply " << cpp_strerror(rc) << dendl;
1163       goto fail;
1164     }
1165
1166     ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag
1167                         << " connect_seq " << reply.connect_seq
1168                         << " global_seq " << reply.global_seq
1169                         << " proto " << reply.protocol_version
1170                         << " flags " << (int)reply.flags
1171                         << " features " << reply.features
1172                         << dendl;
1173
1174     authorizer_reply.clear();
1175
1176     if (reply.authorizer_len) {
1177       ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl;
1178       bufferptr bp = buffer::create(reply.authorizer_len);
1179       rc = tcp_read(bp.c_str(), reply.authorizer_len);
1180       if (rc < 0) {
1181         ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc) << dendl;
1182         goto fail;
1183       }
1184       authorizer_reply.push_back(bp);
1185     }
1186
1187     if (authorizer) {
1188       bufferlist::iterator iter = authorizer_reply.begin();
1189       if (!authorizer->verify_reply(iter)) {
1190         ldout(msgr->cct,0) << "failed verifying authorize reply" << dendl;
1191         goto fail;
1192       }
1193     }
1194
1195     if (conf->ms_inject_internal_delays) {
1196       ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1197       utime_t t;
1198       t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1199       t.sleep();
1200     }
1201
1202     pipe_lock.Lock();
1203     if (state != STATE_CONNECTING) {
1204       ldout(msgr->cct,0) << "connect got RESETSESSION but no longer connecting" << dendl;
1205       goto stop_locked;
1206     }
1207
1208     if (reply.tag == CEPH_MSGR_TAG_FEATURES) {
1209       ldout(msgr->cct,0) << "connect protocol feature mismatch, my " << std::hex
1210               << connect.features << " < peer " << reply.features
1211               << " missing " << (reply.features & ~policy.features_supported)
1212               << std::dec << dendl;
1213       goto fail_locked;
1214     }
1215
1216     if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1217       ldout(msgr->cct,0) << "connect protocol version mismatch, my " << connect.protocol_version
1218               << " != " << reply.protocol_version << dendl;
1219       goto fail_locked;
1220     }
1221
1222     if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1223       ldout(msgr->cct,0) << "connect got BADAUTHORIZER" << dendl;
1224       if (got_bad_auth)
1225         goto stop_locked;
1226       got_bad_auth = true;
1227       pipe_lock.Unlock();
1228       delete authorizer;
1229       authorizer = msgr->get_authorizer(peer_type, true);  // try harder
1230       continue;
1231     }
1232     if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1233       ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl;
1234       was_session_reset();
1235       cseq = 0;
1236       pipe_lock.Unlock();
1237       continue;
1238     }
1239     if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1240       gseq = msgr->get_global_seq(reply.global_seq);
1241       ldout(msgr->cct,10) << "connect got RETRY_GLOBAL " << reply.global_seq
1242                << " chose new " << gseq << dendl;
1243       pipe_lock.Unlock();
1244       continue;
1245     }
1246     if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1247       assert(reply.connect_seq > connect_seq);
1248       ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq
1249                << " -> " << reply.connect_seq << dendl;
1250       cseq = connect_seq = reply.connect_seq;
1251       pipe_lock.Unlock();
1252       continue;
1253     }
1254
1255     if (reply.tag == CEPH_MSGR_TAG_WAIT) {
1256       ldout(msgr->cct,3) << "connect got WAIT (connection race)" << dendl;
1257       state = STATE_WAIT;
1258       goto stop_locked;
1259     }
1260
1261     if (reply.tag == CEPH_MSGR_TAG_READY ||
1262         reply.tag == CEPH_MSGR_TAG_SEQ) {
1263       uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features;
1264       if (feat_missing) {
1265         ldout(msgr->cct,1) << "missing required features " << std::hex << feat_missing << std::dec << dendl;
1266         goto fail_locked;
1267       }
1268
1269       if (reply.tag == CEPH_MSGR_TAG_SEQ) {
1270         ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
1271         uint64_t newly_acked_seq = 0;
1272         rc = tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq));
1273         if (rc < 0) {
1274           ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc) << dendl;
1275           goto fail_locked;
1276         }
1277         ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq
1278                            << " vs out_seq " << out_seq << dendl;
1279         while (newly_acked_seq > out_seq) {
1280           Message *m = _get_next_outgoing();
1281           assert(m);
1282           ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq()
1283                              << " " << *m << dendl;
1284           assert(m->get_seq() <= newly_acked_seq);
1285           m->put();
1286           ++out_seq;
1287         }
1288         if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) {
1289           ldout(msgr->cct,2) << "connect write error on in_seq" << dendl;
1290           goto fail_locked;
1291         }
1292       }
1293
1294       // hooray!
1295       peer_global_seq = reply.global_seq;
1296       policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
1297       state = STATE_OPEN;
1298       connect_seq = cseq + 1;
1299       assert(connect_seq == reply.connect_seq);
1300       backoff = utime_t();
1301       connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
1302       ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy
1303                << ", features " << connection_state->get_features() << dendl;
1304       
1305
1306       // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
1307       // connection.  PLR
1308
1309       if (authorizer != NULL) {
1310         session_security.reset(
1311             get_auth_session_handler(msgr->cct,
1312                                      authorizer->protocol,
1313                                      authorizer->session_key,
1314                                      connection_state->get_features()));
1315       }  else {
1316         // We have no authorizer, so we shouldn't be applying security to messages in this pipe.  PLR
1317         session_security.reset();
1318       }
1319
1320       msgr->dispatch_queue.queue_connect(connection_state.get());
1321       msgr->ms_deliver_handle_fast_connect(connection_state.get());
1322       
1323       if (!reader_running) {
1324         ldout(msgr->cct,20) << "connect starting reader" << dendl;
1325         start_reader();
1326       }
1327       maybe_start_delay_thread();
1328       delete authorizer;
1329       return 0;
1330     }
1331     
1332     // protocol error
1333     ldout(msgr->cct,0) << "connect got bad tag " << (int)tag << dendl;
1334     goto fail_locked;
1335   }
1336
1337  fail:
1338   if (conf->ms_inject_internal_delays) {
1339     ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1340     utime_t t;
1341     t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1342     t.sleep();
1343   }
1344
1345   pipe_lock.Lock();
1346  fail_locked:
1347   if (state == STATE_CONNECTING)
1348     fault();
1349   else
1350     ldout(msgr->cct,3) << "connect fault, but state = " << get_state_name()
1351                        << " != connecting, stopping" << dendl;
1352
1353  stop_locked:
1354   delete authorizer;
1355   return rc;
1356 }
1357
1358 void Pipe::register_pipe()
1359 {
1360   ldout(msgr->cct,10) << "register_pipe" << dendl;
1361   assert(msgr->lock.is_locked());
1362   Pipe *existing = msgr->_lookup_pipe(peer_addr);
1363   assert(existing == NULL);
1364   msgr->rank_pipe[peer_addr] = this;
1365 }
1366
1367 void Pipe::unregister_pipe()
1368 {
1369   assert(msgr->lock.is_locked());
1370   ceph::unordered_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr);
1371   if (p != msgr->rank_pipe.end() && p->second == this) {
1372     ldout(msgr->cct,10) << "unregister_pipe" << dendl;
1373     msgr->rank_pipe.erase(p);
1374   } else {
1375     ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl;
1376     msgr->accepting_pipes.erase(this);  // somewhat overkill, but safe.
1377   }
1378 }
1379
1380 void Pipe::join()
1381 {
1382   ldout(msgr->cct, 20) << "join" << dendl;
1383   if (writer_thread.is_started())
1384     writer_thread.join();
1385   if (reader_thread.is_started())
1386     reader_thread.join();
1387   if (delay_thread) {
1388     ldout(msgr->cct, 20) << "joining delay_thread" << dendl;
1389     delay_thread->stop();
1390     delay_thread->join();
1391   }
1392 }
1393
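// Move everything on the 'sent' list back to the front of the highest
// priority out queue and rewind out_seq accordingly, so the messages can
// be resent.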
1394 void Pipe::requeue_sent()
1395 {
1396   if (sent.empty())
1397     return;
1398
1399   list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1400   while (!sent.empty()) {
1401     Message *m = sent.back();
1402     sent.pop_back();
1403     ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
1404                         << " (" << m->get_seq() << ")" << dendl;
1405     rq.push_front(m);
1406     out_seq--;
1407   }
1408 }
1409
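// Drop requeued messages that the peer has already acknowledged
// (seq <= the acked seq it reported), advancing out_seq past them.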
1410 void Pipe::discard_requeued_up_to(uint64_t seq)
1411 {
1412   ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl;
1413   if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
1414     return;
1415   list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1416   while (!rq.empty()) {
1417     Message *m = rq.front();
1418     if (m->get_seq() == 0 || m->get_seq() > seq)
1419       break;
1420     ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq
1421                         << " <= " << seq << ", discarding" << dendl;
1422     m->put();
1423     rq.pop_front();
1424     out_seq++;
1425   }
1426   if (rq.empty())
1427     out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1428 }
1429
1430 /*
1431  * Tears down the Pipe's message queues, and removes them from the DispatchQueue
1432  * Must hold pipe_lock prior to calling.
1433  */
1434 void Pipe::discard_out_queue()
1435 {
1436   ldout(msgr->cct,10) << "discard_queue" << dendl;
1437
1438   for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
1439     ldout(msgr->cct,20) << "  discard " << *p << dendl;
1440     (*p)->put();
1441   }
1442   sent.clear();
1443   for (map<int,list<Message*> >::iterator p = out_q.begin(); p != out_q.end(); ++p)
1444     for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); ++r) {
1445       ldout(msgr->cct,20) << "  discard " << *r << dendl;
1446       (*r)->put();
1447     }
1448   out_q.clear();
1449 }
1450
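// Handle a failure on the socket.  Lossy channels are torn down outright;
// otherwise sent messages are requeued and the Pipe either goes to
// STATE_STANDBY (server policy, or standby policy with nothing queued) or
// reconnects, backing off exponentially between ms_initial_backoff and
// ms_max_backoff while in STATE_CONNECTING.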
1451 void Pipe::fault(bool onread)
1452 {
1453   const md_config_t *conf = msgr->cct->_conf;
1454   assert(pipe_lock.is_locked());
1455   cond.Signal();
1456
1457   if (onread && state == STATE_CONNECTING) {
1458     ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl;
1459     return;
1460   }
1461   
1462   ldout(msgr->cct,2) << "fault " << cpp_strerror(errno) << dendl;
1463
1464   if (state == STATE_CLOSED ||
1465       state == STATE_CLOSING) {
1466     ldout(msgr->cct,10) << "fault already closed|closing" << dendl;
1467     if (connection_state->clear_pipe(this))
1468       msgr->dispatch_queue.queue_reset(connection_state.get());
1469     return;
1470   }
1471
1472   shutdown_socket();
1473
1474   // lossy channel?
1475   if (policy.lossy && state != STATE_CONNECTING) {
1476     ldout(msgr->cct,10) << "fault on lossy channel, failing" << dendl;
1477
1478     // disconnect from Connection, and mark it failed.  future messages
1479     // will be dropped.
1480     assert(connection_state);
1481     stop();
1482     bool cleared = connection_state->clear_pipe(this);
1483
1484     // crib locks, blech.  note that Pipe is now STATE_CLOSED and the
1485     // rank_pipe entry is ignored by others.
1486     pipe_lock.Unlock();
1487
1488     if (conf->ms_inject_internal_delays) {
1489       ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1490       utime_t t;
1491       t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1492       t.sleep();
1493     }
1494
1495     msgr->lock.Lock();
1496     pipe_lock.Lock();
1497     unregister_pipe();
1498     msgr->lock.Unlock();
1499
1500     if (delay_thread)
1501       delay_thread->discard();
1502     in_q->discard_queue(conn_id);
1503     discard_out_queue();
1504     if (cleared)
1505       msgr->dispatch_queue.queue_reset(connection_state.get());
1506     return;
1507   }
1508
1509   // queue delayed items immediately
1510   if (delay_thread)
1511     delay_thread->flush();
1512
1513   // requeue sent items
1514   requeue_sent();
1515
1516   if (policy.standby && !is_queued()) {
1517     ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
1518     state = STATE_STANDBY;
1519     return;
1520   }
1521
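  // decide how to recover: clients reconnect, servers go to standby; if we were
  // already connecting, back off exponentially (capped at ms_max_backoff) before retrying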
1522   if (state != STATE_CONNECTING) {
1523     if (policy.server) {
1524       ldout(msgr->cct,0) << "fault, server, going to standby" << dendl;
1525       state = STATE_STANDBY;
1526     } else {
1527       ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl;
1528       connect_seq++;
1529       state = STATE_CONNECTING;
1530     }
1531     backoff = utime_t();
1532   } else if (backoff == utime_t()) {
1533     ldout(msgr->cct,0) << "fault" << dendl;
1534     backoff.set_from_double(conf->ms_initial_backoff);
1535   } else {
1536     ldout(msgr->cct,10) << "fault waiting " << backoff << dendl;
1537     cond.WaitInterval(pipe_lock, backoff);
1538     backoff += backoff;
1539     if (backoff > conf->ms_max_backoff)
1540       backoff.set_from_double(conf->ms_max_backoff);
1541     ldout(msgr->cct,10) << "fault done waiting or woke up" << dendl;
1542   }
1543 }
1544
1545 int Pipe::randomize_out_seq()
1546 {
1547   if (connection_state->get_features() & CEPH_FEATURE_MSG_AUTH) {
1548     // Set out_seq to a random value, so the CRC won't be predictable.  Don't bother checking
1549     // seq_error here; the caller checks the return value.  PLR
1550     int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq));
1551     out_seq &= SEQ_MASK;
1552     lsubdout(msgr->cct, ms, 10) << "randomize_out_seq " << out_seq << dendl;
1553     return seq_error;
1554   } else {
1555     // previously, seq #'s always started at 0.
1556     out_seq = 0;
1557     return 0;
1558   }
1559 }
1560
1561 void Pipe::was_session_reset()
1562 {
1563   assert(pipe_lock.is_locked());
1564
1565   ldout(msgr->cct,10) << "was_session_reset" << dendl;
1566   in_q->discard_queue(conn_id);
1567   if (delay_thread)
1568     delay_thread->discard();
1569   discard_out_queue();
1570
1571   msgr->dispatch_queue.queue_remote_reset(connection_state.get());
1572
1573   if (randomize_out_seq()) {
1574     lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
1575   }
1576
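  // start the new session with fresh sequence numbers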
1577   in_seq = 0;
1578   connect_seq = 0;
1579 }
1580
1581 void Pipe::stop()
1582 {
1583   ldout(msgr->cct,10) << "stop" << dendl;
1584   assert(pipe_lock.is_locked());
1585   state = STATE_CLOSED;
1586   state_closed = true;
1587   cond.Signal();
1588   shutdown_socket();
1589 }
1590
1591 void Pipe::stop_and_wait()
1592 {
1593   assert(pipe_lock.is_locked_by_me());
1594   if (state != STATE_CLOSED)
1595     stop();
1596
1597   if (msgr->cct->_conf->ms_inject_internal_delays) {
1598     ldout(msgr->cct, 10) << __func__ << " sleep for "
1599                          << msgr->cct->_conf->ms_inject_internal_delays
1600                          << dendl;
1601     utime_t t;
1602     t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1603     t.sleep();
1604   }
1605   
1606   if (delay_thread) {
1607     pipe_lock.Unlock();
1608     delay_thread->stop_fast_dispatching();
1609     pipe_lock.Lock();
1610   }
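  // wait for the reader to finish any fast dispatch that is currently in flight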
1611   while (reader_running &&
1612          reader_dispatching)
1613     cond.Wait(pipe_lock);
1614 }
1615
1616 /* read msgs from socket.
1617  * the reader thread also handles the server (accept) side of the handshake.
1618  */
1619 void Pipe::reader()
1620 {
1621   pipe_lock.Lock();
1622
1623   if (state == STATE_ACCEPTING) {
1624     accept();
1625     assert(pipe_lock.is_locked());
1626   }
1627
1628   // loop.
1629   while (state != STATE_CLOSED &&
1630          state != STATE_CONNECTING) {
1631     assert(pipe_lock.is_locked());
1632
1633     // sleep if (re)connecting
1634     if (state == STATE_STANDBY) {
1635       ldout(msgr->cct,20) << "reader sleeping during reconnect|standby" << dendl;
1636       cond.Wait(pipe_lock);
1637       continue;
1638     }
1639
1640     // get a reference to the AuthSessionHandler while we have the pipe_lock
1641     ceph::shared_ptr<AuthSessionHandler> auth_handler = session_security;
1642
1643     pipe_lock.Unlock();
1644
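    // read the next one-byte protocol tag without holding pipe_lock, then
    // handle it below (keepalive, ack, message, close, ...)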
1645     char tag = -1;
1646     ldout(msgr->cct,20) << "reader reading tag..." << dendl;
1647     if (tcp_read((char*)&tag, 1) < 0) {
1648       pipe_lock.Lock();
1649       ldout(msgr->cct,2) << "reader couldn't read tag, " << cpp_strerror(errno) << dendl;
1650       fault(true);
1651       continue;
1652     }
1653
1654     if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
1655       ldout(msgr->cct,2) << "reader got KEEPALIVE" << dendl;
1656       pipe_lock.Lock();
1657       connection_state->set_last_keepalive(ceph_clock_now());
1658       continue;
1659     }
1660     if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
1661       ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl;
1662       ceph_timespec t;
1663       int rc = tcp_read((char*)&t, sizeof(t));
1664       pipe_lock.Lock();
1665       if (rc < 0) {
1666         ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp "
1667                            << cpp_strerror(errno) << dendl;
1668         fault(true);
1669       } else {
1670         send_keepalive_ack = true;
1671         keepalive_ack_stamp = utime_t(t);
1672         ldout(msgr->cct,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
1673                            << dendl;
1674         connection_state->set_last_keepalive(ceph_clock_now());
1675         cond.Signal();
1676       }
1677       continue;
1678     }
1679     if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
1680       ldout(msgr->cct,2) << "reader got KEEPALIVE_ACK" << dendl;
1681       struct ceph_timespec t;
1682       int rc = tcp_read((char*)&t, sizeof(t));
1683       pipe_lock.Lock();
1684       if (rc < 0) {
1685         ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl;
1686         fault(true);
1687       } else {
1688         connection_state->set_last_keepalive_ack(utime_t(t));
1689       }
1690       continue;
1691     }
1692
1693     // open ...
1694     if (tag == CEPH_MSGR_TAG_ACK) {
1695       ldout(msgr->cct,20) << "reader got ACK" << dendl;
1696       ceph_le64 seq;
1697       int rc = tcp_read((char*)&seq, sizeof(seq));
1698       pipe_lock.Lock();
1699       if (rc < 0) {
1700         ldout(msgr->cct,2) << "reader couldn't read ack seq, " << cpp_strerror(errno) << dendl;
1701         fault(true);
1702       } else if (state != STATE_CLOSED) {
1703         handle_ack(seq);
1704       }
1705       continue;
1706     }
1707
1708     else if (tag == CEPH_MSGR_TAG_MSG) {
1709       ldout(msgr->cct,20) << "reader got MSG" << dendl;
1710       Message *m = 0;
1711       int r = read_message(&m, auth_handler.get());
1712
1713       pipe_lock.Lock();
1714       
1715       if (!m) {
1716         if (r < 0)
1717           fault(true);
1718         continue;
1719       }
1720
1721       m->trace.event("pipe read message");
1722
1723       if (state == STATE_CLOSED ||
1724           state == STATE_CONNECTING) {
1725         in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
1726         m->put();
1727         continue;
1728       }
1729
1730       // check received seq#.  if it is old, drop the message.  
1731       // note that incoming messages may skip ahead.  this is convenient for the client
1732       // side queueing because messages can't be renumbered, but the (kernel) client will
1733       // occasionally pull a message out of the sent queue to send elsewhere.  in that case
1734       // it doesn't matter if we "got" it or not.
1735       if (m->get_seq() <= in_seq) {
1736         ldout(msgr->cct,0) << "reader got old message "
1737                 << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
1738                 << ", discarding" << dendl;
1739         in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
1740         m->put();
1741         if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
1742             msgr->cct->_conf->ms_die_on_old_message)
1743           assert(0 == "old msgs despite reconnect_seq feature");
1744         continue;
1745       }
1746       if (m->get_seq() > in_seq + 1) {
1747         ldout(msgr->cct,0) << "reader missed message?  skipped from seq "
1748                            << in_seq << " to " << m->get_seq() << dendl;
1749         if (msgr->cct->_conf->ms_die_on_skipped_message)
1750           assert(0 == "skipped incoming seq");
1751       }
1752
1753       m->set_connection(connection_state.get());
1754
1755       // note last received message.
1756       in_seq = m->get_seq();
1757
1758       cond.Signal();  // wake up writer, to ack this
1759       
1760       ldout(msgr->cct,10) << "reader got message "
1761                << m->get_seq() << " " << m << " " << *m
1762                << dendl;
1763       in_q->fast_preprocess(m);
1764
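      // either hand the message to the delay thread (when delay injection is
      // enabled), fast dispatch it inline, or queue it for the dispatch threads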
1765       if (delay_thread) {
1766         utime_t release;
1767         if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
1768           release = m->get_recv_stamp();
1769           release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1770           lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
1771         }
1772         delay_thread->queue(release, m);
1773       } else {
1774         if (in_q->can_fast_dispatch(m)) {
1775           reader_dispatching = true;
1776           pipe_lock.Unlock();
1777           in_q->fast_dispatch(m);
1778           pipe_lock.Lock();
1779           reader_dispatching = false;
1780           if (state == STATE_CLOSED ||
1781               notify_on_dispatch_done) { // there might be somebody waiting
1782             notify_on_dispatch_done = false;
1783             cond.Signal();
1784           }
1785         } else {
1786           in_q->enqueue(m, m->get_priority(), conn_id);
1787         }
1788       }
1789     }
1790     
1791     else if (tag == CEPH_MSGR_TAG_CLOSE) {
1792       ldout(msgr->cct,20) << "reader got CLOSE" << dendl;
1793       pipe_lock.Lock();
1794       if (state == STATE_CLOSING) {
1795         state = STATE_CLOSED;
1796         state_closed = true;
1797       } else {
1798         state = STATE_CLOSING;
1799       }
1800       cond.Signal();
1801       break;
1802     }
1803     else {
1804       ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl;
1805       pipe_lock.Lock();
1806       fault(true);
1807     }
1808   }
1809
1810  
1811   // reap?
1812   reader_running = false;
1813   reader_needs_join = true;
1814   unlock_maybe_reap();
1815   ldout(msgr->cct,10) << "reader done" << dendl;
1816 }
1817
1818 /* write msgs to socket.
1819  * the writer thread also drives the client (connect) side of the handshake.
1820  */
1821 void Pipe::writer()
1822 {
1823   pipe_lock.Lock();
1824   while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
1825     ldout(msgr->cct,10) << "writer: state = " << get_state_name()
1826                         << " policy.server=" << policy.server << dendl;
1827
1828     // standby?
1829     if (is_queued() && state == STATE_STANDBY && !policy.server)
1830       state = STATE_CONNECTING;
1831
1832     // connect?
1833     if (state == STATE_CONNECTING) {
1834       assert(!policy.server);
1835       connect();
1836       continue;
1837     }
1838     
1839     if (state == STATE_CLOSING) {
1840       // write close tag
1841       ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl;
1842       char tag = CEPH_MSGR_TAG_CLOSE;
1843       state = STATE_CLOSED;
1844       state_closed = true;
1845       pipe_lock.Unlock();
1846       if (sd >= 0) {
1847         // we can ignore return value, actually; we don't care if this succeeds.
1848         int r = ::write(sd, &tag, 1);
1849         (void)r;
1850       }
1851       pipe_lock.Lock();
1852       continue;
1853     }
1854
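    // in a sendable state with work to do: queued outgoing messages, or incoming
    // seqs that still need to be acked (keepalives and acks are written first)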
1855     if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
1856         (is_queued() || in_seq > in_seq_acked)) {
1857
1858       // keepalive?
1859       if (send_keepalive) {
1860         int rc;
1861         if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
1862           pipe_lock.Unlock();
1863           rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2,
1864                                 ceph_clock_now());
1865         } else {
1866           pipe_lock.Unlock();
1867           rc = write_keepalive();
1868         }
1869         pipe_lock.Lock();
1870         if (rc < 0) {
1871           ldout(msgr->cct,2) << "writer couldn't write keepalive[2], "
1872                              << cpp_strerror(errno) << dendl;
1873           fault();
1874           continue;
1875         }
1876         send_keepalive = false;
1877       }
1878       if (send_keepalive_ack) {
1879         utime_t t = keepalive_ack_stamp;
1880         pipe_lock.Unlock();
1881         int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t);
1882         pipe_lock.Lock();
1883         if (rc < 0) {
1884           ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno) << dendl;
1885           fault();
1886           continue;
1887         }
1888         send_keepalive_ack = false;
1889       }
1890
1891       // send ack?
1892       if (in_seq > in_seq_acked) {
1893         uint64_t send_seq = in_seq;
1894         pipe_lock.Unlock();
1895         int rc = write_ack(send_seq);
1896         pipe_lock.Lock();
1897         if (rc < 0) {
1898           ldout(msgr->cct,2) << "writer couldn't write ack, " << cpp_strerror(errno) << dendl;
1899           fault();
1900           continue;
1901         }
1902         in_seq_acked = send_seq;
1903       }
1904
1905       // grab outgoing message
1906       Message *m = _get_next_outgoing();
1907       if (m) {
1908         m->set_seq(++out_seq);
1909         if (!policy.lossy) {
1910           // put on sent list
1911           sent.push_back(m); 
1912           m->get();
1913         }
1914
1915         // associate message with Connection (for benefit of encode_payload)
1916         m->set_connection(connection_state.get());
1917
1918         uint64_t features = connection_state->get_features();
1919
1920         if (m->empty_payload())
1921           ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features
1922                               << " " << m << " " << *m << dendl;
1923         else
1924           ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features
1925                               << " " << m << " " << *m << dendl;
1926
1927         // encode and copy out of *m
1928         m->encode(features, msgr->crcflags);
1929
1930         // prepare everything
1931         const ceph_msg_header& header = m->get_header();
1932         const ceph_msg_footer& footer = m->get_footer();
1933
1934         // Now that we have all the crcs calculated, handle the
1935         // digital signature for the message, if the pipe has session
1936         // security set up.  Some session security options do not
1937         // actually calculate and check the signature, but they should
1938         // handle the calls to sign_message and check_signature.  PLR
1939         if (session_security.get() == NULL) {
1940           ldout(msgr->cct, 20) << "writer no session security" << dendl;
1941         } else {
1942           if (session_security->sign_message(m)) {
1943             ldout(msgr->cct, 20) << "writer failed to sign seq # " << header.seq
1944                                  << ": sig = " << footer.sig << dendl;
1945           } else {
1946             ldout(msgr->cct, 20) << "writer signed seq # " << header.seq
1947                                  << ": sig = " << footer.sig << dendl;
1948           }
1949         }
1950
1951         bufferlist blist = m->get_payload();
1952         blist.append(m->get_middle());
1953         blist.append(m->get_data());
1954
1955         pipe_lock.Unlock();
1956
1957         m->trace.event("pipe writing message");
1958
1959         ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
1960         int rc = write_message(header, footer, blist);
1961
1962         pipe_lock.Lock();
1963         if (rc < 0) {
1964           ldout(msgr->cct,1) << "writer error sending " << m << ", "
1965                   << cpp_strerror(errno) << dendl;
1966           fault();
1967         }
1968         m->put();
1969       }
1970       continue;
1971     }
1972     
1973     // wait
1974     ldout(msgr->cct,20) << "writer sleeping" << dendl;
1975     cond.Wait(pipe_lock);
1976   }
1977   
1978   ldout(msgr->cct,20) << "writer finishing" << dendl;
1979
1980   // reap?
1981   writer_running = false;
1982   unlock_maybe_reap();
1983   ldout(msgr->cct,10) << "writer done" << dendl;
1984 }
1985
1986 void Pipe::unlock_maybe_reap()
1987 {
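  // called with pipe_lock held; once both reader and writer have exited,
  // close the socket and hand this Pipe to the messenger's reaper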
1988   if (!reader_running && !writer_running) {
1989     shutdown_socket();
1990     pipe_lock.Unlock();
1991     if (delay_thread && delay_thread->is_flushing()) {
1992       delay_thread->wait_for_flush();
1993     }
1994     msgr->queue_reap(this);
1995   } else {
1996     pipe_lock.Unlock();
1997   }
1998 }
1999
2000 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
2001 {
2002   // create a buffer to read into that matches the data alignment
2003   unsigned left = len;
2004   if (off & ~CEPH_PAGE_MASK) {
2005     // head
2006     unsigned head = 0;
2007     head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
2008     data.push_back(buffer::create(head));
2009     left -= head;
2010   }
2011   unsigned middle = left & CEPH_PAGE_MASK;
2012   if (middle > 0) {
2013     data.push_back(buffer::create_page_aligned(middle));
2014     left -= middle;
2015   }
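  // tail: whatever remains after the page-aligned middle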
2016   if (left) {
2017     data.push_back(buffer::create(left));
2018   }
2019 }
2020
2021 int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
2022 {
2023   int ret = -1;
2024   // envelope
2025   //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd  << dendl;
2026   
2027   ceph_msg_header header; 
2028   ceph_msg_footer footer;
2029   __u32 header_crc = 0;
2030
2031   if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
2032     if (tcp_read((char*)&header, sizeof(header)) < 0)
2033       return -1;
2034     if (msgr->crcflags & MSG_CRC_HEADER) {
2035       header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
2036     }
2037   } else {
2038     ceph_msg_header_old oldheader;
2039     if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0)
2040       return -1;
2041     // this is fugly
2042     memcpy(&header, &oldheader, sizeof(header));
2043     header.src = oldheader.src.name;
2044     header.reserved = oldheader.reserved;
2045     if (msgr->crcflags & MSG_CRC_HEADER) {
2046       header.crc = oldheader.crc;
2047       header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
2048     }
2049   }
2050
2051   ldout(msgr->cct,20) << "reader got envelope type=" << header.type
2052            << " src " << entity_name_t(header.src)
2053            << " front=" << header.front_len
2054            << " data=" << header.data_len
2055            << " off " << header.data_off
2056            << dendl;
2057
2058   // verify header crc
2059   if ((msgr->crcflags & MSG_CRC_HEADER) && header_crc != header.crc) {
2060     ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
2061     return -1;
2062   }
2063
2064   bufferlist front, middle, data;
2065   int front_len, middle_len;
2066   unsigned data_len, data_off;
2067   int aborted;
2068   Message *message;
2069   utime_t recv_stamp = ceph_clock_now();
2070
2071   if (policy.throttler_messages) {
2072     ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
2073                         << policy.throttler_messages->get_current() << "/"
2074                         << policy.throttler_messages->get_max() << dendl;
2075     policy.throttler_messages->get();
2076   }
2077
2078   uint64_t message_size = header.front_len + header.middle_len + header.data_len;
2079   if (message_size) {
2080     if (policy.throttler_bytes) {
2081       ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
2082                << policy.throttler_bytes->get_current() << "/"
2083                << policy.throttler_bytes->get_max() << dendl;
2084       policy.throttler_bytes->get(message_size);
2085     }
2086
2087     // throttle total bytes waiting for dispatch.  do this _after_ the
2088     // policy throttle, as this one does not deadlock (unless dispatch
2089     // blocks indefinitely, which it shouldn't).  in contrast, the
2090     // policy throttle is held for the entire lifetime of the message.
2091     ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
2092              << in_q->dispatch_throttler.get_current() << "/"
2093              << in_q->dispatch_throttler.get_max() << dendl;
2094     in_q->dispatch_throttler.get(message_size);
2095   }
2096
2097   utime_t throttle_stamp = ceph_clock_now();
2098
2099   // read front
2100   front_len = header.front_len;
2101   if (front_len) {
2102     bufferptr bp = buffer::create(front_len);
2103     if (tcp_read(bp.c_str(), front_len) < 0)
2104       goto out_dethrottle;
2105     front.push_back(std::move(bp));
2106     ldout(msgr->cct,20) << "reader got front " << front.length() << dendl;
2107   }
2108
2109   // read middle
2110   middle_len = header.middle_len;
2111   if (middle_len) {
2112     bufferptr bp = buffer::create(middle_len);
2113     if (tcp_read(bp.c_str(), middle_len) < 0)
2114       goto out_dethrottle;
2115     middle.push_back(std::move(bp));
2116     ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl;
2117   }
2118
2119
2120   // read data
2121   data_len = le32_to_cpu(header.data_len);
2122   data_off = le32_to_cpu(header.data_off);
2123   if (data_len) {
2124     unsigned offset = 0;
2125     unsigned left = data_len;
2126
2127     bufferlist newbuf, rxbuf;
2128     bufferlist::iterator blp;
2129     int rxbuf_version = 0;
2130         
2131     while (left > 0) {
2132       // wait for data
2133       if (tcp_read_wait() < 0)
2134         goto out_dethrottle;
2135
2136       // get a buffer
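      // prefer an rx buffer registered on the Connection for this tid, so the
      // data lands directly in a caller-supplied buffer; otherwise allocate one
      // aligned to the sender's data offset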
2137       connection_state->lock.Lock();
2138       map<ceph_tid_t,pair<bufferlist,int> >::iterator p = connection_state->rx_buffers.find(header.tid);
2139       if (p != connection_state->rx_buffers.end()) {
2140         if (rxbuf.length() == 0 || p->second.second != rxbuf_version) {
2141           ldout(msgr->cct,10) << "reader selecting rx buffer v " << p->second.second
2142                    << " at offset " << offset
2143                    << " len " << p->second.first.length() << dendl;
2144           rxbuf = p->second.first;
2145           rxbuf_version = p->second.second;
2146           // make sure it's big enough
2147           if (rxbuf.length() < data_len)
2148             rxbuf.push_back(buffer::create(data_len - rxbuf.length()));
2149           blp = p->second.first.begin();
2150           blp.advance(offset);
2151         }
2152       } else {
2153         if (!newbuf.length()) {
2154           ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " << offset << dendl;
2155           alloc_aligned_buffer(newbuf, data_len, data_off);
2156           blp = newbuf.begin();
2157           blp.advance(offset);
2158         }
2159       }
2160       bufferptr bp = blp.get_current_ptr();
2161       int read = MIN(bp.length(), left);
2162       ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl;
2163       ssize_t got = tcp_read_nonblocking(bp.c_str(), read);
2164       ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
2165       connection_state->lock.Unlock();
2166       if (got < 0)
2167         goto out_dethrottle;
2168       if (got > 0) {
2169         blp.advance(got);
2170         data.append(bp, 0, got);
2171         offset += got;
2172         left -= got;
2173       } // else we got a signal or something; just loop.
2174     }
2175   }
2176
2177   // footer
2178   if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
2179     if (tcp_read((char*)&footer, sizeof(footer)) < 0)
2180       goto out_dethrottle;
2181   } else {
2182     ceph_msg_footer_old old_footer;
2183     if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0)
2184       goto out_dethrottle;
2185     footer.front_crc = old_footer.front_crc;
2186     footer.middle_crc = old_footer.middle_crc;
2187     footer.data_crc = old_footer.data_crc;
2188     footer.sig = 0;
2189     footer.flags = old_footer.flags;
2190   }
2191   
2192   aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
2193   ldout(msgr->cct,10) << "aborted = " << aborted << dendl;
2194   if (aborted) {
2195     ldout(msgr->cct,0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
2196             << " byte message.. ABORTED" << dendl;
2197     ret = 0;
2198     goto out_dethrottle;
2199   }
2200
2201   ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
2202            << " byte message" << dendl;
2203   message = decode_message(msgr->cct, msgr->crcflags, header, footer,
2204                            front, middle, data, connection_state.get());
2205   if (!message) {
2206     ret = -EINVAL;
2207     goto out_dethrottle;
2208   }
2209
2210   //
2211   //  Check the signature if one should be present.  A zero return indicates success. PLR
2212   //
2213
2214   if (auth_handler == NULL) {
2215     ldout(msgr->cct, 10) << "No session security set" << dendl;
2216   } else {
2217     if (auth_handler->check_message_signature(message)) {
2218       ldout(msgr->cct, 0) << "Signature check failed" << dendl;
2219       message->put();
2220       ret = -EINVAL;
2221       goto out_dethrottle;
2222     } 
2223   }
2224
2225   message->set_byte_throttler(policy.throttler_bytes);
2226   message->set_message_throttler(policy.throttler_messages);
2227
2228   // store reservation size in message, so we don't get confused
2229   // by messages entering the dispatch queue through other paths.
2230   message->set_dispatch_throttle_size(message_size);
2231
2232   message->set_recv_stamp(recv_stamp);
2233   message->set_throttle_stamp(throttle_stamp);
2234   message->set_recv_complete_stamp(ceph_clock_now());
2235
2236   *pm = message;
2237   return 0;
2238
2239  out_dethrottle:
2240   // release bytes reserved from the throttlers on failure
2241   if (policy.throttler_messages) {
2242     ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
2243                         << policy.throttler_messages->get_current() << "/"
2244                         << policy.throttler_messages->get_max() << dendl;
2245     policy.throttler_messages->put();
2246   }
2247   if (message_size) {
2248     if (policy.throttler_bytes) {
2249       ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
2250                           << policy.throttler_bytes->get_current() << "/"
2251                           << policy.throttler_bytes->get_max() << dendl;
2252       policy.throttler_bytes->put(message_size);
2253     }
2254
2255     in_q->dispatch_throttle_release(message_size);
2256   }
2257   return ret;
2258 }
2259
2260 int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more)
2261 {
2262   MSGR_SIGPIPE_STOPPER;
2263   while (len > 0) {
2264     int r;
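    // MSG_MORE hints to the kernel that more data follows, so it may delay
    // sending a partial segment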
2265     r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
2266     if (r == 0) 
2267       ldout(msgr->cct,10) << "do_sendmsg hmm, got r==0!" << dendl;
2268     if (r < 0) {
2269       r = -errno; 
2270       ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(r) << dendl;
2271       return r;
2272     }
2273     if (state == STATE_CLOSED) {
2274       ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
2275       return -EINTR; // close enough
2276     }
2277
2278     len -= r;
2279     if (len == 0) break;
2280     
2281     // hrmph.  trim r bytes off the front of our message.
2282     ldout(msgr->cct,20) << "do_sendmsg short write did " << r << ", still have " << len << dendl;
2283     while (r > 0) {
2284       if (msg->msg_iov[0].iov_len <= (size_t)r) {
2285         // lose this whole item
2286         //ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
2287         r -= msg->msg_iov[0].iov_len;
2288         msg->msg_iov++;
2289         msg->msg_iovlen--;
2290       } else {
2291         // partial!
2292         //ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
2293         msg->msg_iov[0].iov_base = (char *)msg->msg_iov[0].iov_base + r;
2294         msg->msg_iov[0].iov_len -= r;
2295         break;
2296       }
2297     }
2298   }
2299   return 0;
2300 }
2301
2302
2303 int Pipe::write_ack(uint64_t seq)
2304 {
2305   ldout(msgr->cct,10) << "write_ack " << seq << dendl;
2306
2307   char c = CEPH_MSGR_TAG_ACK;
2308   ceph_le64 s;
2309   s = seq;
2310
2311   struct msghdr msg;
2312   memset(&msg, 0, sizeof(msg));
2313   struct iovec msgvec[2];
2314   msgvec[0].iov_base = &c;
2315   msgvec[0].iov_len = 1;
2316   msgvec[1].iov_base = &s;
2317   msgvec[1].iov_len = sizeof(s);
2318   msg.msg_iov = msgvec;
2319   msg.msg_iovlen = 2;
2320   
2321   if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0)
2322     return -1;  
2323   return 0;
2324 }
2325
2326 int Pipe::write_keepalive()
2327 {
2328   ldout(msgr->cct,10) << "write_keepalive" << dendl;
2329
2330   char c = CEPH_MSGR_TAG_KEEPALIVE;
2331
2332   struct msghdr msg;
2333   memset(&msg, 0, sizeof(msg));
2334   struct iovec msgvec[2];
2335   msgvec[0].iov_base = &c;
2336   msgvec[0].iov_len = 1;
2337   msg.msg_iov = msgvec;
2338   msg.msg_iovlen = 1;
2339   
2340   if (do_sendmsg(&msg, 1) < 0)
2341     return -1;  
2342   return 0;
2343 }
2344
2345 int Pipe::write_keepalive2(char tag, const utime_t& t)
2346 {
2347   ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl;
2348   struct ceph_timespec ts;
2349   t.encode_timeval(&ts);
2350   struct msghdr msg;
2351   memset(&msg, 0, sizeof(msg));
2352   struct iovec msgvec[2];
2353   msgvec[0].iov_base = &tag;
2354   msgvec[0].iov_len = 1;
2355   msgvec[1].iov_base = &ts;
2356   msgvec[1].iov_len = sizeof(ts);
2357   msg.msg_iov = msgvec;
2358   msg.msg_iovlen = 2;
2359
2360   if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0)
2361     return -1;
2362   return 0;
2363 }
2364
2365
2366 int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, bufferlist& blist)
2367 {
2368   int ret;
2369
2370   // set up msghdr and iovecs
2371   struct msghdr msg;
2372   memset(&msg, 0, sizeof(msg));
2373   msg.msg_iov = msgvec;
2374   int msglen = 0;
2375   
2376   // send tag
2377   char tag = CEPH_MSGR_TAG_MSG;
2378   msgvec[msg.msg_iovlen].iov_base = &tag;
2379   msgvec[msg.msg_iovlen].iov_len = 1;
2380   msglen++;
2381   msg.msg_iovlen++;
2382
2383   // send envelope
2384   ceph_msg_header_old oldheader;
2385   if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
2386     msgvec[msg.msg_iovlen].iov_base = (char*)&header;
2387     msgvec[msg.msg_iovlen].iov_len = sizeof(header);
2388     msglen += sizeof(header);
2389     msg.msg_iovlen++;
2390   } else {
2391     memcpy(&oldheader, &header, sizeof(header));
2392     oldheader.src.name = header.src;
2393     oldheader.src.addr = connection_state->get_peer_addr();
2394     oldheader.orig_src = oldheader.src;
2395     oldheader.reserved = header.reserved;
2396     if (msgr->crcflags & MSG_CRC_HEADER) {
2397         oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
2398                                     sizeof(oldheader) - sizeof(oldheader.crc));
2399     } else {
2400         oldheader.crc = 0;
2401     }
2402     msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
2403     msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
2404     msglen += sizeof(oldheader);
2405     msg.msg_iovlen++;
2406   }
2407
2408   // payload (front + middle + data)
2409   list<bufferptr>::const_iterator pb = blist.buffers().begin();
2410   unsigned b_off = 0;  // carry-over buffer offset, if any
2411   unsigned bl_pos = 0; // blist pos
2412   unsigned left = blist.length();
2413
2414   while (left > 0) {
2415     unsigned donow = MIN(left, pb->length()-b_off);
2416     if (donow == 0) {
2417       ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length()
2418                          << " b_off " << b_off << dendl;
2419     }
2420     assert(donow > 0);
2421     ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off
2422              << " leftinchunk " << left
2423              << " buffer len " << pb->length()
2424              << " writing " << donow 
2425              << dendl;
2426     
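    // flush early if the iovec array is nearly full, leaving room for this
    // chunk and the footer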
2427     if (msg.msg_iovlen >= SM_IOV_MAX-2) {
2428       if (do_sendmsg(&msg, msglen, true))
2429         goto fail;
2430       
2431       // and restart the iov
2432       msg.msg_iov = msgvec;
2433       msg.msg_iovlen = 0;
2434       msglen = 0;
2435     }
2436     
2437     msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
2438     msgvec[msg.msg_iovlen].iov_len = donow;
2439     msglen += donow;
2440     msg.msg_iovlen++;
2441     
2442     assert(left >= donow);
2443     left -= donow;
2444     b_off += donow;
2445     bl_pos += donow;
2446     if (left == 0)
2447       break;
2448     while (b_off == pb->length()) {
2449       ++pb;
2450       b_off = 0;
2451     }
2452   }
2453   assert(left == 0);
2454
2455   // send footer; if receiver doesn't support signatures, use the old footer format
2456
2457   ceph_msg_footer_old old_footer;
2458   if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
2459     msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
2460     msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
2461     msglen += sizeof(footer);
2462     msg.msg_iovlen++;
2463   } else {
2464     if (msgr->crcflags & MSG_CRC_HEADER) {
2465       old_footer.front_crc = footer.front_crc;
2466       old_footer.middle_crc = footer.middle_crc;
2467     } else {
2468       old_footer.front_crc = old_footer.middle_crc = 0;
2469     }
2470     old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
2471     old_footer.flags = footer.flags;   
2472     msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer;
2473     msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer);
2474     msglen += sizeof(old_footer);
2475     msg.msg_iovlen++;
2476   }
2477
2478   // send
2479   if (do_sendmsg(&msg, msglen))
2480     goto fail;
2481
2482   ret = 0;
2483
2484  out:
2485   return ret;
2486
2487  fail:
2488   ret = -1;
2489   goto out;
2490 }
2491
2492
2493 int Pipe::tcp_read(char *buf, unsigned len)
2494 {
2495   if (sd < 0)
2496     return -EINVAL;
2497
2498   while (len > 0) {
2499
2500     if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2501       if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2502         ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2503         ::shutdown(sd, SHUT_RDWR);
2504       }
2505     }
2506
2507     if (tcp_read_wait() < 0)
2508       return -1;
2509
2510     ssize_t got = tcp_read_nonblocking(buf, len);
2511
2512     if (got < 0)
2513       return -1;
2514
2515     len -= got;
2516     buf += got;
2517     //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
2518   }
2519   return 0;
2520 }
2521
2522 int Pipe::tcp_read_wait()
2523 {
2524   if (sd < 0)
2525     return -EINVAL;
2526   struct pollfd pfd;
2527   short evmask;
2528   pfd.fd = sd;
2529   pfd.events = POLLIN;
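  // on Linux, POLLRDHUP also lets poll() report a peer shutdown/half-close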
2530 #if defined(__linux__)
2531   pfd.events |= POLLRDHUP;
2532 #endif
2533
2534   if (has_pending_data())
2535     return 0;
2536
2537   int r = poll(&pfd, 1, msgr->timeout);
2538   if (r < 0)
2539     return -errno;
2540   if (r == 0)
2541     return -EAGAIN;
2542
2543   evmask = POLLERR | POLLHUP | POLLNVAL;
2544 #if defined(__linux__)
2545   evmask |= POLLRDHUP;
2546 #endif
2547   if (pfd.revents & evmask)
2548     return -1;
2549
2550   if (!(pfd.revents & POLLIN))
2551     return -1;
2552
2553   return 0;
2554 }
2555
2556 ssize_t Pipe::do_recv(char *buf, size_t len, int flags)
2557 {
2558 again:
2559   ssize_t got = ::recv( sd, buf, len, flags );
2560   if (got < 0) {
2561     if (errno == EINTR) {
2562       goto again;
2563     }
2564     ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2565                      << got << " " << cpp_strerror(errno) << dendl;
2566     return -1;
2567   }
2568   if (got == 0) {
2569     return -1;
2570   }
2571   return got;
2572 }
2573
2574 ssize_t Pipe::buffered_recv(char *buf, size_t len, int flags)
2575 {
2576   size_t left = len;
2577   ssize_t total_recv = 0;
2578   if (recv_len > recv_ofs) {
2579     int to_read = MIN(recv_len - recv_ofs, left);
2580     memcpy(buf, &recv_buf[recv_ofs], to_read);
2581     recv_ofs += to_read;
2582     left -= to_read;
2583     if (left == 0) {
2584       return to_read;
2585     }
2586     buf += to_read;
2587     total_recv += to_read;
2588   }
2589
2590   /* nothing left in the prefetch buffer */
2591
2592   if (left > recv_max_prefetch) {
2593     /* this was a large read, we don't prefetch for these */
2594     ssize_t ret = do_recv(buf, left, flags );
2595     if (ret < 0) {
2596       if (total_recv > 0)
2597         return total_recv;
2598       return ret;
2599     }
2600     total_recv += ret;
2601     return total_recv;
2602   }
2603
2604
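  /* small read: refill the prefetch buffer with one recv, then copy out the requested bytes */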
2605   ssize_t got = do_recv(recv_buf, recv_max_prefetch, flags);
2606   if (got < 0) {
2607     if (total_recv > 0)
2608       return total_recv;
2609
2610     return got;
2611   }
2612
2613   recv_len = (size_t)got;
2614   got = MIN(left, (size_t)got);
2615   memcpy(buf, recv_buf, got);
2616   recv_ofs = got;
2617   total_recv += got;
2618   return total_recv;
2619 }
2620
2621 ssize_t Pipe::tcp_read_nonblocking(char *buf, unsigned len)
2622 {
2623   ssize_t got = buffered_recv(buf, len, MSG_DONTWAIT );
2624   if (got < 0) {
2625     ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2626                          << got << " " << cpp_strerror(errno) << dendl;
2627     return -1;
2628   }
2629   if (got == 0) {
2630     /* poll() said there was data, but we didn't read any - peer
2631      * sent a FIN.  Maybe POLLRDHUP signals this, but this is
2632      * standard socket behavior as documented by Stevens.
2633      */
2634     return -1;
2635   }
2636   return got;
2637 }
2638
2639 int Pipe::tcp_write(const char *buf, unsigned len)
2640 {
2641   if (sd < 0)
2642     return -1;
2643   struct pollfd pfd;
2644   pfd.fd = sd;
2645   pfd.events = POLLOUT | POLLHUP | POLLNVAL | POLLERR;
2646 #if defined(__linux__)
2647   pfd.events |= POLLRDHUP;
2648 #endif
2649
2650   if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2651     if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2652       ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2653       ::shutdown(sd, SHUT_RDWR);
2654     }
2655   }
2656
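  // block (no timeout) until the socket is writable or reports an error/hangup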
2657   if (poll(&pfd, 1, -1) < 0)
2658     return -1;
2659
2660   if (!(pfd.revents & POLLOUT))
2661     return -1;
2662
2663   //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
2664   assert(len > 0);
2665   while (len > 0) {
2666     MSGR_SIGPIPE_STOPPER;
2667     int did = ::send( sd, buf, len, MSG_NOSIGNAL );
2668     if (did < 0) {
2669       //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2670       //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2671       return did;
2672     }
2673     len -= did;
2674     buf += did;
2675     //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
2676   }
2677   return 0;
2678 }