2 // Copyright (c) 2010-2017 Intel Corporation
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 #include <rte_cycles.h>
18 #include <rte_ether.h>
19 #include <rte_eth_ctrl.h>
22 #include "genl4_stream_tcp.h"
23 #include "prox_assert.h"
24 #include "mbuf_utils.h"
26 static uint64_t tcp_retx_timeout(const struct stream_ctx *ctx)
28 uint64_t delay = token_time_tsc_until_full(&ctx->token_time_other);
30 return delay + ctx->stream_cfg->tsc_timeout;
// Delay, in TSC cycles, until this stream's own TX token bucket
// (ctx->token_time) is full again. It is used where this side only needs to
// be rescheduled (e.g. more data to send). By contrast, tcp_retx_timeout()
// waits on token_time_other plus tsc_timeout.
// NOTE(review): the return statement is not visible here; presumably it
// returns delay.
33 static uint64_t tcp_resched_timeout(const struct stream_ctx *ctx)
35 uint64_t delay = token_time_tsc_until_full(&ctx->token_time);
40 static void tcp_retx_timeout_start(struct stream_ctx *ctx, uint64_t *next_tsc)
42 uint64_t now = rte_rdtsc();
44 *next_tsc = tcp_retx_timeout(ctx);
45 ctx->sched_tsc = now + *next_tsc;
48 static int tcp_retx_timeout_occured(const struct stream_ctx *ctx, uint64_t now)
50 return ctx->sched_tsc < now;
53 static void tcp_retx_timeout_resume(const struct stream_ctx *ctx, uint64_t now, uint64_t *next_tsc)
55 *next_tsc = ctx->sched_tsc - now;
// Called on every retransmission path below, just before the packet is
// rebuilt. NOTE(review): its body is not visible here.
58 static void tcp_set_retransmit(struct stream_ctx *ctx)
// End of the packed TCP option header (struct tcp_option). Its kind/len
// fields are used by create_tcp_pkt() and stream_tcp_proc_in().
66 } __attribute__((packed));
68 void stream_tcp_create_rst(struct rte_mbuf *mbuf, struct l4_meta *l4_meta, struct pkt_tuple *tuple)
70 struct tcp_hdr *tcp = (struct tcp_hdr *)l4_meta->l4_hdr;
71 struct ipv4_hdr *ip = ((struct ipv4_hdr *)tcp) - 1;
73 ip->src_addr = tuple->dst_addr;
74 ip->dst_addr = tuple->src_addr;
76 tcp->dst_port = tuple->src_port;
77 tcp->src_port = tuple->dst_port;
79 ip->total_length = rte_bswap16(sizeof(struct ipv4_hdr) + sizeof(struct tcp_hdr));
80 tcp->tcp_flags = TCP_RST_FLAG;
81 tcp->data_off = ((sizeof(struct tcp_hdr) / 4) << 4);
82 rte_pktmbuf_pkt_len(mbuf) = l4_meta->payload - rte_pktmbuf_mtod(mbuf, uint8_t *);
83 rte_pktmbuf_data_len(mbuf) = l4_meta->payload - rte_pktmbuf_mtod(mbuf, uint8_t *);
// Build an outgoing TCP segment for this stream directly in mbuf.
//
// The header template of the peer owning the current action
// (stream_cfg->data[act->peer].hdr, hdr_len bytes) is copied to the start of
// the mbuf. The IPv4 header is taken to be the last sizeof(struct ipv4_hdr)
// bytes of that template, and the TCP header follows it. Addresses and ports
// come from ctx->tuple with source and destination swapped.
//
// tcp_flags: TCP_*_FLAG bits to send.
//            - A SYN also writes an MSS option (value fixed at 1460) and sets
//              ctx->seq_first_byte.
//            - The ack field is filled from ctx->recv_seq only when
//              TCP_ACK_FLAG is set; otherwise it is zeroed.
// data_beg:  offset into the peer's configured content of the first payload
//            byte.
// data_len:  number of payload bytes placed after the TCP header.
//
// The segment is sent with sequence number ctx->next_seq, which is then
// advanced by seq_len. The TCP checksum is not computed in this function.
86 static void create_tcp_pkt(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint8_t tcp_flags, int data_beg, int data_len)
90 const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
91 const struct stream_cfg *stream_cfg = ctx->stream_cfg;
93 pkt = rte_pktmbuf_mtod(mbuf, uint8_t *);
94 rte_memcpy(pkt, stream_cfg->data[act->peer].hdr, stream_cfg->data[act->peer].hdr_len);
// The template ends with the IPv4 header; TCP starts right after it.
96 struct ipv4_hdr *l3_hdr = (struct ipv4_hdr*)&pkt[stream_cfg->data[act->peer].hdr_len - sizeof(struct ipv4_hdr)];
97 struct tcp_hdr *l4_hdr = (struct tcp_hdr *)&pkt[stream_cfg->data[act->peer].hdr_len];
// Reply direction: swap the addresses/ports of the stream tuple.
99 l3_hdr->src_addr = ctx->tuple->dst_addr;
100 l3_hdr->dst_addr = ctx->tuple->src_addr;
101 l3_hdr->next_proto_id = IPPROTO_TCP;
103 l4_hdr->src_port = ctx->tuple->dst_port;
104 l4_hdr->dst_port = ctx->tuple->src_port;
106 uint32_t tcp_len = sizeof(struct tcp_hdr);
107 uint32_t tcp_payload_len = 0;
// Sequence space consumed by this segment; ctx->next_seq advances by it below.
108 uint32_t seq_len = 0;
109 struct tcp_option *tcp_op;
111 if (tcp_flags & TCP_RST_FLAG) {
// No-op: this branch is only entered when the flag is already set.
112 tcp_flags |= TCP_RST_FLAG;
115 else if (tcp_flags & TCP_SYN_FLAG) {
116 tcp_flags |= TCP_SYN_FLAG;
119 /* TODO: make options come from the stream. */
120 tcp_op = (struct tcp_option *)(l4_hdr + 1);
// MSS option value (hard-coded 1460), in network byte order.
124 *(uint16_t *)(tcp_op + 1) = rte_bswap16(1460); /* TODO: Save this in this_mss */
// The SYN occupies one sequence number, so the first data byte is one past
// ackd_seq.
129 ctx->seq_first_byte = ctx->ackd_seq + 1;
131 else if (tcp_flags & TCP_FIN_FLAG) {
132 tcp_flags |= TCP_FIN_FLAG;
136 if (tcp_flags & TCP_ACK_FLAG) {
// Acknowledge everything received in order so far.
137 l4_hdr->recv_ack = rte_bswap32(ctx->recv_seq);
138 tcp_flags |= TCP_ACK_FLAG;
141 l4_hdr->recv_ack = 0;
// tcp_len includes any TCP option bytes, so the payload starts after them.
143 uint16_t l4_payload_offset = stream_cfg->data[act->peer].hdr_len + tcp_len;
147 plogx_dbg("l4 payload offset = %d\n", l4_payload_offset);
// Payload comes from the action owner's configured content at data_beg.
148 rte_memcpy(pkt + l4_payload_offset, stream_cfg->data[act->peer].content + data_beg, data_len);
151 l4_hdr->sent_seq = rte_bswap32(ctx->next_seq);
152 l4_hdr->tcp_flags = tcp_flags; /* flags assembled above */
153 l4_hdr->rx_win = rte_bswap16(0x3890); // TODO: make this come from stream (config)
// The TCP checksum is not filled in here.
154 //l4_hdr->cksum = ...;
156 l4_hdr->data_off = ((tcp_len / 4) << 4); /* Highest 4 bits are TCP header len in units of 32 bit words */
158 /* ctx->next_seq = ctx->ackd_seq + seq_len; */
159 ctx->next_seq += seq_len;
161 /* Frame = header template + TCP header (incl. options) + data_len payload bytes. */
162 rte_pktmbuf_pkt_len(mbuf) = l4_payload_offset + data_len;
163 rte_pktmbuf_data_len(mbuf) = l4_payload_offset + data_len;
165 l3_hdr->total_length = rte_bswap16(sizeof(struct ipv4_hdr) + tcp_len + data_len);
166 plogdx_dbg(mbuf, NULL);
// NOTE(review): RST is not shown in this debug output, and the uint32_t
// seq/ack values are printed with %d.
168 plogx_dbg("put tcp packet with flags: %s%s%s, (len = %d, seq = %d, ack =%d)\n",
169 tcp_flags & TCP_SYN_FLAG? "SYN ":"",
170 tcp_flags & TCP_ACK_FLAG? "ACK ":"",
171 tcp_flags & TCP_FIN_FLAG? "FIN ":"",
172 data_len, rte_bswap32(l4_hdr->sent_seq), rte_bswap32(l4_hdr->recv_ack));
175 /* Get the length of the reply associated for the next packet. Note
176 that the packet will come from the other peer. In case the next
177 packet belongs to the current peer (again), the reply length will
178 be that of an empty TCP packet (i.e. the ACK). */
// Returns a predicted frame length in bytes:
//   - a header-template length from stream_cfg->data[],
//   - plus sizeof(struct tcp_hdr),
//   - plus the expected payload when the other peer is about to send data.
// The expected payload is capped at 1460 bytes, the MSS value advertised by
// create_tcp_pkt().
179 uint16_t stream_tcp_reply_len(struct stream_ctx *ctx)
181 if (stream_tcp_is_ended(ctx))
// Not ESTABLISHED: only header-sized replies are expected (plus 4 option
// bytes for the SYN answering SYN_SENT/LISTEN).
183 else if (ctx->tcp_state != ESTABLISHED) {
184 if (ctx->tcp_state == SYN_SENT || ctx->tcp_state == LISTEN) {
185 /* First packet received is a SYN packet. In
186 the current implementation this packet
187 contains the TCP option field to set the
188 MSS. For this, add 4 bytes. */
189 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(struct tcp_hdr) + 4;
191 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(struct tcp_hdr);
// ESTABLISHED, and this peer owns the current action.
193 else if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
194 /* The reply _could_ (due to races, still possibly
195 receive an old ack) contain data. This means that
196 in some cases, the prediction of the reply size
197 will be an overestimate. */
// Bytes of the current action not yet sent by this peer, derived from
// next_seq relative to act->beg.
198 uint32_t data_beg = ctx->next_seq - ctx->seq_first_byte;
199 const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
201 uint32_t remaining_len = act->len - (data_beg - act->beg);
// Whole action already sent: the reply depends on who owns the next action.
203 if (remaining_len == 0) {
204 if (ctx->cur_action + 1 != ctx->stream_cfg->n_actions) {
205 if (ctx->stream_cfg->actions[ctx->cur_action + 1].peer == ctx->peer)
206 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(struct tcp_hdr);
// The other peer owns the next action: expect the next chunk of its data.
208 uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
209 uint32_t end = ctx->stream_cfg->actions[ctx->cur_action + 1].beg +
210 ctx->stream_cfg->actions[ctx->cur_action + 1].len;
211 uint32_t remaining = end - seq_beg;
212 uint16_t data_len = remaining > 1460? 1460: remaining;
214 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(struct tcp_hdr) + data_len;
218 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(struct tcp_hdr);
222 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(struct tcp_hdr);
// ESTABLISHED, and the other peer owns the current action: expect its next
// chunk of data (at most 1460 bytes).
225 else if (ctx->stream_cfg->actions[ctx->cur_action].peer != ctx->peer) {
226 uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
227 uint32_t end = ctx->stream_cfg->actions[ctx->cur_action].beg +
228 ctx->stream_cfg->actions[ctx->cur_action].len;
229 uint32_t remaining = end - seq_beg;
230 uint16_t data_len = remaining > 1460? 1460: remaining;
232 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(struct tcp_hdr) + data_len;
235 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(struct tcp_hdr);
// Handle a payload segment whose sequence number equals ctx->recv_seq
// (in order).
//
// The payload is compared byte-for-byte with the configured content of the
// current action, at offset seq_beg (the position relative to
// other_seq_first_byte).
//   - On a match, recv_seq and cur_pos[peer] advance by the payload length,
//     and STREAM_CTX_F_NEW_DATA is set.
//   - If this segment ends exactly at the end of the action, it is logged as
//     the last piece. Presumably the action index and *progress_seq are
//     updated there too; those lines are not shown.
//   - A mismatch is reported with diagnostics.
238 static void stream_tcp_proc_in_order_data(struct stream_ctx *ctx, struct l4_meta *l4_meta, int *progress_seq)
240 plogx_dbg("Got data with seq %d (as expected), with len %d\n", ctx->recv_seq, l4_meta->len);
245 const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
246 enum l4gen_peer peer = act->peer;
247 /* Since we have received the expected sequence number, the start address will not exceed the cfg memory buffer. */
248 uint8_t *content = ctx->stream_cfg->data[peer].content;
249 uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
250 uint32_t end = ctx->stream_cfg->actions[ctx->cur_action].beg + ctx->stream_cfg->actions[ctx->cur_action].len;
251 uint32_t remaining = end - seq_beg;
// The segment would run past the end of the current action.
253 if (l4_meta->len > remaining) {
254 plogx_err("Provided data is too long:\n");
255 plogx_err("action.beg = %d, action.len = %d", act->beg, act->len);
256 plogx_err("tcp seq points at %d in action, l4_meta->len = %d\n", seq_beg, l4_meta->len);
259 if (memcmp(content + seq_beg, l4_meta->payload, l4_meta->len) == 0) {
260 plogx_dbg("Good payload in %d: %u -> %u\n", ctx->cur_action, ctx->recv_seq, l4_meta->len);
261 ctx->recv_seq += l4_meta->len;
262 ctx->cur_pos[peer] += l4_meta->len;
263 /* Move forward only when this was the last piece of data within current action (i.e. end of received data == end of action data). */
264 if (seq_beg + l4_meta->len == act->beg + act->len) {
265 plogx_dbg("Got last piece in action %d\n", ctx->cur_action);
269 plogx_dbg("Got data from %d with len %d, but waiting for more (tot len = %d)!\n", seq_beg, l4_meta->len, act->len);
// Tells the output side that new data arrived (checked in estab_rx).
272 ctx->flags |= STREAM_CTX_F_NEW_DATA;
275 plogx_err("ackable = %d, ackd = %d\n", ctx->ackable_data_seq ,ctx->ackd_seq);
276 plogx_err("Bad payload action[%d]{.len = %d, .peer = %s}\n", ctx->cur_action, act->len, peer == PEER_SERVER? "s" : "c");
277 plogx_err(" pkt payload len = %d, beginning at %u\n", l4_meta->len, seq_beg);
278 /* plogx_err(" Payload starts %zu bytes after beginning of l4_hdr\n", l4_meta->payload - l4_meta->l4_hdr); */
// NOTE(review): this dump always reads 4 bytes from the payload and from
// content[seq_beg..], even when l4_meta->len or the remaining content is
// shorter.
280 plogx_err(" payload[0-3] = %02x %02x %02x %02x\n",
284 l4_meta->payload[3]);
285 plogx_err(" expect[0-3] = %02x %02x %02x %02x\n",
286 content[seq_beg + 0],
287 content[seq_beg + 1],
288 content[seq_beg + 2],
289 content[seq_beg + 3]);
// Process a received TCP segment (l4_meta) for this stream. Steps:
//   1. Latch SYN/FIN into ctx->flags; a RST marks the stream ended.
//   2. Advance ackd_seq from the ACK field.
//   3. Update recv_seq from SYN/FIN/in-order data.
//   4. Read the peer's MSS option.
//   5. Record whether the segment made progress
//      (STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS).
294 static int stream_tcp_proc_in(struct stream_ctx *ctx, struct l4_meta *l4_meta)
296 struct tcp_hdr *tcp = NULL;
302 tcp = (struct tcp_hdr *)l4_meta->l4_hdr;
304 got_syn = tcp->tcp_flags & TCP_SYN_FLAG;
305 got_ack = tcp->tcp_flags & TCP_ACK_FLAG;
306 got_fin = tcp->tcp_flags & TCP_FIN_FLAG;
307 got_rst = tcp->tcp_flags & TCP_RST_FLAG;
308 plogx_dbg("TCP, flags: %s%s%s, (len = %d, seq = %d, ack =%d)\n", got_syn? "SYN ":"", got_ack? "ACK ":"", got_fin? "FIN " : "", l4_meta->len, rte_bswap32(tcp->sent_seq), rte_bswap32(tcp->recv_ack));
// Latched flags are consumed by the output state machine (e.g. LISTEN
// checks GOT_SYN, ESTABLISHED-rx checks GOT_FIN).
311 ctx->flags |= STREAM_CTX_F_TCP_GOT_SYN;
313 ctx->flags |= STREAM_CTX_F_TCP_GOT_FIN;
315 int progress_ack = 0, progress_seq = 0;
317 /* RST => other side wants to terminate due to
318 inconsitent state (example: delay of retransmit of
319 last ACK while other side already closed the
320 connection. The other side will accept the packet
321 as a beginning of a new connection but there will
324 plogx_dbg("got rst\n");
325 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
330 uint32_t ackd_seq = rte_bswap32(tcp->recv_ack);
// NOTE(review): sequence numbers are compared as plain unsigned integers
// here and below; there is no handling of 2^32 wrap-around.
332 if (ackd_seq > ctx->ackd_seq) {
333 plogx_dbg("Got ACK for outstanding data, from %d to %d\n", ctx->ackd_seq, ackd_seq);
334 ctx->ackd_seq = ackd_seq;
335 plogx_dbg("ackable data = %d\n", ctx->ackable_data_seq);
336 /* Ackable_data_seq set to byte after
// All data of the current action is acknowledged: clear ackable_data_seq
// and account the whole action in cur_pos.
338 if (ctx->ackable_data_seq == ctx->ackd_seq) {
339 /* Due to retransmit in
340 combination with late acks,
341 is is possible to ack
342 future data. In this case,
343 the assumption that data
344 was lost is not true and
345 the next seq is moved
347 if (ctx->next_seq < ctx->ackable_data_seq) {
348 ctx->next_seq = ctx->ackable_data_seq;
351 ctx->ackable_data_seq = 0;
352 const struct stream_cfg *stream_cfg = ctx->stream_cfg;
353 const struct peer_action *act = &stream_cfg->actions[ctx->cur_action];
355 ctx->cur_pos[act->peer] += act->len;
357 plogx_dbg("Moving to next action %u\n", ctx->ackd_seq);
362 plogx_dbg("Old data acked: acked = %d, ackable =%d\n", ackd_seq, ctx->ackd_seq);
366 uint32_t seq = rte_bswap32(tcp->sent_seq);
368 /* update recv_seq. */
370 /* When a syn is received, immediately reset recv_seq based on seq from packet. */
371 ctx->recv_seq = seq + 1;
372 /* Syn packets have length 1, so the first real data will start after that. */
373 ctx->other_seq_first_byte = seq + 1;
// A FIN is only accepted (consuming one sequence number) when it is in order.
377 if (ctx->recv_seq == seq) {
378 plogx_dbg("Got fin with correct seq\n");
379 ctx->recv_seq = seq + 1;
383 plogx_dbg("Got fin but incorrect seq\n");
387 /* Only expect in-order packets. */
388 if (ctx->recv_seq == seq) {
389 stream_tcp_proc_in_order_data(ctx, l4_meta, &progress_seq);
391 else if (ctx->recv_seq < seq) {
392 plogx_dbg("Future data received (got = %d, expected = %d), missing data! (data ignored)\n", seq, ctx->recv_seq);
395 plogx_dbg("Old data received again (state = %s)\n", tcp_state_to_str(ctx->tcp_state));
396 plogx_dbg("expecting seq %d, got seq %d, len = %d\n",ctx->recv_seq, seq, l4_meta->len);
397 plogx_dbg("ackd_seq = %d, next_seq = %d, action = %d\n", ctx->ackd_seq, ctx->next_seq, ctx->cur_action);
// TCP header longer than 20 bytes: walk the options looking for MSS
// (kind 2, len 4) and store it in ctx->other_mss.
// NOTE(review): the walk advances by tcp_op->len with no guard.
//   - An option whose len byte is 0 leaves tcp_op unchanged, so the loop
//     never terminates.
//   - EOL (kind 0) and NOP (kind 1) have no length byte, so this walk
//     misparses them.
// Malformed or common option layouts can therefore hang or confuse the
// parser.
402 if (((tcp->data_off >> 4)*4) > sizeof(struct tcp_hdr)) {
403 struct tcp_option *tcp_op = (struct tcp_option *)(tcp + 1);
404 uint8_t *payload = (uint8_t *)tcp + ((tcp->data_off >> 4)*4);
407 if (tcp_op->kind == 2 && tcp_op->len == 4) {
408 uint16_t mss = rte_bswap16(*(uint16_t *)(tcp_op + 1));
409 ctx->other_mss = mss;
412 tcp_op = (struct tcp_option *)(((uint8_t*)tcp_op) + tcp_op->len);
413 } while (((uint8_t*)tcp_op) < payload);
// Remember whether this segment advanced either the ack or the receive
// sequence; the estab_tx/last_ack/fin_wait handlers use this to decide
// between re-arming the timer and retransmitting.
416 if (progress_ack || progress_seq) {
418 ctx->flags |= STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
421 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
// CLOSED (client initial state): open the connection with a SYN and move to
// SYN_SENT. Also reused by SYN_SENT to retransmit the SYN.
// Rate limited by ctx->token_time: while the bucket is not full, only the
// wait time is reported in *next_tsc (presumably without building a packet).
426 static int stream_tcp_proc_out_closed(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
428 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
431 *next_tsc = wait_tsc;
435 /* create SYN packet in mbuf, return 0. goto SYN_SENT, set timeout */
436 ctx->tcp_state = SYN_SENT;
442 create_tcp_pkt(ctx, mbuf, TCP_SYN_FLAG, 0, 0);
// Charge the frame's wire size to the TX bucket, then arm the
// retransmission timeout in case the SYN is not answered.
443 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
444 *next_tsc = tcp_retx_timeout(ctx);
// LISTEN (server initial state), rate limited via ctx->token_time.
//   - Packet that is not a SYN: mark the stream ended and answer with a RST.
//   - SYN: answer with SYN/ACK and move to SYN_RECEIVED.
// In both cases the retransmission timeout is armed in *next_tsc.
448 static int stream_tcp_proc_out_listen(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
450 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
453 *next_tsc = wait_tsc;
457 if (!(ctx->flags & STREAM_CTX_F_TCP_GOT_SYN)) {
458 // TODO: keep connection around at end to catch retransmits from client
459 plogx_dbg("Got packet while listening without SYN (will send RST)\n");
460 pkt_tuple_debug(ctx->tuple);
462 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
463 create_tcp_pkt(ctx, mbuf, TCP_RST_FLAG, 0, 0);
464 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
465 *next_tsc = tcp_retx_timeout(ctx);
469 /* if syn received _now_, send ack + syn. goto SYN_RECEIVED. */
470 plogx_dbg("Got packet while listen\n");
475 ctx->tcp_state = SYN_RECEIVED;
477 create_tcp_pkt(ctx, mbuf, TCP_SYN_FLAG | TCP_ACK_FLAG, 0, 0);
478 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
479 *next_tsc = tcp_retx_timeout(ctx);
// SYN_SENT (client), rate limited via ctx->token_time.
//   - Our SYN not fully acknowledged (ackd_seq < next_seq), or no SYN yet
//     from the server: mark a retransmit and resend the SYN through
//     stream_tcp_proc_out_closed().
//   - Otherwise: become ESTABLISHED.
//       - This peer owns the first action: only reschedule, since the data
//         segment will carry the ACK.
//       - Otherwise: send a bare ACK and arm the retransmission timeout.
483 static int stream_tcp_proc_out_syn_sent(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
485 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
488 *next_tsc = wait_tsc;
492 if (ctx->ackd_seq < ctx->next_seq || !(ctx->flags & STREAM_CTX_F_TCP_GOT_SYN)) {
493 plogx_dbg("Retransmit SYN\n");
494 /* Did not get packet, send syn again and keep state (waiting for ACK). */
496 tcp_set_retransmit(ctx);
497 return stream_tcp_proc_out_closed(ctx, mbuf, next_tsc);
500 plogx_dbg("SYN_SENT and everything ACK'ed\n");
501 plogx_dbg("ackd_seq = %d, next_seq = %d\n", ctx->ackd_seq, ctx->next_seq);
503 /* If syn received for this stream, send ack and goto
504 ESTABLISHED. If first peer is this peer to send actual
505 data, schedule immediately. */
508 ctx->tcp_state = ESTABLISHED;
510 /* third packet of three-way handshake will also contain
511 data. Don't send separate ACK yet. TODO: only send ACK if
512 data has not yet been ACK'ed. */
513 if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
514 *next_tsc = tcp_resched_timeout(ctx);
515 plogx_dbg("immediately resched (%d)\n", ctx->cur_action);
519 create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0);
520 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
521 *next_tsc = tcp_retx_timeout(ctx);
// SYN_RECEIVED (server), rate limited via ctx->token_time.
//   - Our SYN/ACK fully acknowledged (ackd_seq == next_seq): become
//     ESTABLISHED.
//       - Other peer owns the current action: send a bare ACK.
//       - Otherwise: only reschedule; the explanation is in the comment
//         below.
//   - Otherwise: rewind next_seq to ackd_seq and retransmit the SYN/ACK.
526 static int stream_tcp_proc_out_syn_recv(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
528 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
531 *next_tsc = wait_tsc;
535 if (ctx->ackd_seq == ctx->next_seq) {
536 /* Possible from server side with ctx->cur_action == 1
537 if the current packet received had ACK for syn from
538 server to client and also data completing the first
542 ctx->tcp_state = ESTABLISHED;
543 if (ctx->stream_cfg->actions[ctx->cur_action].peer != ctx->peer) {
544 create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0);
545 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
546 *next_tsc = tcp_retx_timeout(ctx);
550 /* While at this point, an ACK without data
551 any could be sent by the server, it is not
552 really required because the next pacekt
553 after reschedule will also contain an ACK
556 In this implementation, if this is the
557 case, the client is not only expecting an
558 ACK, but also actual data. For this reason,
559 the empty ACK packet should not be sent,
560 otherwise the client will retransmit its
564 /* create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0); */
565 /* token_time_take(&ctx->token_time, mbuf_wire_size(mbuf)); */
566 *next_tsc = tcp_resched_timeout(ctx);
571 /* Either this portion is executed due to a time-out
572 or due to packet reception, the SYN that has been
573 sent is not yet ACK'ed. So, retransmit the SYN/ACK. */
574 plogx_dbg("Retransmit SYN/ACK\n");
576 tcp_set_retransmit(ctx);
// Rewind so the SYN/ACK is rebuilt with the same sequence number.
577 ctx->next_seq = ctx->ackd_seq;
578 create_tcp_pkt(ctx, mbuf, TCP_SYN_FLAG | TCP_ACK_FLAG, 0, 0);
579 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
580 *next_tsc = tcp_retx_timeout(ctx);
// ESTABLISHED while this peer owns the current action (sending side), rate
// limited via ctx->token_time.
//   - Closing path: send FIN/ACK and move to FIN_WAIT. Per the comment below
//     this relates to act->len == 0; the guarding condition is not shown.
//   - Otherwise: send the next segment of the action's data, at most
//     ctx->other_mss bytes, from next_seq.
// Retransmission is go-back-N: on a real timeout next_seq is rewound to
// ackd_seq.
585 static int stream_tcp_proc_out_estab_tx(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
587 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
590 *next_tsc = wait_tsc;
594 const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
597 plogx_dbg("Closing connection\n");
598 /* This would be an ACK combined with FIN. To
599 send a separate ack. keep the state in
600 established, put_ack and expire
602 plogx_dbg("Moving to FIN_WAIT\n");
603 ctx->tcp_state = FIN_WAIT;
605 create_tcp_pkt(ctx, mbuf, TCP_FIN_FLAG | TCP_ACK_FLAG, 0, 0);
606 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
607 *next_tsc = tcp_retx_timeout(ctx);
610 /* remaining_len2 will be zero, while in case of
611 act->len == 0, the connection can be closed
614 plogx_dbg("This peer to send!\n");
// Bytes sent but not yet acknowledged.
615 uint32_t outstanding_bytes = ctx->next_seq - ctx->ackd_seq;
// Bytes of the current action not yet sent.
617 uint32_t data_beg2 = ctx->next_seq - ctx->seq_first_byte;
618 uint32_t remaining_len2 = act->len - (data_beg2 - act->beg);
// Fixed upper bound on outstanding bytes (not the peer's advertised window).
620 const uint32_t rx_win = 300000;
621 /* If still data to be sent and allowed by outstanding amount */
622 if (outstanding_bytes <= rx_win && remaining_len2) {
623 plogx_dbg("Outstanding bytes = %d, and remaining_len = %d, next_seq = %d\n", outstanding_bytes, remaining_len2, ctx->next_seq);
// First segment of this action: remember the sequence number just past the
// action's data. Its acknowledgement completes the action in
// stream_tcp_proc_in().
625 if (ctx->ackable_data_seq == 0) {
626 PROX_ASSERT(outstanding_bytes == 0);
628 ctx->ackable_data_seq = ctx->next_seq + act->len;
631 plogx_dbg("This will not be the first part of the data within an action\n");
633 /* still data yet to be acked || still data to be sent but blocked by RX win. */
635 if (ctx->flags & STREAM_CTX_F_MORE_DATA) {
636 /* Don't send any packet. */
637 ctx->flags &= ~STREAM_CTX_F_MORE_DATA;
638 *next_tsc = tcp_retx_timeout(ctx);
639 ctx->sched_tsc = rte_rdtsc() + *next_tsc;
643 uint64_t now = rte_rdtsc();
// The last received packet made progress while the reverse-direction bucket
// is still refilling: restart the retransmission timer.
645 if ((ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) && token_time_tsc_until_full(&ctx->token_time_other) != 0) {
646 tcp_retx_timeout_start(ctx, next_tsc);
647 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
650 /* This function might be called due to packet
651 reception. In that case, cancel here and
652 wait until the timeout really occurs before
654 if (!tcp_retx_timeout_occured(ctx, now)) {
655 tcp_retx_timeout_resume(ctx, now, next_tsc);
660 tcp_set_retransmit(ctx);
661 /* This possibly means that now retransmit is resumed half-way in the action. */
662 plogx_dbg("Retransmit: outstanding = %d\n", outstanding_bytes);
663 plogx_dbg("Assuming %d->%d lost\n", ctx->ackd_seq, ctx->next_seq);
664 ctx->next_seq = ctx->ackd_seq;
665 plogx_dbg("highest seq from other side = %d\n", ctx->recv_seq);
667 /* When STREAM_CTX_F_MORE_DATA is set, real timeouts
668 can't occur. If this is needed, timeouts
669 need to carry additional information. */
672 /* The following code will retransmit the same data if next_seq is not moved forward. */
673 uint32_t data_beg = ctx->next_seq - ctx->seq_first_byte;
674 uint32_t remaining_len = act->len - (data_beg - act->beg);
// Segment size: at most the MSS learned from the peer's SYN options.
675 uint32_t data_len = remaining_len > ctx->other_mss? ctx->other_mss: remaining_len;
677 plogx_warn("data_len == 0\n");
// MORE_DATA: another segment of this action follows right after this one.
679 if (remaining_len > ctx->other_mss)
680 ctx->flags |= STREAM_CTX_F_MORE_DATA;
682 ctx->flags &= ~STREAM_CTX_F_MORE_DATA;
684 create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, data_beg, data_len);
685 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
// More to send: come back as soon as the TX bucket allows. Otherwise wait
// for the ACK with a retransmission timer.
686 if (ctx->flags & STREAM_CTX_F_MORE_DATA)
687 *next_tsc = tcp_resched_timeout(ctx);
689 tcp_retx_timeout_start(ctx, next_tsc);
// ESTABLISHED while the other peer owns the current action (receiving
// side), rate limited via ctx->token_time.
//   - FIN received: answer with FIN/ACK (-> LAST_ACK) or with a FIN
//     (-> CLOSE_WAIT); the condition choosing between them is not shown.
//   - Otherwise: acknowledge with a bare ACK and arm the retransmission
//     timeout. When no new data arrived since the last call
//     (STREAM_CTX_F_NEW_DATA clear), this is treated as a retransmission.
694 static int stream_tcp_proc_out_estab_rx(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
696 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
699 *next_tsc = wait_tsc;
703 if (ctx->flags & STREAM_CTX_F_TCP_GOT_FIN) {
704 plogx_dbg("Got fin!\n");
706 ctx->tcp_state = LAST_ACK;
707 create_tcp_pkt(ctx, mbuf, TCP_FIN_FLAG | TCP_ACK_FLAG, 0, 0);
708 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
709 *next_tsc = tcp_retx_timeout(ctx);
713 ctx->tcp_state = CLOSE_WAIT;
714 create_tcp_pkt(ctx, mbuf, TCP_FIN_FLAG, 0, 0);
715 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
716 *next_tsc = tcp_resched_timeout(ctx);
// NEW_DATA is set by stream_tcp_proc_in_order_data() and consumed here.
721 if (ctx->flags & STREAM_CTX_F_NEW_DATA)
722 ctx->flags &= ~STREAM_CTX_F_NEW_DATA;
725 tcp_set_retransmit(ctx);
726 plogx_dbg("state++ (ack = %d)\n", ctx->recv_seq);
729 create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0);
730 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
731 *next_tsc = tcp_retx_timeout(ctx);
735 static int stream_tcp_proc_out_estab(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
737 if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
738 return stream_tcp_proc_out_estab_tx(ctx, mbuf, next_tsc);
741 return stream_tcp_proc_out_estab_rx(ctx, mbuf, next_tsc);
// CLOSE_WAIT, rate limited via ctx->token_time: send ACK|FIN, move to
// LAST_ACK and arm the retransmission timeout.
745 static int stream_tcp_proc_out_close_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
747 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
750 *next_tsc = wait_tsc;
754 /* CLOSE_WAIT is an intermediary stage that is only visited
755 when the FIN is sent after ACK'ing the incoming FIN. In any
756 case, it does not matter if there was a packet or not. */
757 ctx->tcp_state = LAST_ACK;
758 create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG | TCP_FIN_FLAG, 0, 0);
759 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
760 *next_tsc = tcp_retx_timeout(ctx);
// LAST_ACK: waiting for the ACK of our FIN.
//   - Everything acknowledged (ackd_seq == next_seq): the stream ends.
//   - Otherwise, rate limited via ctx->token_time:
//       - last received packet made progress: only re-arm the timeout;
//       - else: rewind next_seq to ackd_seq and retransmit the ACK|FIN.
764 static int stream_tcp_proc_out_last_ack(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
766 if (ctx->ackd_seq == ctx->next_seq) {
767 plogx_dbg("Last ACK received\n");
768 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
772 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
775 *next_tsc = wait_tsc;
778 if (ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) {
779 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
780 *next_tsc = tcp_retx_timeout(ctx);
784 plogx_dbg("Retransmit!\n");
785 ctx->next_seq = ctx->ackd_seq;
787 tcp_set_retransmit(ctx);
788 create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG | TCP_FIN_FLAG, 0, 0);
789 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
790 *next_tsc = tcp_retx_timeout(ctx);
// FIN_WAIT: our FIN has been sent. Rate limited via ctx->token_time.
//   - FIN acknowledged and the peer's FIN received: ACK it, enter TIME_WAIT
//     and schedule expiry tsc_timeout_time_wait cycles from now.
//   - FIN acknowledged but no FIN yet: keep waiting (retx timeout).
//   - FIN not acknowledged:
//       - last received packet made progress: only re-arm the timeout;
//       - else: rewind next_seq and retransmit FIN/ACK.
795 static int stream_tcp_proc_out_fin_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
797 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
800 *next_tsc = wait_tsc;
804 if (ctx->ackd_seq == ctx->next_seq) {
805 if (ctx->flags & STREAM_CTX_F_TCP_GOT_FIN) {
807 ctx->tcp_state = TIME_WAIT;
808 ctx->sched_tsc = rte_rdtsc() + ctx->stream_cfg->tsc_timeout_time_wait;
809 plogx_dbg("from FIN_WAIT to TIME_WAIT\n");
810 create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0);
811 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
812 *next_tsc = ctx->stream_cfg->tsc_timeout_time_wait;
816 /* FIN will still need to come */
817 *next_tsc = tcp_retx_timeout(ctx);
822 if (ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) {
823 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
824 *next_tsc = tcp_retx_timeout(ctx);
828 plogx_dbg("Retransmit!\n");
830 tcp_set_retransmit(ctx);
831 ctx->next_seq = ctx->ackd_seq;
832 create_tcp_pkt(ctx, mbuf, TCP_FIN_FLAG | TCP_ACK_FLAG, 0, 0);
833 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
834 *next_tsc = tcp_retx_timeout(ctx);
// TIME_WAIT.
//   - Deadline in ctx->sched_tsc passed: the stream ends.
//   - Otherwise, a packet arrived (presumably a retransmitted FIN): re-send
//     the ACK, rate limited via ctx->token_time, and restart the TIME_WAIT
//     timer.
839 static int stream_tcp_proc_out_time_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
841 if (ctx->sched_tsc < rte_rdtsc()) {
842 plogx_dbg("TIME_WAIT expired! for %#x\n", ctx->tuple->dst_addr);
843 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
846 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
849 *next_tsc = wait_tsc;
853 plogx_dbg("Got packet while in TIME_WAIT (pkt ACK reTX)\n");
854 ctx->sched_tsc = rte_rdtsc() + ctx->stream_cfg->tsc_timeout_time_wait;
855 create_tcp_pkt(ctx, mbuf, TCP_ACK_FLAG, 0, 0);
856 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
857 *next_tsc = ctx->stream_cfg->tsc_timeout_time_wait;
// Output side of the TCP state machine: dispatch to the handler for
// ctx->tcp_state. When ctx->same_state reaches 10 the stream is flagged
// STREAM_CTX_F_EXPIRED. NOTE(review): same_state is presumably a counter of
// consecutive calls without progress; it is not updated in the visible code.
861 static int stream_tcp_proc_out(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
863 if (ctx->same_state == 10) {
864 ctx->flags |= STREAM_CTX_F_EXPIRED;
868 switch (ctx->tcp_state) {
869 case CLOSED: /* Client initial state */
870 return stream_tcp_proc_out_closed(ctx, mbuf, next_tsc);
871 case LISTEN: /* Server starts in this state. */
872 return stream_tcp_proc_out_listen(ctx, mbuf, next_tsc);
874 return stream_tcp_proc_out_syn_sent(ctx, mbuf, next_tsc);
876 return stream_tcp_proc_out_syn_recv(ctx, mbuf, next_tsc);
878 return stream_tcp_proc_out_estab(ctx, mbuf, next_tsc);
880 return stream_tcp_proc_out_close_wait(ctx, mbuf, next_tsc);
882 return stream_tcp_proc_out_last_ack(ctx, mbuf, next_tsc);
884 return stream_tcp_proc_out_fin_wait(ctx, mbuf, next_tsc);
886 return stream_tcp_proc_out_time_wait(ctx, mbuf, next_tsc);
892 /* Return: zero: packet in mbuf is the reply, non-zero: data consumed,
893 nothing to send. The latter case might mean that the connection has
894 ended, or that a future event has been scheduled. l4_meta =>
895 mbuf contains packet to be processed. */
// Entry point for one event on a TCP stream. Both token buckets are first
// refilled to the current TSC. If a packet was received, its wire size is
// charged (clamped) to the reverse-direction bucket and it is processed by
// stream_tcp_proc_in(). The output state machine then decides what, if
// anything, to send.
896 int stream_tcp_proc(struct stream_ctx *ctx, struct rte_mbuf *mbuf, struct l4_meta *l4_meta, uint64_t *next_tsc)
898 token_time_update(&ctx->token_time, rte_rdtsc());
899 token_time_update(&ctx->token_time_other, rte_rdtsc());
903 token_time_take_clamp(&ctx->token_time_other, mbuf_wire_size(mbuf));
904 ret = stream_tcp_proc_in(ctx, l4_meta);
909 return stream_tcp_proc_out(ctx, mbuf, next_tsc);
912 int stream_tcp_is_ended(struct stream_ctx *ctx)
914 return ctx->flags & STREAM_CTX_F_TCP_ENDED;
// Account one frame of len bytes (without FCS) in the packet/byte totals.
// The frame is padded to the 60-byte Ethernet minimum, then 20 bytes and
// ETHER_CRC_LEN are added. The 20 bytes are presumably preamble+SFD (8) and
// inter-frame gap (12), i.e. on-wire cost.
917 static void add_pkt_bytes(uint32_t *n_pkts, uint32_t *n_bytes, uint32_t len)
919 len = (len < 60? 60 : len) + 20 + ETHER_CRC_LEN;
// Estimate the packets and on-wire bytes of one complete stream run:
//   - three-way handshake (SYN and SYN/ACK with a 4-byte MSS option);
//   - each action's data cut into mss-sized segments by its sender, plus
//     header-only reply packets;
//   - FIN, FIN/ACK, ACK teardown started by the peer of the last action.
// NOTE(review): the segment size used here (1440) differs from the MSS
// advertised in create_tcp_pkt() and assumed by stream_tcp_reply_len()
// (1460).
// NOTE(review): cfg->n_actions == 0 would index actions[n_actions - 1] out
// of bounds in the teardown part.
925 void stream_tcp_calc_len(struct stream_cfg *cfg, uint32_t *n_pkts, uint32_t *n_bytes)
927 const uint32_t client_hdr_len = cfg->data[PEER_CLIENT].hdr_len;
928 const uint32_t server_hdr_len = cfg->data[PEER_SERVER].hdr_len;
933 /* Connection setup */
934 add_pkt_bytes(n_pkts, n_bytes, client_hdr_len + sizeof(struct tcp_hdr) + 4); /* SYN */
935 add_pkt_bytes(n_pkts, n_bytes, server_hdr_len + sizeof(struct tcp_hdr) + 4); /* SYN/ACK */
936 add_pkt_bytes(n_pkts, n_bytes, client_hdr_len + sizeof(struct tcp_hdr)); /* ACK */
938 for (uint32_t i = 0; i < cfg->n_actions; ++i) {
939 const uint32_t mss = 1440; /* TODO: should come from peer's own mss. */
940 uint32_t remaining = cfg->actions[i].len;
941 const uint32_t send_hdr_len = cfg->actions[i].peer == PEER_CLIENT? client_hdr_len : server_hdr_len;
942 const uint32_t reply_hdr_len = cfg->actions[i].peer == PEER_CLIENT? server_hdr_len : client_hdr_len;
948 uint32_t seg = remaining > mss? mss: remaining;
949 add_pkt_bytes(n_pkts, n_bytes, send_hdr_len + sizeof(struct tcp_hdr) + seg);
953 add_pkt_bytes(n_pkts, n_bytes, reply_hdr_len + sizeof(struct tcp_hdr));
956 /* Connection Tear-down */
957 enum l4gen_peer last_peer = cfg->actions[cfg->n_actions - 1].peer;
959 const uint32_t init_hdr_len = last_peer == PEER_CLIENT? client_hdr_len : server_hdr_len;
960 const uint32_t resp_hdr_len = last_peer == PEER_CLIENT? server_hdr_len : client_hdr_len;
962 add_pkt_bytes(n_pkts, n_bytes, init_hdr_len + sizeof(struct tcp_hdr)); /* FIN */
963 add_pkt_bytes(n_pkts, n_bytes, resp_hdr_len + sizeof(struct tcp_hdr)); /* FIN/ACK */
964 add_pkt_bytes(n_pkts, n_bytes, init_hdr_len + sizeof(struct tcp_hdr)); /* ACK */