Support for xtesting
[samplevnf.git] / VNFs / DPPD-PROX / genl4_stream_tcp.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_cycles.h>
18 #include <rte_ether.h>
19 #include <rte_ethdev.h> // required by rte_eth_ctrl.h in 19.05
20 #include <rte_eth_ctrl.h>
21
22 #include "log.h"
23 #include "genl4_stream_tcp.h"
24 #include "prox_assert.h"
25 #include "mbuf_utils.h"
26
27 static uint64_t tcp_retx_timeout(const struct stream_ctx *ctx)
28 {
29         uint64_t delay = token_time_tsc_until_full(&ctx->token_time_other);
30
31         return delay + ctx->stream_cfg->tsc_timeout;
32 }
33
34 static uint64_t tcp_resched_timeout(const struct stream_ctx *ctx)
35 {
36         uint64_t delay = token_time_tsc_until_full(&ctx->token_time);
37
38         return delay;
39 }
40
41 static void tcp_retx_timeout_start(struct stream_ctx *ctx, uint64_t *next_tsc)
42 {
43         uint64_t now = rte_rdtsc();
44
45         *next_tsc = tcp_retx_timeout(ctx);
46         ctx->sched_tsc = now + *next_tsc;
47 }
48
49 static int tcp_retx_timeout_occured(const struct stream_ctx *ctx, uint64_t now)
50 {
51         return ctx->sched_tsc < now;
52 }
53
54 static void tcp_retx_timeout_resume(const struct stream_ctx *ctx, uint64_t now, uint64_t *next_tsc)
55 {
56         *next_tsc = ctx->sched_tsc - now;
57 }
58
59 static void tcp_set_retransmit(struct stream_ctx *ctx)
60 {
61         ctx->retransmits++;
62 }
63
64 struct tcp_option {
65         uint8_t kind;
66         uint8_t len;
67 } __attribute__((packed));
68
69 void stream_tcp_create_rst(struct rte_mbuf *mbuf, struct l4_meta *l4_meta, struct pkt_tuple *tuple)
70 {
71         prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)l4_meta->l4_hdr;
72         prox_rte_ipv4_hdr *ip = ((prox_rte_ipv4_hdr *)tcp) - 1;
73
74         ip->src_addr = tuple->dst_addr;
75         ip->dst_addr = tuple->src_addr;
76
77         tcp->dst_port = tuple->src_port;
78         tcp->src_port = tuple->dst_port;
79
80         ip->total_length = rte_bswap16(sizeof(prox_rte_ipv4_hdr) + sizeof(prox_rte_tcp_hdr));
81         tcp->tcp_flags = PROX_RTE_TCP_RST_FLAG;
82         tcp->data_off = ((sizeof(prox_rte_tcp_hdr) / 4) << 4);
83         rte_pktmbuf_pkt_len(mbuf) = l4_meta->payload - rte_pktmbuf_mtod(mbuf, uint8_t *);
84         rte_pktmbuf_data_len(mbuf) = l4_meta->payload - rte_pktmbuf_mtod(mbuf, uint8_t *);
85 }
86
87 static void create_tcp_pkt(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint8_t tcp_flags, int data_beg, int data_len)
88 {
89         uint8_t *pkt;
90
91         const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
92         const struct stream_cfg *stream_cfg = ctx->stream_cfg;
93
94         pkt = rte_pktmbuf_mtod(mbuf, uint8_t *);
95         rte_memcpy(pkt, stream_cfg->data[act->peer].hdr, stream_cfg->data[act->peer].hdr_len);
96
97         prox_rte_ipv4_hdr *l3_hdr = (prox_rte_ipv4_hdr*)&pkt[stream_cfg->data[act->peer].hdr_len - sizeof(prox_rte_ipv4_hdr)];
98         prox_rte_tcp_hdr *l4_hdr = (prox_rte_tcp_hdr *)&pkt[stream_cfg->data[act->peer].hdr_len];
99
100         l3_hdr->src_addr = ctx->tuple->dst_addr;
101         l3_hdr->dst_addr = ctx->tuple->src_addr;
102         l3_hdr->next_proto_id = IPPROTO_TCP;
103
104         l4_hdr->src_port = ctx->tuple->dst_port;
105         l4_hdr->dst_port = ctx->tuple->src_port;
106
107         uint32_t tcp_len = sizeof(prox_rte_tcp_hdr);
108         uint32_t tcp_payload_len = 0;
109         uint32_t seq_len = 0;
110         struct tcp_option *tcp_op;
111
112         if (tcp_flags & PROX_RTE_TCP_RST_FLAG) {
113                 tcp_flags |= PROX_RTE_TCP_RST_FLAG;
114                 seq_len = 1;
115         }
116         else if (tcp_flags & PROX_RTE_TCP_SYN_FLAG) {
117                 tcp_flags |= PROX_RTE_TCP_SYN_FLAG;
118                 /* Window scaling */
119
120                 /* TODO: make options come from the stream. */
121                 tcp_op = (struct tcp_option *)(l4_hdr + 1);
122
123                 tcp_op->kind = 2;
124                 tcp_op->len = 4;
125                 *(uint16_t *)(tcp_op + 1) = rte_bswap16(1460); /* TODO: Save this in this_mss */
126
127                 tcp_len += 4;
128                 seq_len = 1;
129
130                 ctx->seq_first_byte = ctx->ackd_seq + 1;
131         }
132         else if (tcp_flags & PROX_RTE_TCP_FIN_FLAG) {
133                 tcp_flags |= PROX_RTE_TCP_FIN_FLAG;
134                 seq_len = 1;
135         }
136
137         if (tcp_flags & PROX_RTE_TCP_ACK_FLAG) {
138                 l4_hdr->recv_ack = rte_bswap32(ctx->recv_seq);
139                 tcp_flags |= PROX_RTE_TCP_ACK_FLAG;
140         }
141         else
142                 l4_hdr->recv_ack = 0;
143
144         uint16_t l4_payload_offset = stream_cfg->data[act->peer].hdr_len + tcp_len;
145
146         if (data_len) {
147                 seq_len = data_len;
148                 plogx_dbg("l4 payload offset = %d\n", l4_payload_offset);
149                 rte_memcpy(pkt + l4_payload_offset, stream_cfg->data[act->peer].content + data_beg, data_len);
150         }
151
152         l4_hdr->sent_seq = rte_bswap32(ctx->next_seq);
153         l4_hdr->tcp_flags = tcp_flags; /* SYN */
154         l4_hdr->rx_win = rte_bswap16(0x3890); // TODO: make this come from stream (config)
155         //l4_hdr->cksum = ...;
156         l4_hdr->tcp_urp = 0;
157         l4_hdr->data_off = ((tcp_len / 4) << 4); /* Highest 4 bits are TCP header len in units of 32 bit words */
158
159         /* ctx->next_seq = ctx->ackd_seq + seq_len; */
160         ctx->next_seq += seq_len;
161
162         /* No payload after TCP header. */
163         rte_pktmbuf_pkt_len(mbuf)  = l4_payload_offset + data_len;
164         rte_pktmbuf_data_len(mbuf) = l4_payload_offset + data_len;
165
166         l3_hdr->total_length = rte_bswap16(sizeof(prox_rte_ipv4_hdr) + tcp_len + data_len);
167         plogdx_dbg(mbuf, NULL);
168
169         plogx_dbg("put tcp packet with flags: %s%s%s, (len = %d, seq = %d, ack =%d)\n",
170                   tcp_flags & PROX_RTE_TCP_SYN_FLAG? "SYN ":"",
171                   tcp_flags & PROX_RTE_TCP_ACK_FLAG? "ACK ":"",
172                   tcp_flags & PROX_RTE_TCP_FIN_FLAG? "FIN ":"",
173                   data_len, rte_bswap32(l4_hdr->sent_seq), rte_bswap32(l4_hdr->recv_ack));
174 }
175
176 /* Get the length of the reply associated for the next packet. Note
177    that the packet will come from the other peer. In case the next
178    packet belongs to the current peer (again), the reply length will
179    be that of an empty TCP packet (i.e. the ACK). */
180 uint16_t stream_tcp_reply_len(struct stream_ctx *ctx)
181 {
182         if (stream_tcp_is_ended(ctx))
183                 return 0;
184         else if (ctx->tcp_state != ESTABLISHED) {
185                 if (ctx->tcp_state == SYN_SENT || ctx->tcp_state == LISTEN) {
186                         /* First packet received is a SYN packet. In
187                            the current implementation this packet
188                            contains the TCP option field to set the
189                            MSS. For this, add 4 bytes. */
190                         return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr) + 4;
191                 }
192                 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr);
193         }
194         else if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
195                 /* The reply _could_ (due to races, still possibly
196                    receive an old ack) contain data. This means that
197                    in some cases, the prediction of the reply size
198                    will be an overestimate. */
199                 uint32_t data_beg = ctx->next_seq - ctx->seq_first_byte;
200                 const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
201
202                 uint32_t remaining_len = act->len - (data_beg - act->beg);
203
204                 if (remaining_len == 0) {
205                         if (ctx->cur_action + 1 != ctx->stream_cfg->n_actions) {
206                                 if (ctx->stream_cfg->actions[ctx->cur_action + 1].peer == ctx->peer)
207                                         return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr);
208                                 else {
209                                         uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
210                                         uint32_t end = ctx->stream_cfg->actions[ctx->cur_action + 1].beg +
211                                                 ctx->stream_cfg->actions[ctx->cur_action + 1].len;
212                                         uint32_t remaining = end - seq_beg;
213                                         uint16_t data_len = remaining > 1460? 1460: remaining;
214
215                                         return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr) + data_len;
216                                 }
217                         }
218                         else {
219                                 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr);
220                         }
221                 }
222                 else {
223                         return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr);
224                 }
225         }
226         else if (ctx->stream_cfg->actions[ctx->cur_action].peer != ctx->peer) {
227                 uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
228                 uint32_t end = ctx->stream_cfg->actions[ctx->cur_action].beg +
229                         ctx->stream_cfg->actions[ctx->cur_action].len;
230                 uint32_t remaining = end - seq_beg;
231                 uint16_t data_len = remaining > 1460? 1460: remaining;
232
233                 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr) + data_len;
234         }
235         else
236                 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr);
237 }
238
239 static void stream_tcp_proc_in_order_data(struct stream_ctx *ctx, struct l4_meta *l4_meta, int *progress_seq)
240 {
241         plogx_dbg("Got data with seq %d (as expected), with len %d\n", ctx->recv_seq, l4_meta->len);
242
243         if (!l4_meta->len)
244                 return;
245
246         const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
247         enum l4gen_peer peer = act->peer;
248         /* Since we have received the expected sequence number, the start address will not exceed the cfg memory buffer. */
249         uint8_t *content = ctx->stream_cfg->data[peer].content;
250         uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
251         uint32_t end = ctx->stream_cfg->actions[ctx->cur_action].beg + ctx->stream_cfg->actions[ctx->cur_action].len;
252         uint32_t remaining = end - seq_beg;
253
254         if (l4_meta->len > remaining) {
255                 plogx_err("Provided data is too long:\n");
256                 plogx_err("action.beg = %d, action.len = %d", act->beg, act->len);
257                 plogx_err("tcp seq points at %d in action, l4_meta->len = %d\n", seq_beg, l4_meta->len);
258         }
259         else {
260                 if (memcmp(content + seq_beg, l4_meta->payload, l4_meta->len) == 0) {
261                         plogx_dbg("Good payload in %d: %u -> %u\n", ctx->cur_action, ctx->recv_seq, l4_meta->len);
262                         ctx->recv_seq += l4_meta->len;
263                         ctx->cur_pos[peer] += l4_meta->len;
264                         /* Move forward only when this was the last piece of data within current action (i.e. end of received data == end of action data). */
265                         if (seq_beg + l4_meta->len == act->beg + act->len) {
266                                 plogx_dbg("Got last piece in action %d\n", ctx->cur_action);
267                                 ctx->cur_action++;
268                         }
269                         else {
270                                 plogx_dbg("Got data from %d with len %d, but waiting for more (tot len = %d)!\n", seq_beg, l4_meta->len, act->len);
271                         }
272                         *progress_seq = 1;
273                         ctx->flags |= STREAM_CTX_F_NEW_DATA;
274                 }
275                 else {
276                         plogx_err("ackable = %d, ackd = %d\n", ctx->ackable_data_seq ,ctx->ackd_seq);
277                         plogx_err("Bad payload action[%d]{.len = %d, .peer  = %s}\n", ctx->cur_action, act->len, peer == PEER_SERVER? "s" : "c");
278                         plogx_err("   pkt payload len = %d, beginning at %u\n", l4_meta->len, seq_beg);
279                         /* plogx_err("   Payload starts %zu bytes after beginning of l4_hdr\n", l4_meta->payload - l4_meta->l4_hdr); */
280
281                         plogx_err("   payload[0-3] = %02x %02x %02x %02x\n",
282                                   l4_meta->payload[0],
283                                   l4_meta->payload[1],
284                                   l4_meta->payload[2],
285                                   l4_meta->payload[3]);
286                         plogx_err("   expect[0-3]  = %02x %02x %02x %02x\n",
287                                   content[seq_beg + 0],
288                                   content[seq_beg + 1],
289                                   content[seq_beg + 2],
290                                   content[seq_beg + 3]);
291                 }
292         }
293 }
294
295 static int stream_tcp_proc_in(struct stream_ctx *ctx, struct l4_meta *l4_meta)
296 {
297         prox_rte_tcp_hdr *tcp = NULL;
298         int got_syn = 0;
299         int got_ack = 0;
300         int got_fin = 0;
301         int got_rst = 0;
302
303         tcp = (prox_rte_tcp_hdr *)l4_meta->l4_hdr;
304
305         got_syn = tcp->tcp_flags & PROX_RTE_TCP_SYN_FLAG;
306         got_ack = tcp->tcp_flags & PROX_RTE_TCP_ACK_FLAG;
307         got_fin = tcp->tcp_flags & PROX_RTE_TCP_FIN_FLAG;
308         got_rst = tcp->tcp_flags & PROX_RTE_TCP_RST_FLAG;
309         plogx_dbg("TCP, flags: %s%s%s, (len = %d, seq = %d, ack =%d)\n", got_syn? "SYN ":"", got_ack? "ACK ":"", got_fin? "FIN " : "", l4_meta->len, rte_bswap32(tcp->sent_seq), rte_bswap32(tcp->recv_ack));
310
311         if (got_syn)
312                 ctx->flags |= STREAM_CTX_F_TCP_GOT_SYN;
313         if (got_fin)
314                 ctx->flags |= STREAM_CTX_F_TCP_GOT_FIN;
315
316         int progress_ack = 0, progress_seq = 0;
317
318         /* RST => other side wants to terminate due to
319            inconsitent state (example: delay of retransmit of
320            last ACK while other side already closed the
321            connection. The other side will accept the packet
322            as a beginning of a new connection but there will
323            be no SYN. ) */
324         if (got_rst) {
325                 plogx_dbg("got rst\n");
326                 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
327                 return -1;
328         }
329
330         if (got_ack) {
331                 uint32_t ackd_seq = rte_bswap32(tcp->recv_ack);
332
333                 if (ackd_seq > ctx->ackd_seq) {
334                         plogx_dbg("Got ACK for outstanding data, from %d to %d\n", ctx->ackd_seq, ackd_seq);
335                         ctx->ackd_seq = ackd_seq;
336                         plogx_dbg("ackable data = %d\n", ctx->ackable_data_seq);
337                         /* Ackable_data_seq set to byte after
338                            current action. */
339                         if (ctx->ackable_data_seq == ctx->ackd_seq) {
340                                 /* Due to retransmit in
341                                    combination with late acks,
342                                    is is possible to ack
343                                    future data. In this case,
344                                    the assumption that data
345                                    was lost is not true and
346                                    the next seq is moved
347                                    forward. */
348                                 if (ctx->next_seq < ctx->ackable_data_seq) {
349                                         ctx->next_seq = ctx->ackable_data_seq;
350                                 }
351
352                                 ctx->ackable_data_seq = 0;
353                                 const struct stream_cfg *stream_cfg = ctx->stream_cfg;
354                                 const struct peer_action *act = &stream_cfg->actions[ctx->cur_action];
355
356                                 ctx->cur_pos[act->peer] += act->len;
357                                 ctx->cur_action++;
358                                 plogx_dbg("Moving to next action %u\n", ctx->ackd_seq);
359                         }
360                         progress_ack = 1;
361                 }
362                 else {
363                         plogx_dbg("Old data acked: acked = %d, ackable =%d\n", ackd_seq, ctx->ackd_seq);
364                 }
365         }
366
367         uint32_t seq = rte_bswap32(tcp->sent_seq);
368
369         /* update recv_seq. */
370         if (got_syn) {
371                 /* When a syn is received, immediately reset recv_seq based on seq from packet. */
372                 ctx->recv_seq = seq + 1;
373                 /* Syn packets have length 1, so the first real data will start after that. */
374                 ctx->other_seq_first_byte = seq + 1;
375                 progress_seq = 1;
376         }
377         else if (got_fin) {
378                 if (ctx->recv_seq == seq) {
379                         plogx_dbg("Got fin with correct seq\n");
380                         ctx->recv_seq = seq + 1;
381                         progress_seq = 1;
382                 }
383                 else {
384                         plogx_dbg("Got fin but incorrect seq\n");
385                 }
386         }
387         else {
388                 /* Only expect in-order packets. */
389                 if (ctx->recv_seq == seq) {
390                         stream_tcp_proc_in_order_data(ctx, l4_meta, &progress_seq);
391                 }
392                 else if (ctx->recv_seq < seq) {
393                         plogx_dbg("Future data received (got = %d, expected = %d), missing data! (data ignored)\n", seq, ctx->recv_seq);
394                 }
395                 else {
396                         plogx_dbg("Old data received again (state = %s)\n", tcp_state_to_str(ctx->tcp_state));
397                         plogx_dbg("expecting seq %d, got seq %d, len = %d\n",ctx->recv_seq, seq, l4_meta->len);
398                         plogx_dbg("ackd_seq = %d, next_seq = %d, action = %d\n", ctx->ackd_seq, ctx->next_seq, ctx->cur_action);
399                 }
400         }
401
402         /* parse options */
403         if (((tcp->data_off >> 4)*4) > sizeof(prox_rte_tcp_hdr)) {
404                 struct tcp_option *tcp_op = (struct tcp_option *)(tcp + 1);
405                 uint8_t *payload = (uint8_t *)tcp + ((tcp->data_off >> 4)*4);
406
407                 do {
408                         if (tcp_op->kind == 2 && tcp_op->len == 4) {
409                                 uint16_t mss = rte_bswap16(*(uint16_t *)(tcp_op + 1));
410                                 ctx->other_mss = mss;
411                         }
412
413                         tcp_op = (struct tcp_option *)(((uint8_t*)tcp_op) + tcp_op->len);
414                 } while (((uint8_t*)tcp_op) < payload);
415         }
416
417         if (progress_ack || progress_seq) {
418                 ctx->same_state = 0;
419                 ctx->flags |= STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
420         }
421         else {
422                 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
423         }
424         return 0;
425 }
426
427 static int stream_tcp_proc_out_closed(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
428 {
429         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
430
431         if (wait_tsc != 0) {
432                 *next_tsc = wait_tsc;
433                 return -1;
434         }
435
436         /* create SYN packet in mbuf, return 0. goto SYN_SENT, set timeout */
437         ctx->tcp_state = SYN_SENT;
438
439         /* Initialize: */
440         ctx->next_seq = 99;
441         ctx->ackd_seq = 99;
442
443         create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_SYN_FLAG, 0, 0);
444         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
445         *next_tsc = tcp_retx_timeout(ctx);
446         return 0;
447 }
448
449 static int stream_tcp_proc_out_listen(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
450 {
451         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
452
453         if (wait_tsc != 0) {
454                 *next_tsc = wait_tsc;
455                 return -1;
456         }
457
458         if (!(ctx->flags & STREAM_CTX_F_TCP_GOT_SYN)) {
459                 // TODO: keep connection around at end to catch retransmits from client
460                 plogx_dbg("Got packet while listening without SYN (will send RST)\n");
461                 pkt_tuple_debug(ctx->tuple);
462
463                 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
464                 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_RST_FLAG, 0, 0);
465                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
466                 *next_tsc = tcp_retx_timeout(ctx);
467                 return 0;
468         }
469
470         /* if syn received _now_, send ack + syn. goto SYN_RECEIVED. */
471         plogx_dbg("Got packet while listen\n");
472
473         ctx->next_seq = 200;
474         ctx->ackd_seq = 200;
475
476         ctx->tcp_state = SYN_RECEIVED;
477
478         create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_SYN_FLAG | PROX_RTE_TCP_ACK_FLAG, 0, 0);
479         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
480         *next_tsc = tcp_retx_timeout(ctx);
481         return 0;
482 }
483
484 static int stream_tcp_proc_out_syn_sent(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
485 {
486         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
487
488         if (wait_tsc != 0) {
489                 *next_tsc = wait_tsc;
490                 return -1;
491         }
492
493         if (ctx->ackd_seq < ctx->next_seq || !(ctx->flags & STREAM_CTX_F_TCP_GOT_SYN)) {
494                 plogx_dbg("Retransmit SYN\n");
495                 /* Did not get packet, send syn again and keep state (waiting for ACK). */
496                 ++ctx->same_state;
497                 tcp_set_retransmit(ctx);
498                 return stream_tcp_proc_out_closed(ctx, mbuf, next_tsc);
499         }
500
501         plogx_dbg("SYN_SENT and everything ACK'ed\n");
502         plogx_dbg("ackd_seq = %d, next_seq = %d\n", ctx->ackd_seq, ctx->next_seq);
503
504         /* If syn received for this stream, send ack and goto
505            ESTABLISHED. If first peer is this peer to send actual
506            data, schedule immediately. */
507
508         ctx->same_state = 0;
509         ctx->tcp_state = ESTABLISHED;
510
511         /* third packet of three-way handshake will also contain
512            data. Don't send separate ACK yet. TODO: only send ACK if
513            data has not yet been ACK'ed. */
514         if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
515                 *next_tsc = tcp_resched_timeout(ctx);
516                 plogx_dbg("immediately resched (%d)\n", ctx->cur_action);
517                 return -1;
518         }
519         else {
520                 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0);
521                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
522                 *next_tsc = tcp_retx_timeout(ctx);
523         }
524         return 0;
525 }
526
527 static int stream_tcp_proc_out_syn_recv(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
528 {
529         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
530
531         if (wait_tsc != 0) {
532                 *next_tsc = wait_tsc;
533                 return -1;
534         }
535
536         if (ctx->ackd_seq == ctx->next_seq) {
537                 /* Possible from server side with ctx->cur_action == 1
538                    if the current packet received had ACK for syn from
539                    server to client and also data completing the first
540                    action. */
541
542                 ctx->same_state = 0;
543                 ctx->tcp_state = ESTABLISHED;
544                 if (ctx->stream_cfg->actions[ctx->cur_action].peer != ctx->peer) {
545                         create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0);
546                         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
547                         *next_tsc = tcp_retx_timeout(ctx);
548                         return 0;
549                 }
550                 else {
551                         /* While at this point, an ACK without data
552                            any could be sent by the server, it is not
553                            really required because the next pacekt
554                            after reschedule will also contain an ACK
555                            along with new data.
556
557                            In this implementation, if this is the
558                            case, the client is not only expecting an
559                            ACK, but also actual data. For this reason,
560                            the empty ACK packet should not be sent,
561                            otherwise the client will retransmit its
562                            data.
563                         */
564
565                         /* create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0); */
566                         /* token_time_take(&ctx->token_time, mbuf_wire_size(mbuf)); */
567                         *next_tsc = tcp_resched_timeout(ctx);
568                         return -1;
569                 }
570         }
571         else {
572                 /* Either this portion is executed due to a time-out
573                    or due to packet reception, the SYN that has been
574                    sent is not yet ACK'ed. So, retransmit the SYN/ACK. */
575                 plogx_dbg("Retransmit SYN/ACK\n");
576                 ++ctx->same_state;
577                 tcp_set_retransmit(ctx);
578                 ctx->next_seq = ctx->ackd_seq;
579                 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_SYN_FLAG | PROX_RTE_TCP_ACK_FLAG, 0, 0);
580                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
581                 *next_tsc = tcp_retx_timeout(ctx);
582                 return 0;
583         }
584 }
585
586 static int stream_tcp_proc_out_estab_tx(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
587 {
588         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
589
590         if (wait_tsc != 0) {
591                 *next_tsc = wait_tsc;
592                 return -1;
593         }
594
595         const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
596
597         if (act->len == 0) {
598                 plogx_dbg("Closing connection\n");
599                 /* This would be an ACK combined with FIN. To
600                    send a separate ack. keep the state in
601                    established, put_ack and expire
602                    immediately*/
603                 plogx_dbg("Moving to FIN_WAIT\n");
604                 ctx->tcp_state = FIN_WAIT;
605                 ctx->same_state = 0;
606                 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_FIN_FLAG | PROX_RTE_TCP_ACK_FLAG, 0, 0);
607                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
608                 *next_tsc = tcp_retx_timeout(ctx);
609                 return 0;
610         }
611         /* remaining_len2 will be zero, while in case of
612            act->len == 0, the connection can be closed
613            immediately. */
614
615         plogx_dbg("This peer to send!\n");
616         uint32_t outstanding_bytes = ctx->next_seq - ctx->ackd_seq;
617
618         uint32_t data_beg2 = ctx->next_seq - ctx->seq_first_byte;
619         uint32_t remaining_len2 = act->len - (data_beg2 - act->beg);
620
621         const uint32_t rx_win = 300000;
622         /* If still data to be sent and allowed by outstanding amount */
623         if (outstanding_bytes <= rx_win && remaining_len2) {
624                 plogx_dbg("Outstanding bytes = %d, and remaining_len = %d, next_seq = %d\n", outstanding_bytes, remaining_len2, ctx->next_seq);
625
626                 if (ctx->ackable_data_seq == 0) {
627                         PROX_ASSERT(outstanding_bytes == 0);
628
629                         ctx->ackable_data_seq = ctx->next_seq + act->len;
630                 }
631                 else
632                         plogx_dbg("This will not be the first part of the data within an action\n");
633         }
634         /* still data yet to be acked || still data to be sent but blocked by RX win. */
635         else {
636                 if (ctx->flags & STREAM_CTX_F_MORE_DATA) {
637                         /* Don't send any packet. */
638                         ctx->flags &= ~STREAM_CTX_F_MORE_DATA;
639                         *next_tsc = tcp_retx_timeout(ctx);
640                         ctx->sched_tsc = rte_rdtsc() + *next_tsc;
641                         return -1;
642                 }
643                 else {
644                         uint64_t now = rte_rdtsc();
645
646                         if ((ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) && token_time_tsc_until_full(&ctx->token_time_other) != 0) {
647                                 tcp_retx_timeout_start(ctx, next_tsc);
648                                 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
649                                 return -1;
650                         }
651                         /* This function might be called due to packet
652                            reception. In that case, cancel here and
653                            wait until the timeout really occurs before
654                            reTX. */
655                         if (!tcp_retx_timeout_occured(ctx, now)) {
656                                 tcp_retx_timeout_resume(ctx, now, next_tsc);
657                                 return -1;
658                         }
659
660                         ctx->same_state++;
661                         tcp_set_retransmit(ctx);
662                         /* This possibly means that now retransmit is resumed half-way in the action. */
663                         plogx_dbg("Retransmit: outstanding = %d\n", outstanding_bytes);
664                         plogx_dbg("Assuming %d->%d lost\n", ctx->ackd_seq, ctx->next_seq);
665                         ctx->next_seq = ctx->ackd_seq;
666                         plogx_dbg("highest seq from other side = %d\n", ctx->recv_seq);
667                 }
668                 /* When STREAM_CTX_F_MORE_DATA is set, real timeouts
669                    can't occur. If this is needed, timeouts
670                    need to carry additional information. */
671         }
672
673         /* The following code will retransmit the same data if next_seq is not moved forward. */
674         uint32_t data_beg = ctx->next_seq - ctx->seq_first_byte;
675         uint32_t remaining_len = act->len - (data_beg - act->beg);
676         uint32_t data_len = remaining_len > ctx->other_mss? ctx->other_mss: remaining_len;
677         if (data_len == 0)
678                 plogx_warn("data_len == 0\n");
679
680         if (remaining_len > ctx->other_mss)
681                 ctx->flags |= STREAM_CTX_F_MORE_DATA;
682         else
683                 ctx->flags &= ~STREAM_CTX_F_MORE_DATA;
684
685         create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, data_beg, data_len);
686         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
687         if (ctx->flags & STREAM_CTX_F_MORE_DATA)
688                 *next_tsc = tcp_resched_timeout(ctx);
689         else
690                 tcp_retx_timeout_start(ctx, next_tsc);
691
692         return 0;
693 }
694
695 static int stream_tcp_proc_out_estab_rx(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
696 {
697         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
698
699         if (wait_tsc != 0) {
700                 *next_tsc = wait_tsc;
701                 return -1;
702         }
703
704         if (ctx->flags & STREAM_CTX_F_TCP_GOT_FIN) {
705                 plogx_dbg("Got fin!\n");
706                 if (1) {
707                         ctx->tcp_state = LAST_ACK;
708                         create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_FIN_FLAG | PROX_RTE_TCP_ACK_FLAG, 0, 0);
709                         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
710                         *next_tsc = tcp_retx_timeout(ctx);
711                         return 0;
712                 }
713                 else {
714                         ctx->tcp_state = CLOSE_WAIT;
715                         create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_FIN_FLAG, 0, 0);
716                         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
717                         *next_tsc = tcp_resched_timeout(ctx);
718                         return 0;
719                 }
720         }
721
722         if (ctx->flags & STREAM_CTX_F_NEW_DATA)
723                 ctx->flags &= ~STREAM_CTX_F_NEW_DATA;
724         else {
725                 ctx->same_state++;
726                 tcp_set_retransmit(ctx);
727                 plogx_dbg("state++ (ack = %d)\n", ctx->recv_seq);
728         }
729
730         create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0);
731         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
732         *next_tsc = tcp_retx_timeout(ctx);
733         return 0;
734 }
735
736 static int stream_tcp_proc_out_estab(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
737 {
738         if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
739                 return stream_tcp_proc_out_estab_tx(ctx, mbuf, next_tsc);
740         }
741         else {
742                 return stream_tcp_proc_out_estab_rx(ctx, mbuf, next_tsc);
743         }
744 }
745
746 static int stream_tcp_proc_out_close_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
747 {
748         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
749
750         if (wait_tsc != 0) {
751                 *next_tsc = wait_tsc;
752                 return -1;
753         }
754
755         /* CLOSE_WAIT is an intermediary stage that is only visited
756            when the FIN is sent after ACK'ing the incoming FIN. In any
757            case, it does not matter if there was a packet or not. */
758         ctx->tcp_state = LAST_ACK;
759         create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG | PROX_RTE_TCP_FIN_FLAG, 0, 0);
760         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
761         *next_tsc = tcp_retx_timeout(ctx);
762         return 0;
763 }
764
765 static int stream_tcp_proc_out_last_ack(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
766 {
767         if (ctx->ackd_seq == ctx->next_seq) {
768                 plogx_dbg("Last ACK received\n");
769                 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
770                 return -1;
771         }
772         else {
773                 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
774
775                 if (wait_tsc != 0) {
776                         *next_tsc = wait_tsc;
777                         return -1;
778                 }
779                 if (ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) {
780                         ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
781                         *next_tsc = tcp_retx_timeout(ctx);
782                         return -1;
783                 }
784
785                 plogx_dbg("Retransmit!\n");
786                 ctx->next_seq = ctx->ackd_seq;
787                 ctx->same_state++;
788                 tcp_set_retransmit(ctx);
789                 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG | PROX_RTE_TCP_FIN_FLAG, 0, 0);
790                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
791                 *next_tsc = tcp_retx_timeout(ctx);
792                 return 0;
793         }
794 }
795
796 static int stream_tcp_proc_out_fin_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
797 {
798         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
799
800         if (wait_tsc != 0) {
801                 *next_tsc = wait_tsc;
802                 return -1;
803         }
804
805         if (ctx->ackd_seq == ctx->next_seq) {
806                 if (ctx->flags & STREAM_CTX_F_TCP_GOT_FIN) {
807                         ctx->same_state = 0;
808                         ctx->tcp_state = TIME_WAIT;
809                         ctx->sched_tsc = rte_rdtsc() + ctx->stream_cfg->tsc_timeout_time_wait;
810                         plogx_dbg("from FIN_WAIT to TIME_WAIT\n");
811                         create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0);
812                         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
813                         *next_tsc = ctx->stream_cfg->tsc_timeout_time_wait;
814                         return 0;
815                 }
816                 else {
817                         /* FIN will still need to come */
818                         *next_tsc = tcp_retx_timeout(ctx);
819                         return -1;
820                 }
821         }
822         else {
823                 if (ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) {
824                         ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
825                         *next_tsc = tcp_retx_timeout(ctx);
826                         return -1;
827                 }
828
829                 plogx_dbg("Retransmit!\n");
830                 ctx->same_state++;
831                 tcp_set_retransmit(ctx);
832                 ctx->next_seq = ctx->ackd_seq;
833                 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_FIN_FLAG | PROX_RTE_TCP_ACK_FLAG, 0, 0);
834                 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
835                 *next_tsc = tcp_retx_timeout(ctx);
836                 return 0;
837         }
838 }
839
840 static int stream_tcp_proc_out_time_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
841 {
842         if (ctx->sched_tsc < rte_rdtsc()) {
843                 plogx_dbg("TIME_WAIT expired! for %#x\n", ctx->tuple->dst_addr);
844                 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
845                 return -1;
846         }
847         uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
848
849         if (wait_tsc != 0) {
850                 *next_tsc = wait_tsc;
851                 return -1;
852         }
853
854         plogx_dbg("Got packet while in TIME_WAIT (pkt ACK reTX)\n");
855         ctx->sched_tsc = rte_rdtsc() + ctx->stream_cfg->tsc_timeout_time_wait;
856         create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0);
857         token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
858         *next_tsc = ctx->stream_cfg->tsc_timeout_time_wait;
859         return 0;
860 }
861
862 static int stream_tcp_proc_out(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
863 {
864         if (ctx->same_state == 10) {
865                 ctx->flags |= STREAM_CTX_F_EXPIRED;
866                 return -1;
867         }
868
869         switch (ctx->tcp_state) {
870         case CLOSED: /* Client initial state */
871                 return stream_tcp_proc_out_closed(ctx, mbuf, next_tsc);
872         case LISTEN: /* Server starts in this state. */
873                 return stream_tcp_proc_out_listen(ctx, mbuf, next_tsc);
874         case SYN_SENT:
875                 return stream_tcp_proc_out_syn_sent(ctx, mbuf, next_tsc);
876         case SYN_RECEIVED:
877                 return stream_tcp_proc_out_syn_recv(ctx, mbuf, next_tsc);
878         case ESTABLISHED:
879                 return stream_tcp_proc_out_estab(ctx, mbuf, next_tsc);
880         case CLOSE_WAIT:
881                 return stream_tcp_proc_out_close_wait(ctx, mbuf, next_tsc);
882         case LAST_ACK:
883                 return stream_tcp_proc_out_last_ack(ctx, mbuf, next_tsc);
884         case FIN_WAIT:
885                 return stream_tcp_proc_out_fin_wait(ctx, mbuf, next_tsc);
886         case TIME_WAIT:
887                 return stream_tcp_proc_out_time_wait(ctx, mbuf, next_tsc);
888         }
889
890         return -1;
891 }
892
893 /* Return: zero: packet in mbuf is the reply, non-zero: data consumed,
894    nothing to send. The latter case might mean that the connection has
895    ended, or that a future event has been scheduled. l4_meta =>
896    mbuf contains packet to be processed. */
897 int stream_tcp_proc(struct stream_ctx *ctx, struct rte_mbuf *mbuf, struct l4_meta *l4_meta, uint64_t *next_tsc)
898 {
899         token_time_update(&ctx->token_time, rte_rdtsc());
900         token_time_update(&ctx->token_time_other, rte_rdtsc());
901         if (l4_meta) {
902                 int ret;
903
904                 token_time_take_clamp(&ctx->token_time_other, mbuf_wire_size(mbuf));
905                 ret = stream_tcp_proc_in(ctx, l4_meta);
906                 if (ret)
907                         return ret;
908         }
909
910         return stream_tcp_proc_out(ctx, mbuf, next_tsc);
911 }
912
913 int stream_tcp_is_ended(struct stream_ctx *ctx)
914 {
915         return ctx->flags & STREAM_CTX_F_TCP_ENDED;
916 }
917
918 static void add_pkt_bytes(uint32_t *n_pkts, uint32_t *n_bytes, uint32_t len)
919 {
920         len = (len < 60? 60 : len) + 20 + PROX_RTE_ETHER_CRC_LEN;
921
922         (*n_pkts)++;
923         *n_bytes += len;
924 }
925
926 void stream_tcp_calc_len(struct stream_cfg *cfg, uint32_t *n_pkts, uint32_t *n_bytes)
927 {
928         const uint32_t client_hdr_len = cfg->data[PEER_CLIENT].hdr_len;
929         const uint32_t server_hdr_len = cfg->data[PEER_SERVER].hdr_len;
930
931         *n_pkts = 0;
932         *n_bytes = 0;
933
934         /* Connection setup */
935         add_pkt_bytes(n_pkts, n_bytes, client_hdr_len + sizeof(prox_rte_tcp_hdr) + 4); /* SYN */
936         add_pkt_bytes(n_pkts, n_bytes, server_hdr_len + sizeof(prox_rte_tcp_hdr) + 4); /* SYN/ACK */
937         add_pkt_bytes(n_pkts, n_bytes, client_hdr_len + sizeof(prox_rte_tcp_hdr)); /* ACK */
938
939         for (uint32_t i = 0; i < cfg->n_actions; ++i) {
940                 const uint32_t mss = 1440; /* TODO: should come from peer's own mss. */
941                 uint32_t remaining = cfg->actions[i].len;
942                 const uint32_t send_hdr_len = cfg->actions[i].peer == PEER_CLIENT? client_hdr_len : server_hdr_len;
943                 const uint32_t reply_hdr_len = cfg->actions[i].peer == PEER_CLIENT? server_hdr_len : client_hdr_len;
944
945                 if (remaining == 0)
946                         break;
947
948                 while (remaining) {
949                         uint32_t seg = remaining > mss? mss: remaining;
950                         add_pkt_bytes(n_pkts, n_bytes, send_hdr_len + sizeof(prox_rte_tcp_hdr) + seg);
951                         remaining -= seg;
952                 }
953
954                 add_pkt_bytes(n_pkts, n_bytes, reply_hdr_len + sizeof(prox_rte_tcp_hdr));
955         }
956
957         /* Connection Tear-down */
958         enum l4gen_peer last_peer = cfg->actions[cfg->n_actions - 1].peer;
959
960         const uint32_t init_hdr_len = last_peer == PEER_CLIENT? client_hdr_len : server_hdr_len;
961         const uint32_t resp_hdr_len = last_peer == PEER_CLIENT? server_hdr_len : client_hdr_len;
962
963         add_pkt_bytes(n_pkts, n_bytes, init_hdr_len + sizeof(prox_rte_tcp_hdr)); /* FIN */
964         add_pkt_bytes(n_pkts, n_bytes, resp_hdr_len + sizeof(prox_rte_tcp_hdr)); /* FIN/ACK */
965         add_pkt_bytes(n_pkts, n_bytes, init_hdr_len + sizeof(prox_rte_tcp_hdr)); /* ACK */
966 }