Merge "PROX generator: performance optimization (3/4)"
[samplevnf.git] / VNFs / DPPD-PROX / genl4_stream_tcp.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_cycles.h>
18 #include <rte_ether.h>
19 #include <rte_eth_ctrl.h>
20
21 #include "log.h"
22 #include "genl4_stream_tcp.h"
23 #include "prox_assert.h"
24 #include "mbuf_utils.h"
25
26 static uint64_t tcp_retx_timeout(const struct stream_ctx *ctx)
27 {
28         uint64_t delay = token_time_tsc_until_full(&ctx->token_time_other);
29
30         return delay + ctx->stream_cfg->tsc_timeout;
31 }
32
33 static uint64_t tcp_resched_timeout(const struct stream_ctx *ctx)
34 {
35         uint64_t delay = token_time_tsc_until_full(&ctx->token_time);
36
37         return delay;
38 }
39
40 static void tcp_retx_timeout_start(struct stream_ctx *ctx, uint64_t *next_tsc)
41 {
42         uint64_t now = rte_rdtsc();
43
44         *next_tsc = tcp_retx_timeout(ctx);
45         ctx->sched_tsc = now + *next_tsc;
46 }
47
48 static int tcp_retx_timeout_occured(const struct stream_ctx *ctx, uint64_t now)
49 {
50         return ctx->sched_tsc < now;
51 }
52
53 static void tcp_retx_timeout_resume(const struct stream_ctx *ctx, uint64_t now, uint64_t *next_tsc)
54 {
55         *next_tsc = ctx->sched_tsc - now;
56 }
57
58 static void tcp_set_retransmit(struct stream_ctx *ctx)
59 {
60         ctx->retransmits++;
61 }
62
63 struct tcp_option {
64         uint8_t kind;
65         uint8_t len;
66 } __attribute__((packed));
67
68 void stream_tcp_create_rst(struct rte_mbuf *mbuf, struct l4_meta *l4_meta, struct pkt_tuple *tuple)
69 {
70         struct tcp_hdr *tcp = (struct tcp_hdr *)l4_meta->l4_hdr;
71         struct ipv4_hdr *ip = ((struct ipv4_hdr *)tcp) - 1;
72
73         ip->src_addr = tuple->dst_addr;
74         ip->dst_addr = tuple->src_addr;
75
76         tcp->dst_port = tuple->src_port;
77         tcp->src_port = tuple->dst_port;
78
79         ip->total_length = rte_bswap16(sizeof(struct ipv4_hdr) + sizeof(struct tcp_hdr));
80         tcp->tcp_flags = TCP_RST_FLAG;
81         tcp->data_off = ((sizeof(struct tcp_hdr) / 4) << 4);
82         rte_pktmbuf_pkt_len(mbuf) = l4_meta->payload - rte_pktmbuf_mtod(mbuf, uint8_t *);
83         rte_pktmbuf_data_len(mbuf) = l4_meta->payload - rte_pktmbuf_mtod(mbuf, uint8_t *);
84 }
85
86 static void create_tcp_pkt(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint8_t tcp_flags, int data_beg, int data_len)
87 {
88         uint8_t *pkt;
89
90         const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
91         const struct stream_cfg *stream_cfg = ctx->stream_cfg;
92
93         pkt = rte_pktmbuf_mtod(mbuf, uint8_t *);
94         rte_memcpy(pkt, stream_cfg->data[act->peer].hdr, stream_cfg->data[act->peer].hdr_len);
95
96         struct ipv4_hdr *l3_hdr = (struct ipv4_hdr*)&pkt[stream_cfg->data[act->peer].hdr_len - sizeof(struct ipv4_hdr)];
97         struct tcp_hdr *l4_hdr = (struct tcp_hdr *)&pkt[stream_cfg->data[act->peer].hdr_len];
98
99         l3_hdr->src_addr = ctx->tuple->dst_addr;
100         l3_hdr->dst_addr = ctx->tuple->src_addr;
101         l3_hdr->next_proto_id = IPPROTO_TCP;
102
103         l4_hdr->src_port = ctx->tuple->dst_port;
104         l4_hdr->dst_port = ctx->tuple->src_port;
105
106         uint32_t tcp_len = sizeof(struct tcp_hdr);
107         uint32_t tcp_payload_len = 0;
108         uint32_t seq_len = 0;
109         struct tcp_option *tcp_op;
110
111         if (tcp_flags & TCP_RST_FLAG) {
112                 tcp_flags |= TCP_RST_FLAG;
113                 seq_len = 1;
114         }
115         else if (tcp_flags & TCP_SYN_FLAG) {
116                 tcp_flags |= TCP_SYN_FLAG;
117                 /* Window scaling */
118
119                 /* TODO: make options come from the stream. */
120                 tcp_op = (struct tcp_option *)(l4_hdr + 1);
121
122                 tcp_op->kind = 2;
123                 tcp_op->len = 4;
124                 *(uint16_t *)(tcp_op + 1) = rte_bswap16(1460); /* TODO: Save this in this_mss */
125
126                 tcp_len += 4;
127                 seq_len = 1;
128
129                 ctx->seq_first_byte = ctx->ackd_seq + 1;
130         }
131         else if (tcp_flags & TCP_FIN_FLAG) {
132                 tcp_flags |= TCP_FIN_FLAG;
133                 seq_len = 1;
134         }
135
136         if (tcp_flags & TCP_ACK_FLAG) {
137                 l4_hdr->recv_ack = rte_bswap32(ctx->recv_seq);
138                 tcp_flags |= TCP_ACK_FLAG;
139         }
140         else
141                 l4_hdr->recv_ack = 0;
142
143         uint16_t l4_payload_offset = stream_cfg->data[act->peer].hdr_len + tcp_len;
144
145         if (data_len) {
146                 seq_len = data_len;
147                 plogx_dbg("l4 payload offset = %d\n", l4_payload_offset);
148                 rte_memcpy(pkt + l4_payload_offset, stream_cfg->data[act->peer].content + data_beg, data_len);
149         }
150
151         l4_hdr->sent_seq = rte_bswap32(ctx->next_seq);
152         l4_hdr->tcp_flags = tcp_flags; /* SYN */
153         l4_hdr->rx_win = rte_bswap16(0x3890); // TODO: make this come from stream (config)
154         //l4_hdr->cksum = ...;
155         l4_hdr->tcp_urp = 0;
156         l4_hdr->data_off = ((tcp_len / 4) << 4); /* Highest 4 bits are TCP header len in units of 32 bit words */
157
158         /* ctx->next_seq = ctx->ackd_seq + seq_len; */
159         ctx->next_seq += seq_len;
160
161         /* No payload after TCP header. */
162         rte_pktmbuf_pkt_len(mbuf)  = l4_payload_offset + data_len;
163         rte_pktmbuf_data_len(mbuf) = l4_payload_offset + data_len;
164
165         l3_hdr->total_length = rte_bswap16(sizeof(struct ipv4_hdr) + tcp_len + data_len);
166         plogdx_dbg(mbuf, NULL);
167
168         plogx_dbg("put tcp packet with flags: %s%s%s, (len = %d, seq = %d, ack =%d)\n",
169                   tcp_flags & TCP_SYN_FLAG? "SYN ":"",
170                   tcp_flags & TCP_ACK_FLAG? "ACK ":"",
171                   tcp_flags & TCP_FIN_FLAG? "FIN ":"",
172                   data_len, rte_bswap32(l4_hdr->sent_seq), rte_bswap32(l4_hdr->recv_ack));
173 }
174
175 /* Get the length of the reply associated for the next packet. Note
176    that the packet will come from the other peer. In case the next
177    packet belongs to the current peer (again), the reply length will
178    be that of an empty TCP packet (i.e. the ACK). */
179 uint16_t stream_tcp_reply_len(struct stream_ctx *ctx)
180 {
181         if (stream_tcp_is_ended(ctx))
182                 return 0;
183         else if (ctx->tcp_state != ESTABLISHED) {
184                 if (ctx->tcp_state == SYN_SENT || ctx->tcp_state == LISTEN) {
185                         /* First packet received is a SYN packet. In
186                            the current implementation this packet
187                            contains the TCP option field to set the
188                            MSS. For this, add 4 bytes. */
189                         return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(struct tcp_hdr) + 4;
190                 }
191                 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(struct tcp_hdr);
192         }
193         else if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
194                 /* The reply _could_ (due to races, still possibly
195                    receive an old ack) contain data. This means that
196                    in some cases, the prediction of the reply size
197                    will be an overestimate. */
198                 uint32_t data_beg = ctx->next_seq - ctx->seq_first_byte;
199                 const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
200
201                 uint32_t remaining_len = act->len - (data_beg - act->beg);
202
203                 if (remaining_len == 0) {
204                         if (ctx->cur_action + 1 != ctx->stream_cfg->n_actions) {
205                                 if (ctx->stream_cfg->actions[ctx->cur_action + 1].peer == ctx->peer)
206                                         return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(struct tcp_hdr);
207                                 else {
208                                         uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
209                                         uint32_t end = ctx->stream_cfg->actions[ctx->cur_action + 1].beg +
210                                                 ctx->stream_cfg->actions[ctx->cur_action + 1].len;
211                                         uint32_t remaining = end - seq_beg;
212                                         uint16_t data_len = remaining > 1460? 1460: remaining;
213
214                                         return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(struct tcp_hdr) + data_len;
215                                 }
216                         }
217                         else {
218                                 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(struct tcp_hdr);
219                         }
220                 }
221                 else {
222                         return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(struct tcp_hdr);
223                 }
224         }
225         else if (ctx->stream_cfg->actions[ctx->cur_action].peer != ctx->peer) {
226                 uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
227                 uint32_t end = ctx->stream_cfg->actions[ctx->cur_action].beg +
228                         ctx->stream_cfg->actions[ctx->cur_action].len;
229                 uint32_t remaining = end - seq_beg;
230                 uint16_t data_len = remaining > 1460? 1460: remaining;
231
232                 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(struct tcp_hdr) + data_len;
233         }
234         else
235                 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(struct tcp_hdr);
236 }
237
238 static void stream_tcp_proc_in_order_data(struct stream_ctx *ctx, struct l4_meta *l4_meta, int *progress_seq)
239 {
240         plogx_dbg("Got data with seq %d (as expected), with len %d\n", ctx->recv_seq, l4_meta->len);
241
242         if (!l4_meta->len)
243                 return;
244
245         const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
246         enum l4gen_peer peer = act->peer;
247         /* Since we have received the expected sequence number, the start address will not exceed the cfg memory buffer. */
248         uint8_t *content = ctx->stream_cfg->data[peer].content;
249         uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
250         uint32_t end = ctx->stream_cfg->actions[ctx->cur_action].beg + ctx->stream_cfg->actions[ctx->cur_action].len;
251         uint32_t remaining = end - seq_beg;
252
253         if (l4_meta->len > remaining) {
254                 plogx_err("Provided data is too long:\n");
255                 plogx_err("action.beg = %d, action.len = %d", act->beg, act->len);
256                 plogx_err("tcp seq points at %d in action, l4_meta->len = %d\n", seq_beg, l4_meta->len);
257         }
258         else {
259                 if (memcmp(content + seq_beg, l4_meta->payload, l4_meta->len) == 0) {
260                         plogx_dbg("Good payload in %d: %u -> %u\n", ctx->cur_action, ctx->recv_seq, l4_meta->len);
261                         ctx->recv_seq += l4_meta->len;
262                         ctx->cur_pos[peer] += l4_meta->len;
263                         /* Move forward only when this was the last piece of data within current action (i.e. end of received data == end of action data). */
264                         if (seq_beg + l4_meta->len == act->beg + act->len) {
265                                 plogx_dbg("Got last piece in action %d\n", ctx->cur_action);
266                                 ctx->cur_action++;
267                         }
268                         else {
269                                 plogx_dbg("Got data from %d with len %d, but waiting for more (tot len = %d)!\n", seq_beg, l4_meta->len, act->len);
270                         }
271                         *progress_seq = 1;
272                         ctx->flags |= STREAM_CTX_F_NEW_DATA;
273                 }
274                 else {
275                         plogx_err("ackable = %d, ackd = %d\n", ctx->ackable_data_seq ,ctx->ackd_seq);
276                         plogx_err("Bad payload action[%d]{.len = %d, .peer  = %s}\n", ctx->cur_action, act->len, peer == PEER_SERVER? "s" : "c");
277                         plogx_err("   pkt payload len = %d, beginning at %u\n", l4_meta->len, seq_beg);
278                         /* plogx_err("   Payload starts %zu bytes after beginning of l4_hdr\n", l4_meta->payload - l4_meta->l4_hdr); */
279
280                         plogx_err("   payload[0-3] = %02x %02x %02x %02x\n",
281                                   l4_meta->payload[0],
282                                   l4_meta->payload[1],
283                                   l4_meta->payload[2],
284                                   l4_meta->payload[3]);
285                         plogx_err("   expect[0-3]  = %02x %02x %02x %02x\n",
286                                   content[seq_beg + 0],
287                                   content[seq_beg + 1],
288                                   content[seq_beg + 2],
289                                   content[seq_beg + 3]);
290                 }
291         }
292 }
293
294 static int stream_tcp_proc_in(struct stream_ctx *ctx, struct l4_meta *l4_meta)
295 {
296         struct tcp_hdr *tcp = NULL;
297         int got_syn = 0;
298         int got_ack = 0;
299         int got_fin = 0;
300         int got_rst = 0;
301
302         tcp = (struct tcp_hdr *)l4_meta->l4_hdr;
303
304         got_syn = tcp->tcp_flags & TCP_SYN_FLAG;
305         got_ack = tcp->tcp_flags & TCP_ACK_FLAG;
306         got_fin = tcp->tcp_flags & TCP_FIN_FLAG;
307         got_rst = tcp->tcp_flags & TCP_RST_FLAG;
308         plogx_dbg("TCP, flags: %s%s%s, (len = %d, seq = %d, ack =%d)\n", got_syn? "SYN ":"", got_ack? "ACK ":"", got_fin? "FIN " : "", l4_meta->len, rte_bswap32(tcp->sent_seq), rte_bswap32(tcp->recv_ack));
309
310         if (got_syn)
311                 ctx->flags |= STREAM_CTX_F_TCP_GOT_SYN;
312         if (got_fin)
313                 ctx->flags |= STREAM_CTX_F_TCP_GOT_FIN;
314
315         int progress_ack = 0, progress_seq = 0;
316
317         /* RST => other side wants to terminate due to
318            inconsitent state (example: delay of retransmit of
319            last ACK while other side already closed the
320            connection. The other side will accept the packet
321            as a beginning of a new connection but there will
322            be no SYN. ) */
323         if (got_rst) {
324                 plogx_dbg("got rst\n");
325                 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
326                 return -1;
327         }
328
329         if (got_ack) {
330                 uint32_t ackd_seq = rte_bswap32(tcp->recv_ack);
331
332                 if (ackd_seq > ctx->ackd_seq) {
333                         plogx_dbg("Got ACK for outstanding data, from %d to %d\n", ctx->ackd_seq, ackd_seq);
334                         ctx->ackd_seq = ackd_seq;
335                         plogx_dbg("ackable data = %d\n", ctx->ackable_data_seq);
336                         /* Ackable_data_seq set to byte after
337                            current action. */
338                         if (ctx->ackable_data_seq == ctx->ackd_seq) {
339                                 /* Due to retransmit in
340                                    combination with late acks,
341                                    is is possible to ack
342                                    future data. In this case,
343                                    the assumption that data
344                                    was lost is not true and
345                                    the next seq is moved
346                                    forward. */
347                                 if (ctx->next_seq < ctx->ackable_data_seq) {
348                                         ctx->next_seq = ctx->ackable_data_seq;
349                                 }
350
351                                 ctx->ackable_data_seq = 0;
352                                 const struct stream_cfg *stream_cfg = ctx->stream_cfg;
353                                 const struct peer_action *act = &stream_cfg->actions[ctx->cur_action];
354
355                                 ctx->cur_pos[act->peer] += act->len;
356                                 ctx->cur_action++;
357                                 plogx_dbg("Moving to next action %u\n", ctx->ackd_seq);
358                         }
359                         progress_ack = 1;
360                 }
361                 else {
362                         plogx_dbg("Old data acked: acked = %d, ackable =%d\n", ackd_seq, ctx->ackd_seq);
363                 }
364         }
365
366         uint32_t seq = rte_bswap32(tcp->sent_seq);
367
368         /* update recv_seq. */
369         if (got_syn) {
370                 /* When a syn is received, immediately reset recv_seq based on seq from packet. */
371                 ctx->recv_seq = seq + 1;
372                 /* Syn packets have length 1, so the first real data will start after that. */
373                 ctx->other_seq_first_byte = seq + 1;
374                 progress_seq = 1;
375         }
376         else if (got_fin) {
377                 if (ctx->recv_seq == seq) {
378                         plogx_dbg("Got fin with correct seq\n");
379                         ctx->recv_seq = seq + 1;
380                         progress_seq = 1;
381                 }
382                 else {
383                         plogx_dbg("Got fin but incorrect seq\n");
384                 }
385         }
386         else {
387                 /* Only expect in-order packets. */
388                 if (ctx->recv_seq == seq) {
389                         stream_tcp_proc_in_order_data(ctx, l4_meta, &progress_seq);
390                 }
391                 else if (ctx->recv_seq < seq) {
392                         plogx_dbg("Future data received (got = %d, expected = %d), missing data! (data ignored)\n", seq, ctx->recv_seq);
393                 }
394                 else {
395                         plogx_dbg("Old data received again (state = %s)\n", tcp_state_to_str(ctx->tcp_state));
396                         plogx_dbg("expecting seq %d, got seq %d, len = %d\n",ctx->recv_seq, seq, l4_meta->len);
397                         plogx_dbg("ackd_seq = %d, next_seq = %d, action = %d\n", ctx->ackd_seq, ctx->next_seq, ctx->cur_action);
398                 }
399         }
400
401         /* parse options */
402         if (((tcp->data_off >> 4)*4) > sizeof(struct tcp_hdr)) {
403                 struct tcp_option *tcp_op = (struct tcp_option *)(tcp + 1);
404                 uint8_t *payload = (uint8_t *)tcp + ((tcp->data_off >> 4)*4);
405
406                 do {
407                         if (tcp_op->kind == 2 && tcp_op->len == 4) {
408                                 uint16_t mss = rte_bswap16(*(uint16_t *)(tcp_op + 1));
409                                 ctx->other_mss = mss;
410                         }
411
412                         tcp_op = (struct tcp_option *)(((uint8_t*)tcp_op) + tcp_op->len);
413                 } while (((uint8_t*)tcp_op) < payload);
414         }
415
416         if (progress_ack || progress_seq) {
417                 ctx->same_state = 0;
418                 ctx->flags |= STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
419         }
420         else {
421                 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
422         }
423         return 0;
424 }
425
426 static int stream_tcp_proc_out_closed(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
427 {
428         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
429
430         if (wait_tsc != 0) {
431                 *next_tsc = wait_tsc;
432                 return -1;
433         }
434
435         /* create SYN packet in mbuf, return 0. goto SYN_SENT, set timeout */
436         ctx->tcp_state = SYN_SENT;
437
438         /* Initialize: */
439         ctx->next_seq = 99;
440         ctx->ackd_seq = 99;
441
442         create_tcp_pkt(ctx, mbuf, TCP_SYN_FLAG, 0, 0);
443         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
444         *next_tsc = tcp_retx_timeout(ctx);
445         return 0;
446 }
447
448 static int stream_tcp_proc_out_listen(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
449 {
450         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
451
452         if (wait_tsc != 0) {
453                 *next_tsc = wait_tsc;
454                 return -1;
455         }
456
457         if (!(ctx->flags & STREAM_CTX_F_TCP_GOT_SYN)) {
458                 // TODO: keep connection around at end to catch retransmits from client
459                 plogx_dbg("Got packet while listening without SYN (will send RST)\n");
460                 pkt_tuple_debug(ctx->tuple);
461
462                 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
463                 create_tcp_pkt(ctx, mbuf, TCP_RST_FLAG, 0, 0);
464                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
465                 *next_tsc = tcp_retx_timeout(ctx);
466                 return 0;
467         }
468
469         /* if syn received _now_, send ack + syn. goto SYN_RECEIVED. */
470         plogx_dbg("Got packet while listen\n");
471
472         ctx->next_seq = 200;
473         ctx->ackd_seq = 200;
474
475         ctx->tcp_state = SYN_RECEIVED;
476
477         create_tcp_pkt(ctx, mbuf, TCP_SYN_FLAG | TCP_ACK_FLAG, 0, 0);
478         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
479         *next_tsc = tcp_retx_timeout(ctx);
480         return 0;
481 }
482
483 static int stream_tcp_proc_out_syn_sent(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
484 {
485         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
486
487         if (wait_tsc != 0) {
488                 *next_tsc = wait_tsc;
489                 return -1;
490         }
491
492         if (ctx->ackd_seq < ctx->next_seq || !(ctx->flags & STREAM_CTX_F_TCP_GOT_SYN)) {
493                 plogx_dbg("Retransmit SYN\n");
494                 /* Did not get packet, send syn again and keep state (waiting for ACK). */
495                 ++ctx->same_state;
496                 tcp_set_retransmit(ctx);
497                 return stream_tcp_proc_out_closed(ctx, mbuf, next_tsc);
498         }
499
500         plogx_dbg("SYN_SENT and everything ACK'ed\n");
501         plogx_dbg("ackd_seq = %d, next_seq = %d\n", ctx->ackd_seq, ctx->next_seq);
502
503         /* If syn received for this stream, send ack and goto
504            ESTABLISHED. If first peer is this peer to send actual
505            data, schedule immediately. */
506
507         ctx->same_state = 0;
508         ctx->tcp_state = ESTABLISHED;
509
510         /* third packet of three-way handshake will also contain
511            data. Don't send separate ACK yet. TODO: only send ACK if
512            data has not yet been ACK'ed. */
513         if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
514                 *next_tsc = tcp_resched_timeout(ctx);
515                 plogx_dbg("immediately resched (%d)\n", ctx->cur_action);
516                 return -1;
517         }
518         else {
519                 create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0);
520                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
521                 *next_tsc = tcp_retx_timeout(ctx);
522         }
523         return 0;
524 }
525
526 static int stream_tcp_proc_out_syn_recv(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
527 {
528         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
529
530         if (wait_tsc != 0) {
531                 *next_tsc = wait_tsc;
532                 return -1;
533         }
534
535         if (ctx->ackd_seq == ctx->next_seq) {
536                 /* Possible from server side with ctx->cur_action == 1
537                    if the current packet received had ACK for syn from
538                    server to client and also data completing the first
539                    action. */
540
541                 ctx->same_state = 0;
542                 ctx->tcp_state = ESTABLISHED;
543                 if (ctx->stream_cfg->actions[ctx->cur_action].peer != ctx->peer) {
544                         create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0);
545                         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
546                         *next_tsc = tcp_retx_timeout(ctx);
547                         return 0;
548                 }
549                 else {
550                         /* While at this point, an ACK without data
551                            any could be sent by the server, it is not
552                            really required because the next pacekt
553                            after reschedule will also contain an ACK
554                            along with new data.
555
556                            In this implementation, if this is the
557                            case, the client is not only expecting an
558                            ACK, but also actual data. For this reason,
559                            the empty ACK packet should not be sent,
560                            otherwise the client will retransmit its
561                            data.
562                         */
563
564                         /* create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0); */
565                         /* token_time_take(&ctx->token_time, mbuf_wire_size(mbuf)); */
566                         *next_tsc = tcp_resched_timeout(ctx);
567                         return -1;
568                 }
569         }
570         else {
571                 /* Either this portion is executed due to a time-out
572                    or due to packet reception, the SYN that has been
573                    sent is not yet ACK'ed. So, retransmit the SYN/ACK. */
574                 plogx_dbg("Retransmit SYN/ACK\n");
575                 ++ctx->same_state;
576                 tcp_set_retransmit(ctx);
577                 ctx->next_seq = ctx->ackd_seq;
578                 create_tcp_pkt(ctx, mbuf, TCP_SYN_FLAG | TCP_ACK_FLAG, 0, 0);
579                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
580                 *next_tsc = tcp_retx_timeout(ctx);
581                 return 0;
582         }
583 }
584
585 static int stream_tcp_proc_out_estab_tx(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
586 {
587         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
588
589         if (wait_tsc != 0) {
590                 *next_tsc = wait_tsc;
591                 return -1;
592         }
593
594         const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
595
596         if (act->len == 0) {
597                 plogx_dbg("Closing connection\n");
598                 /* This would be an ACK combined with FIN. To
599                    send a separate ack. keep the state in
600                    established, put_ack and expire
601                    immediately*/
602                 plogx_dbg("Moving to FIN_WAIT\n");
603                 ctx->tcp_state = FIN_WAIT;
604                 ctx->same_state = 0;
605                 create_tcp_pkt(ctx, mbuf, TCP_FIN_FLAG | TCP_ACK_FLAG, 0, 0);
606                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
607                 *next_tsc = tcp_retx_timeout(ctx);
608                 return 0;
609         }
610         /* remaining_len2 will be zero, while in case of
611            act->len == 0, the connection can be closed
612            immediately. */
613
614         plogx_dbg("This peer to send!\n");
615         uint32_t outstanding_bytes = ctx->next_seq - ctx->ackd_seq;
616
617         uint32_t data_beg2 = ctx->next_seq - ctx->seq_first_byte;
618         uint32_t remaining_len2 = act->len - (data_beg2 - act->beg);
619
620         const uint32_t rx_win = 300000;
621         /* If still data to be sent and allowed by outstanding amount */
622         if (outstanding_bytes <= rx_win && remaining_len2) {
623                 plogx_dbg("Outstanding bytes = %d, and remaining_len = %d, next_seq = %d\n", outstanding_bytes, remaining_len2, ctx->next_seq);
624
625                 if (ctx->ackable_data_seq == 0) {
626                         PROX_ASSERT(outstanding_bytes == 0);
627
628                         ctx->ackable_data_seq = ctx->next_seq + act->len;
629                 }
630                 else
631                         plogx_dbg("This will not be the first part of the data within an action\n");
632         }
633         /* still data yet to be acked || still data to be sent but blocked by RX win. */
634         else {
635                 if (ctx->flags & STREAM_CTX_F_MORE_DATA) {
636                         /* Don't send any packet. */
637                         ctx->flags &= ~STREAM_CTX_F_MORE_DATA;
638                         *next_tsc = tcp_retx_timeout(ctx);
639                         ctx->sched_tsc = rte_rdtsc() + *next_tsc;
640                         return -1;
641                 }
642                 else {
643                         uint64_t now = rte_rdtsc();
644
645                         if ((ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) && token_time_tsc_until_full(&ctx->token_time_other) != 0) {
646                                 tcp_retx_timeout_start(ctx, next_tsc);
647                                 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
648                                 return -1;
649                         }
650                         /* This function might be called due to packet
651                            reception. In that case, cancel here and
652                            wait until the timeout really occurs before
653                            reTX. */
654                         if (!tcp_retx_timeout_occured(ctx, now)) {
655                                 tcp_retx_timeout_resume(ctx, now, next_tsc);
656                                 return -1;
657                         }
658
659                         ctx->same_state++;
660                         tcp_set_retransmit(ctx);
661                         /* This possibly means that now retransmit is resumed half-way in the action. */
662                         plogx_dbg("Retransmit: outstanding = %d\n", outstanding_bytes);
663                         plogx_dbg("Assuming %d->%d lost\n", ctx->ackd_seq, ctx->next_seq);
664                         ctx->next_seq = ctx->ackd_seq;
665                         plogx_dbg("highest seq from other side = %d\n", ctx->recv_seq);
666                 }
667                 /* When STREAM_CTX_F_MORE_DATA is set, real timeouts
668                    can't occur. If this is needed, timeouts
669                    need to carry additional information. */
670         }
671
672         /* The following code will retransmit the same data if next_seq is not moved forward. */
673         uint32_t data_beg = ctx->next_seq - ctx->seq_first_byte;
674         uint32_t remaining_len = act->len - (data_beg - act->beg);
675         uint32_t data_len = remaining_len > ctx->other_mss? ctx->other_mss: remaining_len;
676         if (data_len == 0)
677                 plogx_warn("data_len == 0\n");
678
679         if (remaining_len > ctx->other_mss)
680                 ctx->flags |= STREAM_CTX_F_MORE_DATA;
681         else
682                 ctx->flags &= ~STREAM_CTX_F_MORE_DATA;
683
684         create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, data_beg, data_len);
685         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
686         if (ctx->flags & STREAM_CTX_F_MORE_DATA)
687                 *next_tsc = tcp_resched_timeout(ctx);
688         else
689                 tcp_retx_timeout_start(ctx, next_tsc);
690
691         return 0;
692 }
693
694 static int stream_tcp_proc_out_estab_rx(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
695 {
696         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
697
698         if (wait_tsc != 0) {
699                 *next_tsc = wait_tsc;
700                 return -1;
701         }
702
703         if (ctx->flags & STREAM_CTX_F_TCP_GOT_FIN) {
704                 plogx_dbg("Got fin!\n");
705                 if (1) {
706                         ctx->tcp_state = LAST_ACK;
707                         create_tcp_pkt(ctx, mbuf, TCP_FIN_FLAG | TCP_ACK_FLAG, 0, 0);
708                         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
709                         *next_tsc = tcp_retx_timeout(ctx);
710                         return 0;
711                 }
712                 else {
713                         ctx->tcp_state = CLOSE_WAIT;
714                         create_tcp_pkt(ctx, mbuf, TCP_FIN_FLAG, 0, 0);
715                         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
716                         *next_tsc = tcp_resched_timeout(ctx);
717                         return 0;
718                 }
719         }
720
721         if (ctx->flags & STREAM_CTX_F_NEW_DATA)
722                 ctx->flags &= ~STREAM_CTX_F_NEW_DATA;
723         else {
724                 ctx->same_state++;
725                 tcp_set_retransmit(ctx);
726                 plogx_dbg("state++ (ack = %d)\n", ctx->recv_seq);
727         }
728
729         create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0);
730         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
731         *next_tsc = tcp_retx_timeout(ctx);
732         return 0;
733 }
734
735 static int stream_tcp_proc_out_estab(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
736 {
737         if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
738                 return stream_tcp_proc_out_estab_tx(ctx, mbuf, next_tsc);
739         }
740         else {
741                 return stream_tcp_proc_out_estab_rx(ctx, mbuf, next_tsc);
742         }
743 }
744
745 static int stream_tcp_proc_out_close_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
746 {
747         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
748
749         if (wait_tsc != 0) {
750                 *next_tsc = wait_tsc;
751                 return -1;
752         }
753
754         /* CLOSE_WAIT is an intermediary stage that is only visited
755            when the FIN is sent after ACK'ing the incoming FIN. In any
756            case, it does not matter if there was a packet or not. */
757         ctx->tcp_state = LAST_ACK;
758         create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG | TCP_FIN_FLAG, 0, 0);
759         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
760         *next_tsc = tcp_retx_timeout(ctx);
761         return 0;
762 }
763
764 static int stream_tcp_proc_out_last_ack(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
765 {
766         if (ctx->ackd_seq == ctx->next_seq) {
767                 plogx_dbg("Last ACK received\n");
768                 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
769                 return -1;
770         }
771         else {
772                 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
773
774                 if (wait_tsc != 0) {
775                         *next_tsc = wait_tsc;
776                         return -1;
777                 }
778                 if (ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) {
779                         ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
780                         *next_tsc = tcp_retx_timeout(ctx);
781                         return -1;
782                 }
783
784                 plogx_dbg("Retransmit!\n");
785                 ctx->next_seq = ctx->ackd_seq;
786                 ctx->same_state++;
787                 tcp_set_retransmit(ctx);
788                 create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG | TCP_FIN_FLAG, 0, 0);
789                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
790                 *next_tsc = tcp_retx_timeout(ctx);
791                 return 0;
792         }
793 }
794
795 static int stream_tcp_proc_out_fin_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
796 {
797         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
798
799         if (wait_tsc != 0) {
800                 *next_tsc = wait_tsc;
801                 return -1;
802         }
803
804         if (ctx->ackd_seq == ctx->next_seq) {
805                 if (ctx->flags & STREAM_CTX_F_TCP_GOT_FIN) {
806                         ctx->same_state = 0;
807                         ctx->tcp_state = TIME_WAIT;
808                         ctx->sched_tsc = rte_rdtsc() + ctx->stream_cfg->tsc_timeout_time_wait;
809                         plogx_dbg("from FIN_WAIT to TIME_WAIT\n");
810                         create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0);
811                         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
812                         *next_tsc = ctx->stream_cfg->tsc_timeout_time_wait;
813                         return 0;
814                 }
815                 else {
816                         /* FIN will still need to come */
817                         *next_tsc = tcp_retx_timeout(ctx);
818                         return -1;
819                 }
820         }
821         else {
822                 if (ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) {
823                         ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
824                         *next_tsc = tcp_retx_timeout(ctx);
825                         return -1;
826                 }
827
828                 plogx_dbg("Retransmit!\n");
829                 ctx->same_state++;
830                 tcp_set_retransmit(ctx);
831                 ctx->next_seq = ctx->ackd_seq;
832                 create_tcp_pkt(ctx, mbuf, TCP_FIN_FLAG | TCP_ACK_FLAG, 0, 0);
833                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
834                 *next_tsc = tcp_retx_timeout(ctx);
835                 return 0;
836         }
837 }
838
839 static int stream_tcp_proc_out_time_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
840 {
841         if (ctx->sched_tsc < rte_rdtsc()) {
842                 plogx_dbg("TIME_WAIT expired! for %#x\n", ctx->tuple->dst_addr);
843                 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
844                 return -1;
845         }
846         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
847
848         if (wait_tsc != 0) {
849                 *next_tsc = wait_tsc;
850                 return -1;
851         }
852
853         plogx_dbg("Got packet while in TIME_WAIT (pkt ACK reTX)\n");
854         ctx->sched_tsc = rte_rdtsc() + ctx->stream_cfg->tsc_timeout_time_wait;
855         create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0);
856         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
857         *next_tsc = ctx->stream_cfg->tsc_timeout_time_wait;
858         return 0;
859 }
860
861 static int stream_tcp_proc_out(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
862 {
863         if (ctx->same_state == 10) {
864                 ctx->flags |= STREAM_CTX_F_EXPIRED;
865                 return -1;
866         }
867
868         switch (ctx->tcp_state) {
869         case CLOSED: /* Client initial state */
870                 return stream_tcp_proc_out_closed(ctx, mbuf, next_tsc);
871         case LISTEN: /* Server starts in this state. */
872                 return stream_tcp_proc_out_listen(ctx, mbuf, next_tsc);
873         case SYN_SENT:
874                 return stream_tcp_proc_out_syn_sent(ctx, mbuf, next_tsc);
875         case SYN_RECEIVED:
876                 return stream_tcp_proc_out_syn_recv(ctx, mbuf, next_tsc);
877         case ESTABLISHED:
878                 return stream_tcp_proc_out_estab(ctx, mbuf, next_tsc);
879         case CLOSE_WAIT:
880                 return stream_tcp_proc_out_close_wait(ctx, mbuf, next_tsc);
881         case LAST_ACK:
882                 return stream_tcp_proc_out_last_ack(ctx, mbuf, next_tsc);
883         case FIN_WAIT:
884                 return stream_tcp_proc_out_fin_wait(ctx, mbuf, next_tsc);
885         case TIME_WAIT:
886                 return stream_tcp_proc_out_time_wait(ctx, mbuf, next_tsc);
887         }
888
889         return -1;
890 }
891
892 /* Return: zero: packet in mbuf is the reply, non-zero: data consumed,
893    nothing to send. The latter case might mean that the connection has
894    ended, or that a future event has been scheduled. l4_meta =>
895    mbuf contains packet to be processed. */
896 int stream_tcp_proc(struct stream_ctx *ctx, struct rte_mbuf *mbuf, struct l4_meta *l4_meta, uint64_t *next_tsc)
897 {
898         token_time_update(&ctx->token_time, rte_rdtsc());
899         token_time_update(&ctx->token_time_other, rte_rdtsc());
900         if (l4_meta) {
901                 int ret;
902
903                 token_time_take_clamp(&ctx->token_time_other, mbuf_wire_size(mbuf));
904                 ret = stream_tcp_proc_in(ctx, l4_meta);
905                 if (ret)
906                         return ret;
907         }
908
909         return stream_tcp_proc_out(ctx, mbuf, next_tsc);
910 }
911
912 int stream_tcp_is_ended(struct stream_ctx *ctx)
913 {
914         return ctx->flags & STREAM_CTX_F_TCP_ENDED;
915 }
916
917 static void add_pkt_bytes(uint32_t *n_pkts, uint32_t *n_bytes, uint32_t len)
918 {
919         len = (len < 60? 60 : len) + 20 + ETHER_CRC_LEN;
920
921         (*n_pkts)++;
922         *n_bytes += len;
923 }
924
925 void stream_tcp_calc_len(struct stream_cfg *cfg, uint32_t *n_pkts, uint32_t *n_bytes)
926 {
927         const uint32_t client_hdr_len = cfg->data[PEER_CLIENT].hdr_len;
928         const uint32_t server_hdr_len = cfg->data[PEER_SERVER].hdr_len;
929
930         *n_pkts = 0;
931         *n_bytes = 0;
932
933         /* Connection setup */
934         add_pkt_bytes(n_pkts, n_bytes, client_hdr_len + sizeof(struct tcp_hdr) + 4); /* SYN */
935         add_pkt_bytes(n_pkts, n_bytes, server_hdr_len + sizeof(struct tcp_hdr) + 4); /* SYN/ACK */
936         add_pkt_bytes(n_pkts, n_bytes, client_hdr_len + sizeof(struct tcp_hdr)); /* ACK */
937
938         for (uint32_t i = 0; i < cfg->n_actions; ++i) {
939                 const uint32_t mss = 1440; /* TODO: should come from peer's own mss. */
940                 uint32_t remaining = cfg->actions[i].len;
941                 const uint32_t send_hdr_len = cfg->actions[i].peer == PEER_CLIENT? client_hdr_len : server_hdr_len;
942                 const uint32_t reply_hdr_len = cfg->actions[i].peer == PEER_CLIENT? server_hdr_len : client_hdr_len;
943
944                 if (remaining == 0)
945                         break;
946
947                 while (remaining) {
948                         uint32_t seg = remaining > mss? mss: remaining;
949                         add_pkt_bytes(n_pkts, n_bytes, send_hdr_len + sizeof(struct tcp_hdr) + seg);
950                         remaining -= seg;
951                 }
952
953                 add_pkt_bytes(n_pkts, n_bytes, reply_hdr_len + sizeof(struct tcp_hdr));
954         }
955
956         /* Connection Tear-down */
957         enum l4gen_peer last_peer = cfg->actions[cfg->n_actions - 1].peer;
958
959         const uint32_t init_hdr_len = last_peer == PEER_CLIENT? client_hdr_len : server_hdr_len;
960         const uint32_t resp_hdr_len = last_peer == PEER_CLIENT? server_hdr_len : client_hdr_len;
961
962         add_pkt_bytes(n_pkts, n_bytes, init_hdr_len + sizeof(struct tcp_hdr)); /* FIN */
963         add_pkt_bytes(n_pkts, n_bytes, resp_hdr_len + sizeof(struct tcp_hdr)); /* FIN/ACK */
964         add_pkt_bytes(n_pkts, n_bytes, init_hdr_len + sizeof(struct tcp_hdr)); /* ACK */
965 }