Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / dpdk / TCP.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3  * This file is open source software, licensed to you under the terms
4  * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
5  * distributed with this work for additional information regarding copyright
6  * ownership.  You may not use this file except in compliance with the License.
7  *
8  * You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 /*
20  * Copyright (C) 2014 Cloudius Systems, Ltd.
21  */
22
23 #ifndef CEPH_DPDK_TCP_H_
24 #define CEPH_DPDK_TCP_H_
25
26 #include <unordered_map>
27 #include <map>
28 #include <queue>
29 #include <functional>
30 #include <deque>
31 #include <chrono>
32 #include <random>
33 #include <stdexcept>
34 #include <system_error>
35
36 #define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1
37 #include <cryptopp/md5.h>
38
39 #include "msg/async/dpdk/EventDPDK.h"
40
41 #include "include/utime.h"
42 #include "common/Throttle.h"
43 #include "common/ceph_time.h"
44 #include "msg/async/Event.h"
45 #include "IPChecksum.h"
46 #include "IP.h"
47 #include "const.h"
48 #include "byteorder.h"
49 #include "shared_ptr.h"
50 #include "PacketUtil.h"
51
52 struct tcp_hdr;
53
54 enum class tcp_state : uint16_t {
55   CLOSED          = (1 << 0),
56   LISTEN          = (1 << 1),
57   SYN_SENT        = (1 << 2),
58   SYN_RECEIVED    = (1 << 3),
59   ESTABLISHED     = (1 << 4),
60   FIN_WAIT_1      = (1 << 5),
61   FIN_WAIT_2      = (1 << 6),
62   CLOSE_WAIT      = (1 << 7),
63   CLOSING         = (1 << 8),
64   LAST_ACK        = (1 << 9),
65   TIME_WAIT       = (1 << 10)
66 };
67
68 inline tcp_state operator|(tcp_state s1, tcp_state s2) {
69   return tcp_state(uint16_t(s1) | uint16_t(s2));
70 }
71
72 inline std::ostream & operator<<(std::ostream & str, const tcp_state& s) {
73   switch (s) {
74     case tcp_state::CLOSED: return str << "CLOSED";
75     case tcp_state::LISTEN: return str << "LISTEN";
76     case tcp_state::SYN_SENT: return str << "SYN_SENT";
77     case tcp_state::SYN_RECEIVED: return str << "SYN_RECEIVED";
78     case tcp_state::ESTABLISHED: return str << "ESTABLISHED";
79     case tcp_state::FIN_WAIT_1: return str << "FIN_WAIT_1";
80     case tcp_state::FIN_WAIT_2: return str << "FIN_WAIT_2";
81     case tcp_state::CLOSE_WAIT: return str << "CLOSE_WAIT";
82     case tcp_state::CLOSING: return str << "CLOSING";
83     case tcp_state::LAST_ACK: return str << "LAST_ACK";
84     case tcp_state::TIME_WAIT: return str << "TIME_WAIT";
85     default: return str << "UNKNOWN";
86   }
87 }
88
89 struct tcp_option {
90   // The kind and len field are fixed and defined in TCP protocol
91   enum class option_kind: uint8_t { mss = 2, win_scale = 3, sack = 4, timestamps = 8,  nop = 1, eol = 0 };
92   enum class option_len:  uint8_t { mss = 4, win_scale = 3, sack = 2, timestamps = 10, nop = 1, eol = 1 };
93   struct mss {
94     option_kind kind = option_kind::mss;
95     option_len len = option_len::mss;
96     uint16_t mss;
97     struct mss hton() {
98       struct mss m = *this;
99       m.mss = ::hton(m.mss);
100       return m;
101     }
102   } __attribute__((packed));
103   struct win_scale {
104     option_kind kind = option_kind::win_scale;
105     option_len len = option_len::win_scale;
106     uint8_t shift;
107   } __attribute__((packed));
108   struct sack {
109     option_kind kind = option_kind::sack;
110     option_len len = option_len::sack;
111   } __attribute__((packed));
112   struct timestamps {
113     option_kind kind = option_kind::timestamps;
114     option_len len = option_len::timestamps;
115     uint32_t t1;
116     uint32_t t2;
117   } __attribute__((packed));
118   struct nop {
119     option_kind kind = option_kind::nop;
120   } __attribute__((packed));
121   struct eol {
122     option_kind kind = option_kind::eol;
123   } __attribute__((packed));
124   static const uint8_t align = 4;
125
126   void parse(uint8_t* beg, uint8_t* end);
127   uint8_t fill(tcp_hdr* th, uint8_t option_size);
128   uint8_t get_size(bool syn_on, bool ack_on);
129
130   // For option negotiattion
131   bool _mss_received = false;
132   bool _win_scale_received = false;
133   bool _timestamps_received = false;
134   bool _sack_received = false;
135
136   // Option data
137   uint16_t _remote_mss = 536;
138   uint16_t _local_mss;
139   uint8_t _remote_win_scale = 0;
140   uint8_t _local_win_scale = 0;
141 };
142 inline uint8_t*& operator+=(uint8_t*& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
143 inline uint8_t& operator+=(uint8_t& x, tcp_option::option_len len) { x += uint8_t(len); return x; }
144
145 struct tcp_sequence {
146   uint32_t raw;
147 };
148
149 tcp_sequence ntoh(tcp_sequence ts) {
150   return tcp_sequence { ::ntoh(ts.raw) };
151 }
152
153 tcp_sequence hton(tcp_sequence ts) {
154   return tcp_sequence { ::hton(ts.raw) };
155 }
156
157 inline std::ostream& operator<<(std::ostream& os, const tcp_sequence& s) {
158   return os << s.raw;
159 }
160
161 inline tcp_sequence make_seq(uint32_t raw) { return tcp_sequence{raw}; }
162 inline tcp_sequence& operator+=(tcp_sequence& s, int32_t n) { s.raw += n; return s; }
163 inline tcp_sequence& operator-=(tcp_sequence& s, int32_t n) { s.raw -= n; return s; }
164 inline tcp_sequence operator+(tcp_sequence s, int32_t n) { return s += n; }
165 inline tcp_sequence operator-(tcp_sequence s, int32_t n) { return s -= n; }
166 inline int32_t operator-(tcp_sequence s, tcp_sequence q) { return s.raw - q.raw; }
167 inline bool operator==(tcp_sequence s, tcp_sequence q)  { return s.raw == q.raw; }
168 inline bool operator!=(tcp_sequence s, tcp_sequence q) { return !(s == q); }
169 inline bool operator<(tcp_sequence s, tcp_sequence q) { return s - q < 0; }
170 inline bool operator>(tcp_sequence s, tcp_sequence q) { return q < s; }
171 inline bool operator<=(tcp_sequence s, tcp_sequence q) { return !(s > q); }
172 inline bool operator>=(tcp_sequence s, tcp_sequence q) { return !(s < q); }
173
174 struct tcp_hdr {
175   uint16_t src_port;
176   uint16_t dst_port;
177   tcp_sequence seq;
178   tcp_sequence ack;
179   uint8_t rsvd1 : 4;
180   uint8_t data_offset : 4;
181   uint8_t f_fin : 1;
182   uint8_t f_syn : 1;
183   uint8_t f_rst : 1;
184   uint8_t f_psh : 1;
185   uint8_t f_ack : 1;
186   uint8_t f_urg : 1;
187   uint8_t rsvd2 : 2;
188   uint16_t window;
189   uint16_t checksum;
190   uint16_t urgent;
191
192   tcp_hdr hton() {
193     tcp_hdr hdr = *this;
194     hdr.src_port = ::hton(src_port);
195     hdr.dst_port = ::hton(dst_port);
196     hdr.seq = ::hton(seq);
197     hdr.ack = ::hton(ack);
198     hdr.window = ::hton(window);
199     hdr.checksum = ::hton(checksum);
200     hdr.urgent = ::hton(urgent);
201     return hdr;
202   }
203
204   tcp_hdr ntoh() {
205     tcp_hdr hdr = *this;
206     hdr.src_port = ::ntoh(src_port);
207     hdr.dst_port = ::ntoh(dst_port);
208     hdr.seq = ::ntoh(seq);
209     hdr.ack = ::ntoh(ack);
210     hdr.window = ::ntoh(window);
211     hdr.checksum = ::ntoh(checksum);
212     hdr.urgent = ::ntoh(urgent);
213     return hdr;
214   }
215 } __attribute__((packed));
216
217 struct tcp_tag {};
218 using tcp_packet_merger = packet_merger<tcp_sequence, tcp_tag>;
219
220 template <typename InetTraits>
221 class tcp {
222  public:
223   using ipaddr = typename InetTraits::address_type;
224   using inet_type = typename InetTraits::inet_type;
225   using connid = l4connid<InetTraits>;
226   using connid_hash = typename connid::connid_hash;
227   class connection;
228   class listener;
229  private:
230   class tcb;
231
232   class C_handle_delayed_ack : public EventCallback {
233     tcb *tc;
234
235    public:
236     C_handle_delayed_ack(tcb *t): tc(t) { }
237     void do_request(int r) {
238       tc->_nr_full_seg_received = 0;
239       tc->output();
240     }
241   };
242
243   class C_handle_retransmit : public EventCallback {
244     tcb *tc;
245
246    public:
247     C_handle_retransmit(tcb *t): tc(t) { }
248     void do_request(int r) {
249       tc->retransmit();
250     }
251   };
252
253   class C_handle_persist : public EventCallback {
254     tcb *tc;
255
256    public:
257     C_handle_persist(tcb *t): tc(t) { }
258     void do_request(int r) {
259       tc->persist();
260     }
261   };
262
263   class C_all_data_acked : public EventCallback {
264     tcb *tc;
265
266    public:
267     C_all_data_acked(tcb *t): tc(t) {}
268     void do_request(int fd_or_id) {
269       tc->close_final_cleanup();
270     }
271   };
272
273   class C_actual_remove_tcb : public EventCallback {
274     lw_shared_ptr<tcb> tc;
275    public:
276     C_actual_remove_tcb(tcb *t): tc(t->shared_from_this()) {}
277     void do_request(int r) {
278       delete this;
279     }
280   };
281
282   class tcb : public enable_lw_shared_from_this<tcb> {
283     using clock_type = ceph::coarse_real_clock;
284     static constexpr tcp_state CLOSED         = tcp_state::CLOSED;
285     static constexpr tcp_state LISTEN         = tcp_state::LISTEN;
286     static constexpr tcp_state SYN_SENT       = tcp_state::SYN_SENT;
287     static constexpr tcp_state SYN_RECEIVED   = tcp_state::SYN_RECEIVED;
288     static constexpr tcp_state ESTABLISHED    = tcp_state::ESTABLISHED;
289     static constexpr tcp_state FIN_WAIT_1     = tcp_state::FIN_WAIT_1;
290     static constexpr tcp_state FIN_WAIT_2     = tcp_state::FIN_WAIT_2;
291     static constexpr tcp_state CLOSE_WAIT     = tcp_state::CLOSE_WAIT;
292     static constexpr tcp_state CLOSING        = tcp_state::CLOSING;
293     static constexpr tcp_state LAST_ACK       = tcp_state::LAST_ACK;
294     static constexpr tcp_state TIME_WAIT      = tcp_state::TIME_WAIT;
295     tcp_state _state = CLOSED;
296     tcp& _tcp;
297     UserspaceEventManager &manager;
298     connection* _conn = nullptr;
299     bool _connect_done = false;
300     ipaddr _local_ip;
301     ipaddr _foreign_ip;
302     uint16_t _local_port;
303     uint16_t _foreign_port;
304     struct unacked_segment {
305       Packet p;
306       uint16_t data_len;
307       unsigned nr_transmits;
308       clock_type::time_point tx_time;
309     };
310     struct send {
311       tcp_sequence unacknowledged;
312       tcp_sequence next;
313       uint32_t window;
314       uint8_t window_scale;
315       uint16_t mss;
316       tcp_sequence urgent;
317       tcp_sequence wl1;
318       tcp_sequence wl2;
319       tcp_sequence initial;
320       std::deque<unacked_segment> data;
321       std::deque<Packet> unsent;
322       uint32_t unsent_len = 0;
323       uint32_t queued_len = 0;
324       bool closed = false;
325       // Wait for all data are acked
326       int _all_data_acked_fd = -1;
327       // Limit number of data queued into send queue
328       Throttle user_queue_space;
329       // Round-trip time variation
330       std::chrono::microseconds rttvar;
331       // Smoothed round-trip time
332       std::chrono::microseconds srtt;
333       bool first_rto_sample = true;
334       clock_type::time_point syn_tx_time;
335       // Congestion window
336       uint32_t cwnd;
337       // Slow start threshold
338       uint32_t ssthresh;
339       // Duplicated ACKs
340       uint16_t dupacks = 0;
341       unsigned syn_retransmit = 0;
342       unsigned fin_retransmit = 0;
343       uint32_t limited_transfer = 0;
344       uint32_t partial_ack = 0;
345       tcp_sequence recover;
346       bool window_probe = false;
347       send(CephContext *c): user_queue_space(c, "DPDK::tcp::tcb::user_queue_space", 81920) {}
348     } _snd;
349     struct receive {
350       tcp_sequence next;
351       uint32_t window;
352       uint8_t window_scale;
353       uint16_t mss;
354       tcp_sequence urgent;
355       tcp_sequence initial;
356       std::deque<Packet> data;
357       tcp_packet_merger out_of_order;
358     } _rcv;
359     EventCenter *center;
360     int fd;
361     // positive means no errno, 0 means eof, nagetive means error
362     int16_t _errno = 1;
363     tcp_option _option;
364     EventCallbackRef delayed_ack_event;
365     Tub<uint64_t> _delayed_ack_fd;
366     // Retransmission timeout
367     std::chrono::microseconds _rto{1000*1000};
368     std::chrono::microseconds _persist_time_out{1000*1000};
369     static constexpr std::chrono::microseconds _rto_min{1000*1000};
370     static constexpr std::chrono::microseconds _rto_max{60000*1000};
371     // Clock granularity
372     static constexpr std::chrono::microseconds _rto_clk_granularity{1000};
373     static constexpr uint16_t _max_nr_retransmit{5};
374     EventCallbackRef retransmit_event;
375     Tub<uint64_t> retransmit_fd;
376     EventCallbackRef persist_event;
377     EventCallbackRef all_data_ack_event;
378     Tub<uint64_t> persist_fd;
379     uint16_t _nr_full_seg_received = 0;
380     struct isn_secret {
381       // 512 bits secretkey for ISN generating
382       uint32_t key[16];
383       isn_secret () {
384         std::random_device rd;
385         std::default_random_engine e(rd());
386         std::uniform_int_distribution<uint32_t> dist{};
387         for (auto& k : key) {
388           k = dist(e);
389         }
390       }
391     };
392     static isn_secret _isn_secret;
393     tcp_sequence get_isn();
394     circular_buffer<typename InetTraits::l4packet> _packetq;
395     bool _poll_active = false;
396    public:
397     // callback
398     void close_final_cleanup();
399     ostream& _prefix(std::ostream *_dout);
400
401    public:
402     tcb(tcp& t, connid id);
403     ~tcb();
404     void input_handle_listen_state(tcp_hdr* th, Packet p);
405     void input_handle_syn_sent_state(tcp_hdr* th, Packet p);
406     void input_handle_other_state(tcp_hdr* th, Packet p);
407     void output_one(bool data_retransmit = false);
408     bool is_all_data_acked();
409     int send(Packet p);
410     void connect();
411     Tub<Packet> read();
412     void close();
413     void remove_from_tcbs() {
414       auto id = connid{_local_ip, _foreign_ip, _local_port, _foreign_port};
415       _tcp._tcbs.erase(id);
416     }
417     Tub<typename InetTraits::l4packet> get_packet();
418     void output() {
419       if (!_poll_active) {
420         _poll_active = true;
421
422         auto tcb = this->shared_from_this();
423         _tcp._inet.wait_l2_dst_address(_foreign_ip, Packet(), [tcb] (const ethernet_address &dst, Packet p, int r) {
424           if (r == 0) {
425             tcb->_tcp.poll_tcb(dst, std::move(tcb));
426           } else if (r == -ETIMEDOUT) {
427             // in other states connection should time out
428             if (tcb->in_state(SYN_SENT)) {
429               tcb->_errno = -ETIMEDOUT;
430               tcb->cleanup();
431             }
432           } else if (r == -EBUSY) {
433             // retry later
434             tcb->_poll_active = false;
435             tcb->start_retransmit_timer();
436           }
437         });
438       }
439     }
440
441     int16_t get_errno() const {
442       return _errno;
443     }
444
445     tcp_state& state() {
446       return _state;
447     }
448
449     uint64_t peek_sent_available() {
450       if (!in_state(ESTABLISHED))
451         return 0;
452       uint64_t left = _snd.user_queue_space.get_max() - _snd.user_queue_space.get_current();
453       return left;
454     }
455
456     int is_connected() const {
457       if (_errno <= 0)
458         return _errno;
459       return _connect_done;
460     }
461
462    private:
463     void respond_with_reset(tcp_hdr* th);
464     bool merge_out_of_order();
465     void insert_out_of_order(tcp_sequence seq, Packet p);
466     void trim_receive_data_after_window();
467     bool should_send_ack(uint16_t seg_len);
468     void clear_delayed_ack();
469     Packet get_transmit_packet();
470     void retransmit_one() {
471       bool data_retransmit = true;
472       output_one(data_retransmit);
473     }
474     void start_retransmit_timer() {
475       if (retransmit_fd)
476         center->delete_time_event(*retransmit_fd);
477       retransmit_fd.construct(center->create_time_event(_rto.count(), retransmit_event));
478     };
479     void stop_retransmit_timer() {
480       if (retransmit_fd) {
481         center->delete_time_event(*retransmit_fd);
482         retransmit_fd.destroy();
483       }
484     };
485     void start_persist_timer() {
486       if (persist_fd)
487         center->delete_time_event(*persist_fd);
488       persist_fd.construct(center->create_time_event(_persist_time_out.count(), persist_event));
489     };
490     void stop_persist_timer() {
491       if (persist_fd) {
492         center->delete_time_event(*persist_fd);
493         persist_fd.destroy();
494       }
495     };
496     void persist();
497     void retransmit();
498     void fast_retransmit();
499     void update_rto(clock_type::time_point tx_time);
500     void update_cwnd(uint32_t acked_bytes);
501     void cleanup();
502     uint32_t can_send() {
503       if (_snd.window_probe) {
504         return 1;
505       }
506       // Can not send more than advertised window allows
507       auto x = std::min(uint32_t(_snd.unacknowledged + _snd.window - _snd.next), _snd.unsent_len);
508       // Can not send more than congestion window allows
509       x = std::min(_snd.cwnd, x);
510       if (_snd.dupacks == 1 || _snd.dupacks == 2) {
511         // RFC5681 Step 3.1
512         // Send cwnd + 2 * smss per RFC3042
513         auto flight = flight_size();
514         auto max = _snd.cwnd + 2 * _snd.mss;
515         x = flight <= max ? std::min(x, max - flight) : 0;
516         _snd.limited_transfer += x;
517       } else if (_snd.dupacks >= 3) {
518         // RFC5681 Step 3.5
519         // Sent 1 full-sized segment at most
520         x = std::min(uint32_t(_snd.mss), x);
521       }
522       return x;
523     }
524     uint32_t flight_size() {
525       uint32_t size = 0;
526       std::for_each(_snd.data.begin(), _snd.data.end(),
527                     [&] (unacked_segment& seg) { size += seg.p.len(); });
528       return size;
529     }
530     uint16_t local_mss() {
531       return _tcp.get_hw_features().mtu - tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
532     }
533     void queue_packet(Packet p) {
534       _packetq.emplace_back(
535           typename InetTraits::l4packet{_foreign_ip, std::move(p)});
536     }
537     void signal_data_received() {
538       manager.notify(fd, EVENT_READABLE);
539     }
540     void signal_all_data_acked() {
541       if (_snd._all_data_acked_fd >= 0 && _snd.unsent_len == 0 && _snd.queued_len == 0)
542         manager.notify(_snd._all_data_acked_fd, EVENT_READABLE);
543     }
544     void do_syn_sent() {
545       _state = SYN_SENT;
546       _snd.syn_tx_time = clock_type::now();
547       // Send <SYN> to remote
548       output();
549     }
550     void do_syn_received() {
551       _state = SYN_RECEIVED;
552       _snd.syn_tx_time = clock_type::now();
553       // Send <SYN,ACK> to remote
554       output();
555     }
556     void do_established() {
557       _state = ESTABLISHED;
558       update_rto(_snd.syn_tx_time);
559       _connect_done = true;
560       manager.notify(fd, EVENT_READABLE|EVENT_WRITABLE);
561     }
562     void do_reset() {
563       _state = CLOSED;
564       // Free packets to be sent which are waiting for user_queue_space
565       _snd.user_queue_space.reset();
566       cleanup();
567       _errno = -ECONNRESET;
568       manager.notify(fd, EVENT_READABLE);
569
570       if (_snd._all_data_acked_fd >= 0)
571         manager.notify(_snd._all_data_acked_fd, EVENT_READABLE);
572     }
573     void do_time_wait() {
574       // FIXME: Implement TIME_WAIT state timer
575       _state = TIME_WAIT;
576       cleanup();
577     }
578     void do_closed() {
579       _state = CLOSED;
580       cleanup();
581     }
582     void do_setup_isn() {
583       _snd.initial = get_isn();
584       _snd.unacknowledged = _snd.initial;
585       _snd.next = _snd.initial + 1;
586       _snd.recover = _snd.initial;
587     }
588     void do_local_fin_acked() {
589       _snd.unacknowledged += 1;
590       _snd.next += 1;
591     }
592     bool syn_needs_on() {
593       return in_state(SYN_SENT | SYN_RECEIVED);
594     }
595     bool fin_needs_on() {
596       return in_state(FIN_WAIT_1 | CLOSING | LAST_ACK) && _snd.closed &&
597              _snd.unsent_len == 0 && _snd.queued_len == 0;
598     }
599     bool ack_needs_on() {
600       return !in_state(CLOSED | LISTEN | SYN_SENT);
601     }
602     bool foreign_will_not_send() {
603       return in_state(CLOSING | TIME_WAIT | CLOSE_WAIT | LAST_ACK | CLOSED);
604     }
605     bool in_state(tcp_state state) {
606       return uint16_t(_state) & uint16_t(state);
607     }
608     void exit_fast_recovery() {
609       _snd.dupacks = 0;
610       _snd.limited_transfer = 0;
611       _snd.partial_ack = 0;
612     }
613     uint32_t data_segment_acked(tcp_sequence seg_ack);
614     bool segment_acceptable(tcp_sequence seg_seq, unsigned seg_len);
615     void init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end);
616     friend class connection;
617
618     friend class C_handle_delayed_ack;
619     friend class C_handle_retransmit;
620     friend class C_handle_persist;
621     friend class C_all_data_acked;
622   };
623
624   CephContext *cct;
625   // ipv4_l4<ip_protocol_num::tcp>
626   inet_type& _inet;
627   EventCenter *center;
628   UserspaceEventManager &manager;
629   std::unordered_map<connid, lw_shared_ptr<tcb>, connid_hash> _tcbs;
630   std::unordered_map<uint16_t, listener*> _listening;
631   std::random_device _rd;
632   std::default_random_engine _e;
633   std::uniform_int_distribution<uint16_t> _port_dist{41952, 65535};
634   circular_buffer<std::pair<lw_shared_ptr<tcb>, ethernet_address>> _poll_tcbs;
635   // queue for packets that do not belong to any tcb
636   circular_buffer<ipv4_traits::l4packet> _packetq;
637   Throttle _queue_space;
638   // Limit number of data queued into send queue
639  public:
640   class connection {
641     lw_shared_ptr<tcb> _tcb;
642    public:
643     explicit connection(lw_shared_ptr<tcb> tcbp) : _tcb(std::move(tcbp)) { _tcb->_conn = this; }
644     connection(const connection&) = delete;
645     connection(connection&& x) noexcept : _tcb(std::move(x._tcb)) {
646       _tcb->_conn = this;
647     }
648     ~connection();
649     void operator=(const connection&) = delete;
650     connection& operator=(connection&& x) {
651       if (this != &x) {
652         this->~connection();
653         new (this) connection(std::move(x));
654       }
655       return *this;
656     }
657     int fd() const {
658       return _tcb->fd;
659     }
660     int send(Packet p) {
661       return _tcb->send(std::move(p));
662     }
663     Tub<Packet> read() {
664       return _tcb->read();
665     }
666     int16_t get_errno() const {
667       return _tcb->get_errno();
668     }
669     void close_read();
670     void close_write();
671     entity_addr_t remote_addr() const {
672       entity_addr_t addr;
673       auto net_ip = _tcb->_foreign_ip.hton();
674       memcpy((void*)&addr.in4_addr().sin_addr.s_addr,
675              &net_ip, sizeof(addr.in4_addr().sin_addr.s_addr));
676       addr.set_family(AF_INET);
677       return addr;
678     }
679     uint64_t peek_sent_available() {
680       return _tcb->peek_sent_available();
681     }
682     int is_connected() const { return _tcb->is_connected(); }
683   };
684   class listener {
685     tcp& _tcp;
686     uint16_t _port;
687     int _fd = -1;
688     int16_t _errno;
689     queue<connection> _q;
690     size_t _q_max_length;
691
692    private:
693     listener(tcp& t, uint16_t port, size_t queue_length)
694         : _tcp(t), _port(port), _errno(0), _q(), _q_max_length(queue_length) {
695     }
696    public:
697     listener(const listener&) = delete;
698     void operator=(const listener&) = delete;
699     listener(listener&& x)
700         : _tcp(x._tcp), _port(x._port), _fd(std::move(x._fd)), _errno(x._errno),
701           _q(std::move(x._q)) {
702       if (_fd >= 0)
703         _tcp._listening[_port] = this;
704     }
705     ~listener() {
706       abort_accept();
707     }
708     int listen() {
709       if (_tcp._listening.find(_port) != _tcp._listening.end())
710         return -EADDRINUSE;
711       _tcp._listening.emplace(_port, this);
712       _fd = _tcp.manager.get_eventfd();
713       return 0;
714     }
715     Tub<connection> accept() {
716       Tub<connection> c;
717       if (!_q.empty()) {
718         c = std::move(_q.front());
719         _q.pop();
720       }
721       return c;
722     }
723     void abort_accept() {
724       while (!_q.empty())
725         _q.pop();
726       if (_fd >= 0) {
727         _tcp._listening.erase(_port);
728         _tcp.manager.close(_fd);
729         _fd = -1;
730       }
731     }
732     int16_t get_errno() const {
733       return _errno;
734     }
735     bool full() const {
736       return _q.size() == _q_max_length;
737     }
738     int fd() const {
739       return _fd;
740     }
741     friend class tcp;
742   };
743  public:
744   explicit tcp(CephContext *c, inet_type& inet, EventCenter *cen);
745   void received(Packet p, ipaddr from, ipaddr to);
746   bool forward(forward_hash& out_hash_data, Packet& p, size_t off);
747   listener listen(uint16_t port, size_t queue_length = 100);
748   connection connect(const entity_addr_t &addr);
749   const hw_features& get_hw_features() const { return _inet._inet.get_hw_features(); }
750   void poll_tcb(const ethernet_address &dst, lw_shared_ptr<tcb> tcb) {
751     _poll_tcbs.emplace_back(std::move(tcb), dst);
752   }
753   bool push_listen_queue(uint16_t port, tcb *t) {
754     auto listener = _listening.find(port);
755     if (listener == _listening.end() || listener->second->full()) {
756       return false;
757     }
758     listener->second->_q.push(connection(t->shared_from_this()));
759     manager.notify(listener->second->_fd, EVENT_READABLE);
760     return true;
761   }
762
763  private:
764   void send_packet_without_tcb(ipaddr from, ipaddr to, Packet p);
765   void respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip);
766   friend class listener;
767 };
768
769 template <typename InetTraits>
770 tcp<InetTraits>::tcp(CephContext *c, inet_type& inet, EventCenter *cen)
771     : cct(c), _inet(inet), center(cen),
772       manager(static_cast<DPDKDriver*>(cen->get_driver())->manager),
773       _e(_rd()), _queue_space(cct, "DPDK::tcp::queue_space", 81920) {
774   int tcb_polled = 0u;
775   _inet.register_packet_provider([this, tcb_polled] () mutable {
776     Tub<typename InetTraits::l4packet> l4p;
777     auto c = _poll_tcbs.size();
778     if (!_packetq.empty() && (!(tcb_polled % 128) || c == 0)) {
779       l4p = std::move(_packetq.front());
780       _packetq.pop_front();
781       _queue_space.put(l4p->p.len());
782     } else {
783       while (c--) {
784         tcb_polled++;
785         lw_shared_ptr<tcb> tcb;
786         ethernet_address dst;
787         std::tie(tcb, dst) = std::move(_poll_tcbs.front());
788         _poll_tcbs.pop_front();
789         l4p = std::move(tcb->get_packet());
790         if (l4p) {
791           l4p->e_dst = dst;
792           break;
793         }
794       }
795     }
796     return l4p;
797   });
798 }
799
800 template <typename InetTraits>
801 auto tcp<InetTraits>::listen(uint16_t port, size_t queue_length) -> listener {
802   return listener(*this, port, queue_length);
803 }
804
805 template <typename InetTraits>
806 typename tcp<InetTraits>::connection tcp<InetTraits>::connect(const entity_addr_t &addr) {
807   uint16_t src_port;
808   connid id;
809   auto src_ip = _inet._inet.host_address();
810   auto dst_ip = ipv4_address(addr);
811   auto dst_port = addr.get_port();
812
813   do {
814     src_port = _port_dist(_e);
815     id = connid{src_ip, dst_ip, src_port, (uint16_t)dst_port};
816     if (_tcbs.find(id) == _tcbs.end()) {
817       if (_inet._inet.netif()->hw_queues_count() == 1 ||
818           _inet._inet.netif()->hash2cpu(
819               id.hash(_inet._inet.netif()->rss_key())) == center->get_id())
820         break;
821     }
822   } while (true);
823
824   auto tcbp = make_lw_shared<tcb>(*this, id);
825   _tcbs.insert({id, tcbp});
826   tcbp->connect();
827   return connection(tcbp);
828 }
829
830 template <typename InetTraits>
831 bool tcp<InetTraits>::forward(forward_hash& out_hash_data, Packet& p, size_t off) {
832   auto th = p.get_header<tcp_hdr>(off);
833   if (th) {
834     out_hash_data.push_back(th->src_port);
835     out_hash_data.push_back(th->dst_port);
836   }
837   return true;
838 }
839
840 template <typename InetTraits>
841 void tcp<InetTraits>::received(Packet p, ipaddr from, ipaddr to) {
842   auto th = p.get_header<tcp_hdr>(0);
843   if (!th) {
844     return;
845   }
846   // th->data_offset is correct even before ntoh()
847   if (unsigned(th->data_offset * 4) < sizeof(*th)) {
848     return;
849   }
850
851   if (!get_hw_features().rx_csum_offload) {
852     checksummer csum;
853     InetTraits::tcp_pseudo_header_checksum(csum, from, to, p.len());
854     csum.sum(p);
855     if (csum.get() != 0) {
856       return;
857     }
858   }
859   auto h = th->ntoh();
860   auto id = connid{to, from, h.dst_port, h.src_port};
861   auto tcbi = _tcbs.find(id);
862   lw_shared_ptr<tcb> tcbp;
863   if (tcbi == _tcbs.end()) {
864     auto listener = _listening.find(id.local_port);
865     if (listener == _listening.end() || listener->second->full()) {
866       // 1) In CLOSE state
867       // 1.1 all data in the incoming segment is discarded.  An incoming
868       // segment containing a RST is discarded. An incoming segment not
869       // containing a RST causes a RST to be sent in response.
870       // FIXME:
871       //      if ACK off: <SEQ=0><ACK=SEG.SEQ+SEG.LEN><CTL=RST,ACK>
872       //      if ACK on:  <SEQ=SEG.ACK><CTL=RST>
873       return respond_with_reset(&h, id.local_ip, id.foreign_ip);
874     } else {
875       // 2) In LISTEN state
876       // 2.1 first check for an RST
877       if (h.f_rst) {
878         // An incoming RST should be ignored
879         return;
880       }
881       // 2.2 second check for an ACK
882       if (h.f_ack) {
883         // Any acknowledgment is bad if it arrives on a connection
884         // still in the LISTEN state.
885         // <SEQ=SEG.ACK><CTL=RST>
886         return respond_with_reset(&h, id.local_ip, id.foreign_ip);
887       }
888       // 2.3 third check for a SYN
889       if (h.f_syn) {
890         // check the security
891         // NOTE: Ignored for now
892         tcbp = make_lw_shared<tcb>(*this, id);
893         _tcbs.insert({id, tcbp});
894         return tcbp->input_handle_listen_state(&h, std::move(p));
895       }
896       // 2.4 fourth other text or control
897       // So you are unlikely to get here, but if you do, drop the
898       // segment, and return.
899       return;
900     }
901   } else {
902     tcbp = tcbi->second;
903     if (tcbp->state() == tcp_state::SYN_SENT) {
904       // 3) In SYN_SENT State
905       return tcbp->input_handle_syn_sent_state(&h, std::move(p));
906     } else {
907       // 4) In other state, can be one of the following:
908       // SYN_RECEIVED, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2
909       // CLOSE_WAIT, CLOSING, LAST_ACK, TIME_WAIT
910       return tcbp->input_handle_other_state(&h, std::move(p));
911     }
912   }
913 }
914
915 // Send packet does not belong to any tcb
916 template <typename InetTraits>
917 void tcp<InetTraits>::send_packet_without_tcb(ipaddr from, ipaddr to, Packet p) {
918   if (_queue_space.get_or_fail(p.len())) { // drop packets that do not fit the queue
919     _inet.wait_l2_dst_address(to, std::move(p), [this, to] (const ethernet_address &e_dst, Packet p, int r) mutable {
920       if (r == 0)
921         _packetq.emplace_back(ipv4_traits::l4packet{to, std::move(p), e_dst, ip_protocol_num::tcp});
922     });
923   }
924 }
925
926 template <typename InetTraits>
927 tcp<InetTraits>::connection::~connection() {
928   if (_tcb) {
929     _tcb->_conn = nullptr;
930     close_read();
931     close_write();
932   }
933 }
934
935 template <typename InetTraits>
936 tcp<InetTraits>::tcb::tcb(tcp& t, connid id)
937     : _tcp(t), manager(t.manager), _local_ip(id.local_ip) , _foreign_ip(id.foreign_ip),
938       _local_port(id.local_port), _foreign_port(id.foreign_port),
939       _snd(_tcp.cct),
940       center(t.center),
941       fd(t.manager.get_eventfd()),
942       delayed_ack_event(new tcp<InetTraits>::C_handle_delayed_ack(this)),
943       retransmit_event(new tcp<InetTraits>::C_handle_retransmit(this)),
944       persist_event(new tcp<InetTraits>::C_handle_persist(this)),
945       all_data_ack_event(new tcp<InetTraits>::C_all_data_acked(this)) {}
946
947 template <typename InetTraits>
948 tcp<InetTraits>::tcb::~tcb()
949 {
950   if (_delayed_ack_fd)
951     center->delete_time_event(*_delayed_ack_fd);
952   if (retransmit_fd)
953     center->delete_time_event(*retransmit_fd);
954   if (persist_fd)
955     center->delete_time_event(*persist_fd);
956   delete delayed_ack_event;
957   delete retransmit_event;
958   delete persist_event;
959   delete all_data_ack_event;
960   manager.close(fd);
961   fd = -1;
962 }
963
964 template <typename InetTraits>
965 void tcp<InetTraits>::tcb::respond_with_reset(tcp_hdr* rth)
966 {
967   _tcp.respond_with_reset(rth, _local_ip, _foreign_ip);
968 }
969
970 template <typename InetTraits>
971 uint32_t tcp<InetTraits>::tcb::data_segment_acked(tcp_sequence seg_ack) {
972   uint32_t total_acked_bytes = 0;
973   // Full ACK of segment
974   while (!_snd.data.empty()
975          && (_snd.unacknowledged + _snd.data.front().p.len() <= seg_ack)) {
976     auto acked_bytes = _snd.data.front().p.len();
977     _snd.unacknowledged += acked_bytes;
978     // Ignore retransmitted segments when setting the RTO
979     if (_snd.data.front().nr_transmits == 0) {
980       update_rto(_snd.data.front().tx_time);
981     }
982     update_cwnd(acked_bytes);
983     total_acked_bytes += acked_bytes;
984     _snd.user_queue_space.put(_snd.data.front().data_len);
985     manager.notify(fd, EVENT_WRITABLE);
986     _snd.data.pop_front();
987   }
988   // Partial ACK of segment
989   if (_snd.unacknowledged < seg_ack) {
990     auto acked_bytes = seg_ack - _snd.unacknowledged;
991     if (!_snd.data.empty()) {
992       auto& unacked_seg = _snd.data.front();
993       unacked_seg.p.trim_front(acked_bytes);
994     }
995     _snd.unacknowledged = seg_ack;
996     update_cwnd(acked_bytes);
997     total_acked_bytes += acked_bytes;
998   }
999   return total_acked_bytes;
1000 }
1001
1002 template <typename InetTraits>
1003 bool tcp<InetTraits>::tcb::segment_acceptable(tcp_sequence seg_seq, unsigned seg_len) {
1004   if (seg_len == 0 && _rcv.window == 0) {
1005     // SEG.SEQ = RCV.NXT
1006     return seg_seq == _rcv.next;
1007   } else if (seg_len == 0 && _rcv.window > 0) {
1008     // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1009     return (_rcv.next <= seg_seq) && (seg_seq < _rcv.next + _rcv.window);
1010   } else if (seg_len > 0 && _rcv.window > 0) {
1011     // RCV.NXT =< SEG.SEQ < RCV.NXT+RCV.WND
1012     //    or
1013     // RCV.NXT =< SEG.SEQ+SEG.LEN-1 < RCV.NXT+RCV.WND
1014     bool x = (_rcv.next <= seg_seq) && seg_seq < (_rcv.next + _rcv.window);
1015     bool y = (_rcv.next <= seg_seq + seg_len - 1) && (seg_seq + seg_len - 1 < _rcv.next + _rcv.window);
1016     return x || y;
1017   } else  {
1018     // SEG.LEN > 0 RCV.WND = 0, not acceptable
1019     return false;
1020   }
1021 }
1022
1023 template <typename InetTraits>
1024 void tcp<InetTraits>::tcb::init_from_options(tcp_hdr* th, uint8_t* opt_start, uint8_t* opt_end) {
1025   // Handle tcp options
1026   _option.parse(opt_start, opt_end);
1027
1028   // Remote receive window scale factor
1029   _snd.window_scale = _option._remote_win_scale;
1030   // Local receive window scale factor
1031   _rcv.window_scale = _option._local_win_scale;
1032
1033   // Maximum segment size remote can receive
1034   _snd.mss = _option._remote_mss;
1035   // Maximum segment size local can receive
1036   _rcv.mss = _option._local_mss = local_mss();
1037
1038   // Linux's default window size
1039   _rcv.window = 29200 << _rcv.window_scale;
1040   _snd.window = th->window << _snd.window_scale;
1041
1042   // Segment sequence number used for last window update
1043   _snd.wl1 = th->seq;
1044   // Segment acknowledgment number used for last window update
1045   _snd.wl2 = th->ack;
1046
1047   // Setup initial congestion window
1048   if (2190 < _snd.mss) {
1049     _snd.cwnd = 2 * _snd.mss;
1050   } else if (1095 < _snd.mss && _snd.mss <= 2190) {
1051     _snd.cwnd = 3 * _snd.mss;
1052   } else {
1053     _snd.cwnd = 4 * _snd.mss;
1054   }
1055
1056   // Setup initial slow start threshold
1057   _snd.ssthresh = th->window << _snd.window_scale;
1058 }
1059
1060 template <typename InetTraits>
1061 Packet tcp<InetTraits>::tcb::get_transmit_packet() {
1062   // easy case: empty queue
1063   if (_snd.unsent.empty()) {
1064     return Packet();
1065   }
1066   auto can_send = this->can_send();
1067   // Max number of TCP payloads we can pass to NIC
1068   uint32_t len;
1069   if (_tcp.get_hw_features().tx_tso) {
1070     // FIXME: Info tap device the size of the splitted packet
1071     len = _tcp.get_hw_features().max_packet_len - tcp_hdr_len_min - InetTraits::ip_hdr_len_min;
1072   } else {
1073     len = std::min(uint16_t(_tcp.get_hw_features().mtu - tcp_hdr_len_min - InetTraits::ip_hdr_len_min), _snd.mss);
1074   }
1075   can_send = std::min(can_send, len);
1076   // easy case: one small packet
1077   if (_snd.unsent.front().len() <= can_send) {
1078     auto p = std::move(_snd.unsent.front());
1079     _snd.unsent.pop_front();
1080     _snd.unsent_len -= p.len();
1081     return p;
1082   }
1083   // moderate case: need to split one packet
1084   if (_snd.unsent.front().len() > can_send) {
1085     auto p = _snd.unsent.front().share(0, can_send);
1086     _snd.unsent.front().trim_front(can_send);
1087     _snd.unsent_len -= p.len();
1088     return p;
1089   }
1090   // hard case: merge some packets, possibly split last
1091   auto p = std::move(_snd.unsent.front());
1092   _snd.unsent.pop_front();
1093   can_send -= p.len();
1094   while (!_snd.unsent.empty()
1095          && _snd.unsent.front().len() <= can_send) {
1096     can_send -= _snd.unsent.front().len();
1097     p.append(std::move(_snd.unsent.front()));
1098     _snd.unsent.pop_front();
1099   }
1100   // FIXME: this will result in calling "deleter" of packet which free managed objects
1101   // will used later
1102   // if (!_snd.unsent.empty() && can_send) {
1103   //   auto& q = _snd.unsent.front();
1104   //   p.append(q.share(0, can_send));
1105   //   q.trim_front(can_send);
1106   // }
1107   _snd.unsent_len -= p.len();
1108   return p;
1109 }
1110
1111 template <typename InetTraits>
1112 void tcp<InetTraits>::tcb::output_one(bool data_retransmit) {
1113   if (in_state(CLOSED)) {
1114     return;
1115   }
1116
1117   Packet p = data_retransmit ? _snd.data.front().p.share() : get_transmit_packet();
1118   Packet clone = p.share();  // early clone to prevent share() from calling packet::unuse_internal_data() on header.
1119   uint16_t len = p.len();
1120   bool syn_on = syn_needs_on();
1121   bool ack_on = ack_needs_on();
1122
1123   auto options_size = _option.get_size(syn_on, ack_on);
1124   auto th = p.prepend_header<tcp_hdr>(options_size);
1125
1126   th->src_port = _local_port;
1127   th->dst_port = _foreign_port;
1128
1129   th->f_syn = syn_on;
1130   th->f_ack = ack_on;
1131   if (ack_on) {
1132     clear_delayed_ack();
1133   }
1134   th->f_urg = false;
1135   th->f_psh = false;
1136
1137   tcp_sequence seq;
1138   if (data_retransmit) {
1139     seq = _snd.unacknowledged;
1140   } else {
1141     seq = syn_on ? _snd.initial : _snd.next;
1142     _snd.next += len;
1143   }
1144   th->seq = seq;
1145   th->ack = _rcv.next;
1146   th->data_offset = (sizeof(*th) + options_size) / 4;
1147   th->window = _rcv.window >> _rcv.window_scale;
1148   th->checksum = 0;
1149
1150   // FIXME: does the FIN have to fit in the window?
1151   bool fin_on = fin_needs_on();
1152   th->f_fin = fin_on;
1153
1154   // Add tcp options
1155   _option.fill(th, options_size);
1156   *th = th->hton();
1157
1158   offload_info oi;
1159   checksummer csum;
1160   uint16_t pseudo_hdr_seg_len = 0;
1161
1162   oi.tcp_hdr_len = sizeof(tcp_hdr) + options_size;
1163
1164   if (_tcp.get_hw_features().tx_csum_l4_offload) {
1165     oi.needs_csum = true;
1166
1167     //
1168     // tx checksum offloading: both virtio-net's VIRTIO_NET_F_CSUM dpdk's
1169     // PKT_TX_TCP_CKSUM - requires th->checksum to be initialized to ones'
1170     // complement sum of the pseudo header.
1171     //
1172     // For TSO the csum should be calculated for a pseudo header with
1173     // segment length set to 0. All the rest is the same as for a TCP Tx
1174     // CSUM offload case.
1175     //
1176     if (_tcp.get_hw_features().tx_tso && len > _snd.mss) {
1177       oi.tso_seg_size = _snd.mss;
1178     } else {
1179       pseudo_hdr_seg_len = sizeof(*th) + options_size + len;
1180     }
1181   } else {
1182     pseudo_hdr_seg_len = sizeof(*th) + options_size + len;
1183     oi.needs_csum = false;
1184   }
1185
1186   InetTraits::tcp_pseudo_header_checksum(csum, _local_ip, _foreign_ip,
1187                                          pseudo_hdr_seg_len);
1188
1189   if (_tcp.get_hw_features().tx_csum_l4_offload) {
1190     th->checksum = ~csum.get();
1191   } else {
1192     csum.sum(p);
1193     th->checksum = csum.get();
1194   }
1195
1196   oi.protocol = ip_protocol_num::tcp;
1197
1198   p.set_offload_info(oi);
1199
1200   if (!data_retransmit && (len || syn_on || fin_on)) {
1201     auto now = clock_type::now();
1202     if (len) {
1203       unsigned nr_transmits = 0;
1204       _snd.data.emplace_back(unacked_segment{std::move(clone),
1205                                              len, nr_transmits, now});
1206     }
1207     if (!retransmit_fd) {
1208       start_retransmit_timer();
1209     }
1210   }
1211
1212   queue_packet(std::move(p));
1213 }
1214
1215 template <typename InetTraits>
1216 bool tcp<InetTraits>::tcb::is_all_data_acked() {
1217   if (_snd.data.empty() && _snd.unsent_len == 0 && _snd.queued_len == 0) {
1218     return true;
1219   }
1220   return false;
1221 }
1222
1223 template <typename InetTraits>
1224 Tub<Packet> tcp<InetTraits>::tcb::read() {
1225   Tub<Packet> p;
1226   if (_rcv.data.empty())
1227     return p;
1228
1229   p.construct();
1230   for (auto&& q : _rcv.data) {
1231     p->append(std::move(q));
1232   }
1233   _rcv.data.clear();
1234   return p;
1235 }
1236
1237 template <typename InetTraits>
1238 int tcp<InetTraits>::tcb::send(Packet p) {
1239   // We can not send after the connection is closed
1240   assert(!_snd.closed);
1241
1242   if (in_state(CLOSED))
1243     return -ECONNRESET;
1244
1245   auto len = p.len();
1246   if (!_snd.user_queue_space.get_or_fail(len)) {
1247     // note: caller must ensure enough queue space to send
1248     ceph_abort();
1249   }
1250   // TODO: Handle p.len() > max user_queue_space case
1251   _snd.queued_len += len;
1252   _snd.unsent_len += len;
1253   _snd.queued_len -= len;
1254   _snd.unsent.push_back(std::move(p));
1255   if (can_send() > 0) {
1256     output();
1257   }
1258   return len;
1259 }
1260
1261 template <typename InetTraits>
1262 void tcp<InetTraits>::tcb::close() {
1263   if (in_state(CLOSED) || _snd.closed) {
1264     return ;
1265   }
1266   // TODO: We should make this asynchronous
1267
1268   _errno = -EPIPE;
1269   center->delete_file_event(fd, EVENT_READABLE|EVENT_WRITABLE);
1270   bool acked = is_all_data_acked();
1271   if (!acked) {
1272     _snd._all_data_acked_fd = manager.get_eventfd();
1273     center->create_file_event(_snd._all_data_acked_fd, EVENT_READABLE, all_data_ack_event);
1274   } else {
1275     close_final_cleanup();
1276   }
1277 }
1278
1279 template <typename InetTraits>
1280 bool tcp<InetTraits>::tcb::should_send_ack(uint16_t seg_len) {
1281   // We've received a TSO packet, do ack immediately
1282   if (seg_len > _rcv.mss) {
1283     _nr_full_seg_received = 0;
1284     if (_delayed_ack_fd) {
1285       center->delete_time_event(*_delayed_ack_fd);
1286       _delayed_ack_fd.destroy();
1287     }
1288     return true;
1289   }
1290
1291   // We've received a full sized segment, ack for every second full sized segment
1292   if (seg_len == _rcv.mss) {
1293     if (_nr_full_seg_received++ >= 1) {
1294       _nr_full_seg_received = 0;
1295       if (_delayed_ack_fd) {
1296         center->delete_time_event(*_delayed_ack_fd);
1297         _delayed_ack_fd.destroy();
1298       }
1299       return true;
1300     }
1301   }
1302
1303   // If the timer is armed and its callback hasn't been run.
1304   if (_delayed_ack_fd) {
1305     return false;
1306   }
1307
1308   // If the timer is not armed, schedule a delayed ACK.
1309   // The maximum delayed ack timer allowed by RFC1122 is 500ms, most
1310   // implementations use 200ms.
1311   _delayed_ack_fd.construct(center->create_time_event(200*1000, delayed_ack_event));
1312   return false;
1313 }
1314
1315 template <typename InetTraits>
1316 void tcp<InetTraits>::tcb::clear_delayed_ack() {
1317   if (_delayed_ack_fd) {
1318     center->delete_time_event(*_delayed_ack_fd);
1319     _delayed_ack_fd.destroy();
1320   }
1321 }
1322
1323 template <typename InetTraits>
1324 bool tcp<InetTraits>::tcb::merge_out_of_order() {
1325   bool merged = false;
1326   if (_rcv.out_of_order.map.empty()) {
1327     return merged;
1328   }
1329   for (auto it = _rcv.out_of_order.map.begin(); it != _rcv.out_of_order.map.end();) {
1330     auto& p = it->second;
1331     auto seg_beg = it->first;
1332     auto seg_len = p.len();
1333     auto seg_end = seg_beg + seg_len;
1334     if (seg_beg <= _rcv.next && seg_end > _rcv.next) {
1335       // This segment has been received out of order and its previous
1336       // segment has been received now
1337       auto trim = _rcv.next - seg_beg;
1338       if (trim) {
1339         p.trim_front(trim);
1340         seg_len -= trim;
1341       }
1342       _rcv.next += seg_len;
1343       _rcv.data.push_back(std::move(p));
1344       // Since c++11, erase() always returns the value of the following element
1345       it = _rcv.out_of_order.map.erase(it);
1346       merged = true;
1347     } else if (_rcv.next >= seg_end) {
1348       // This segment has been receive already, drop it
1349       it = _rcv.out_of_order.map.erase(it);
1350     } else {
1351       // seg_beg > _rcv.need, can not merge. Note, seg_beg can grow only,
1352       // so we can stop looking here.
1353       it++;
1354       break;
1355     }
1356   }
1357   return merged;
1358 }
1359
1360 template <typename InetTraits>
1361 void tcp<InetTraits>::tcb::insert_out_of_order(tcp_sequence seg, Packet p) {
1362   _rcv.out_of_order.merge(seg, std::move(p));
1363 }
1364
1365 template <typename InetTraits>
1366 void tcp<InetTraits>::tcb::trim_receive_data_after_window() {
1367   abort();
1368 }
1369
1370 template <typename InetTraits>
1371 void tcp<InetTraits>::tcb::fast_retransmit() {
1372   if (!_snd.data.empty()) {
1373     auto& unacked_seg = _snd.data.front();
1374     unacked_seg.nr_transmits++;
1375     retransmit_one();
1376     output();
1377   }
1378 }
1379
1380 template <typename InetTraits>
1381 void tcp<InetTraits>::tcb::update_rto(clock_type::time_point tx_time) {
1382   // Update RTO according to RFC6298
1383   auto R = std::chrono::duration_cast<std::chrono::microseconds>(clock_type::now() - tx_time);
1384   if (_snd.first_rto_sample) {
1385     _snd.first_rto_sample = false;
1386     // RTTVAR <- R/2
1387     // SRTT <- R
1388     _snd.rttvar = R / 2;
1389     _snd.srtt = R;
1390   } else {
1391     // RTTVAR <- (1 - beta) * RTTVAR + beta * |SRTT - R'|
1392     // SRTT <- (1 - alpha) * SRTT + alpha * R'
1393     // where alpha = 1/8 and beta = 1/4
1394     auto delta = _snd.srtt > R ? (_snd.srtt - R) : (R - _snd.srtt);
1395     _snd.rttvar = _snd.rttvar * 3 / 4 + delta / 4;
1396     _snd.srtt = _snd.srtt * 7 / 8 +  R / 8;
1397   }
1398   // RTO <- SRTT + max(G, K * RTTVAR)
1399   _rto =  _snd.srtt + std::max(_rto_clk_granularity, 4 * _snd.rttvar);
1400
1401   // Make sure 1 sec << _rto << 60 sec
1402   _rto = std::max(_rto, _rto_min);
1403   _rto = std::min(_rto, _rto_max);
1404 }
1405
1406 template <typename InetTraits>
1407 void tcp<InetTraits>::tcb::update_cwnd(uint32_t acked_bytes) {
1408   uint32_t smss = _snd.mss;
1409   if (_snd.cwnd < _snd.ssthresh) {
1410     // In slow start phase
1411     _snd.cwnd += std::min(acked_bytes, smss);
1412   } else {
1413     // In congestion avoidance phase
1414     uint32_t round_up = 1;
1415     _snd.cwnd += std::max(round_up, smss * smss / _snd.cwnd);
1416   }
1417 }
1418
1419
1420 template <typename InetTraits>
1421 void tcp<InetTraits>::tcb::cleanup() {
1422   manager.notify(fd, EVENT_READABLE);
1423   _snd.closed = true;
1424   _snd.unsent.clear();
1425   _snd.data.clear();
1426   _rcv.out_of_order.map.clear();
1427   _rcv.data.clear();
1428   stop_retransmit_timer();
1429   clear_delayed_ack();
1430   center->dispatch_event_external(new tcp<InetTraits>::C_actual_remove_tcb(this));
1431   remove_from_tcbs();
1432 }
1433
1434 template <typename InetTraits>
1435 tcp_sequence tcp<InetTraits>::tcb::get_isn() {
1436   // Per RFC6528, TCP SHOULD generate its Initial Sequence Numbers
1437   // with the expression:
1438   //   ISN = M + F(localip, localport, remoteip, remoteport, secretkey)
1439   //   M is the 4 microsecond timer
1440   using namespace std::chrono;
1441   uint32_t hash[4];
1442   hash[0] = _local_ip.ip;
1443   hash[1] = _foreign_ip.ip;
1444   hash[2] = (_local_port << 16) + _foreign_port;
1445   hash[3] = _isn_secret.key[15];
1446   CryptoPP::Weak::MD5::Transform(hash, _isn_secret.key);
1447   auto seq = hash[0];
1448   auto m = duration_cast<microseconds>(clock_type::now().time_since_epoch());
1449   seq += m.count() / 4;
1450   return make_seq(seq);
1451 }
1452
1453 template <typename InetTraits>
1454 Tub<typename InetTraits::l4packet> tcp<InetTraits>::tcb::get_packet() {
1455   _poll_active = false;
1456   if (_packetq.empty()) {
1457     output_one();
1458   }
1459
1460   Tub<typename InetTraits::l4packet> p;
1461   if (in_state(CLOSED)) {
1462     return p;
1463   }
1464
1465   assert(!_packetq.empty());
1466
1467   p = std::move(_packetq.front());
1468   _packetq.pop_front();
1469   if (!_packetq.empty() || (_snd.dupacks < 3 && can_send() > 0)) {
1470     // If there are packets to send in the queue or tcb is allowed to send
1471     // more add tcp back to polling set to keep sending. In addition, dupacks >= 3
1472     // is an indication that an segment is lost, stop sending more in this case.
1473     output();
1474   }
1475   return p;
1476 }
1477
1478 template <typename InetTraits>
1479 void tcp<InetTraits>::connection::close_read() {
1480   // do nothing
1481   // _tcb->manager.notify(_tcb->fd, EVENT_READABLE);
1482 }
1483
1484 template <typename InetTraits>
1485 void tcp<InetTraits>::connection::close_write() {
1486   _tcb->close();
1487 }
1488
1489 template <typename InetTraits>
1490 constexpr uint16_t tcp<InetTraits>::tcb::_max_nr_retransmit;
1491
1492 template <typename InetTraits>
1493 constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_min;
1494
1495 template <typename InetTraits>
1496 constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_max;
1497
1498 template <typename InetTraits>
1499 constexpr std::chrono::microseconds tcp<InetTraits>::tcb::_rto_clk_granularity;
1500
1501 template <typename InetTraits>
1502 typename tcp<InetTraits>::tcb::isn_secret tcp<InetTraits>::tcb::_isn_secret;
1503
1504
1505 #endif /* TCP_HH_ */