2 // Copyright (c) 2010-2017 Intel Corporation
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 #include <rte_cycles.h>
18 #include <rte_ether.h>
19 #include <rte_ethdev.h> // required by rte_eth_ctrl.h in 19.05
20 #include <rte_eth_ctrl.h>
23 #include "genl4_stream_tcp.h"
24 #include "prox_assert.h"
25 #include "mbuf_utils.h"
/* Retransmit timeout for this stream, in TSC cycles: the time until
   the other-direction token bucket refills plus the configured
   per-stream timeout. Read-only on ctx.
   NOTE(review): this whole extract appears elided — braces and some
   original lines are missing throughout; comments below only state
   what the visible lines establish. */
27 static uint64_t tcp_retx_timeout(const struct stream_ctx *ctx)
29 uint64_t delay = token_time_tsc_until_full(&ctx->token_time_other);
31 return delay + ctx->stream_cfg->tsc_timeout;
/* Reschedule delay, in TSC cycles: time until this stream's own
   (TX-direction) token bucket is full again.
   NOTE(review): the return statement is not visible in this extract —
   presumably "return delay;"; confirm against the full file. */
34 static uint64_t tcp_resched_timeout(const struct stream_ctx *ctx)
36 uint64_t delay = token_time_tsc_until_full(&ctx->token_time);
/* Arm the retransmit timer: store the relative timeout in *next_tsc
   for the caller's scheduler, and record the absolute TSC deadline in
   ctx->sched_tsc so tcp_retx_timeout_occured()/_resume() can use it. */
41 static void tcp_retx_timeout_start(struct stream_ctx *ctx, uint64_t *next_tsc)
43 uint64_t now = rte_rdtsc();
45 *next_tsc = tcp_retx_timeout(ctx);
46 ctx->sched_tsc = now + *next_tsc;
/* Non-zero when the armed deadline (ctx->sched_tsc) lies in the past
   relative to 'now'. NOTE(review): "occured" is a typo for "occurred",
   kept as-is since callers use this spelling. */
49 static int tcp_retx_timeout_occured(const struct stream_ctx *ctx, uint64_t now)
51 return ctx->sched_tsc < now;
/* Remaining time until the armed deadline. Requires now <= sched_tsc
   (the visible caller gates on !tcp_retx_timeout_occured()); otherwise
   the unsigned subtraction would wrap to a huge delay. */
54 static void tcp_retx_timeout_resume(const struct stream_ctx *ctx, uint64_t now, uint64_t *next_tsc)
56 *next_tsc = ctx->sched_tsc - now;
/* Record that this stream is retransmitting. Body not visible in this
   extract — presumably bumps a retransmit counter/statistic on ctx;
   confirm against the full file. */
59 static void tcp_set_retransmit(struct stream_ctx *ctx)
/* End of struct tcp_option (members elided in this extract). Packed so
   it can overlay raw TCP option bytes directly in the packet buffer. */
67 } __attribute__((packed));
/* Rewrite the packet in 'mbuf' in place into a minimal TCP RST reply.
   'tuple' is in the orientation of the *received* packet, hence source
   and destination addresses/ports are swapped when writing the reply.
   The mbuf is truncated to end right after the TCP header.
   NOTE(review): IP/TCP checksums are not recomputed here — presumably
   offloaded or handled elsewhere; confirm. */
69 void stream_tcp_create_rst(struct rte_mbuf *mbuf, struct l4_meta *l4_meta, struct pkt_tuple *tuple)
71 prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)l4_meta->l4_hdr;
/* Assumes the IPv4 header sits immediately before the TCP header
   (no IP options) — TODO confirm for all generated streams. */
72 prox_rte_ipv4_hdr *ip = ((prox_rte_ipv4_hdr *)tcp) - 1;
74 ip->src_addr = tuple->dst_addr;
75 ip->dst_addr = tuple->src_addr;
77 tcp->dst_port = tuple->src_port;
78 tcp->src_port = tuple->dst_port;
/* IP payload is exactly one option-less TCP header. */
80 ip->total_length = rte_bswap16(sizeof(prox_rte_ipv4_hdr) + sizeof(prox_rte_tcp_hdr));
81 tcp->tcp_flags = PROX_RTE_TCP_RST_FLAG;
/* Upper nibble of data_off = TCP header length in 32-bit words. */
82 tcp->data_off = ((sizeof(prox_rte_tcp_hdr) / 4) << 4);
/* Drop any payload: the frame now ends where the TCP payload began. */
83 rte_pktmbuf_pkt_len(mbuf) = l4_meta->payload - rte_pktmbuf_mtod(mbuf, uint8_t *);
84 rte_pktmbuf_data_len(mbuf) = l4_meta->payload - rte_pktmbuf_mtod(mbuf, uint8_t *);
/* Build an outgoing TCP packet for the stream's current action into
   'mbuf': copy the precomputed L2+L3 header template, fill in the
   swapped tuple, optionally add a SYN MSS option, copy 'data_len'
   bytes of action payload starting at 'data_beg', set seq/ack/flags,
   fix up the mbuf and IP lengths, and advance ctx->next_seq by the
   sequence space consumed (SYN/FIN/payload). */
87 static void create_tcp_pkt(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint8_t tcp_flags, int data_beg, int data_len)
91 const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
92 const struct stream_cfg *stream_cfg = ctx->stream_cfg;
94 pkt = rte_pktmbuf_mtod(mbuf, uint8_t *);
/* Copy the per-peer precomputed Ethernet+IP header template. */
95 rte_memcpy(pkt, stream_cfg->data[act->peer].hdr, stream_cfg->data[act->peer].hdr_len);
97 prox_rte_ipv4_hdr *l3_hdr = (prox_rte_ipv4_hdr*)&pkt[stream_cfg->data[act->peer].hdr_len - sizeof(prox_rte_ipv4_hdr)];
98 prox_rte_tcp_hdr *l4_hdr = (prox_rte_tcp_hdr *)&pkt[stream_cfg->data[act->peer].hdr_len];
/* Tuple is stored in received-packet orientation, so swap src/dst. */
100 l3_hdr->src_addr = ctx->tuple->dst_addr;
101 l3_hdr->dst_addr = ctx->tuple->src_addr;
102 l3_hdr->next_proto_id = IPPROTO_TCP;
104 l4_hdr->src_port = ctx->tuple->dst_port;
105 l4_hdr->dst_port = ctx->tuple->src_port;
107 uint32_t tcp_len = sizeof(prox_rte_tcp_hdr);
108 uint32_t tcp_payload_len = 0;
/* seq_len = sequence-space consumed by this packet (SYN/FIN count 1). */
109 uint32_t seq_len = 0;
110 struct tcp_option *tcp_op;
112 if (tcp_flags & PROX_RTE_TCP_RST_FLAG) {
/* NOTE(review): no-op — the flag tested is already set. Same pattern
   repeats for SYN/FIN/ACK below; candidates for removal. */
113 tcp_flags |= PROX_RTE_TCP_RST_FLAG;
116 else if (tcp_flags & PROX_RTE_TCP_SYN_FLAG) {
117 tcp_flags |= PROX_RTE_TCP_SYN_FLAG;
120 /* TODO: make options come from the stream. */
/* Append a single MSS option right after the fixed TCP header. */
121 tcp_op = (struct tcp_option *)(l4_hdr + 1);
/* NOTE(review): hardcoded MSS of 1460 — magic number, see TODO. */
125 *(uint16_t *)(tcp_op + 1) = rte_bswap16(1460); /* TODO: Save this in this_mss */
/* SYN consumes one sequence number; our data starts just after it. */
130 ctx->seq_first_byte = ctx->ackd_seq + 1;
132 else if (tcp_flags & PROX_RTE_TCP_FIN_FLAG) {
133 tcp_flags |= PROX_RTE_TCP_FIN_FLAG;
137 if (tcp_flags & PROX_RTE_TCP_ACK_FLAG) {
/* Acknowledge everything received in order so far. */
138 l4_hdr->recv_ack = rte_bswap32(ctx->recv_seq);
139 tcp_flags |= PROX_RTE_TCP_ACK_FLAG;
142 l4_hdr->recv_ack = 0;
144 uint16_t l4_payload_offset = stream_cfg->data[act->peer].hdr_len + tcp_len;
148 plogx_dbg("l4 payload offset = %d\n", l4_payload_offset);
/* Copy action payload (data_beg is an offset into the cfg content). */
149 rte_memcpy(pkt + l4_payload_offset, stream_cfg->data[act->peer].content + data_beg, data_len);
152 l4_hdr->sent_seq = rte_bswap32(ctx->next_seq);
153 l4_hdr->tcp_flags = tcp_flags; /* SYN */
/* NOTE(review): hardcoded RX window 0x3890 — see TODO on this line. */
154 l4_hdr->rx_win = rte_bswap16(0x3890); // TODO: make this come from stream (config)
/* NOTE(review): TCP checksum intentionally not computed here —
   presumably offloaded; confirm. */
155 //l4_hdr->cksum = ...;
157 l4_hdr->data_off = ((tcp_len / 4) << 4); /* Highest 4 bits are TCP header len in units of 32 bit words */
159 /* ctx->next_seq = ctx->ackd_seq + seq_len; */
160 ctx->next_seq += seq_len;
162 /* No payload after TCP header. */
163 rte_pktmbuf_pkt_len(mbuf) = l4_payload_offset + data_len;
164 rte_pktmbuf_data_len(mbuf) = l4_payload_offset + data_len;
166 l3_hdr->total_length = rte_bswap16(sizeof(prox_rte_ipv4_hdr) + tcp_len + data_len);
167 plogdx_dbg(mbuf, NULL);
169 plogx_dbg("put tcp packet with flags: %s%s%s, (len = %d, seq = %d, ack =%d)\n",
170 tcp_flags & PROX_RTE_TCP_SYN_FLAG? "SYN ":"",
171 tcp_flags & PROX_RTE_TCP_ACK_FLAG? "ACK ":"",
172 tcp_flags & PROX_RTE_TCP_FIN_FLAG? "FIN ":"",
173 data_len, rte_bswap32(l4_hdr->sent_seq), rte_bswap32(l4_hdr->recv_ack));
176 /* Get the length of the reply associated with the next packet. Note
177 that the packet will come from the other peer. In case the next
178 packet belongs to the current peer (again), the reply length will
179 be that of an empty TCP packet (i.e. the ACK). */
/* Predict the on-wire L2..L4 size of the next reply this stream will
   receive, used for rate/token accounting. May overestimate (see
   comment below). Returns header length + TCP header (+4 for the MSS
   option during handshake) + predicted payload, capped at an assumed
   peer MSS of 1460 (NOTE(review): magic number, duplicated below). */
180 uint16_t stream_tcp_reply_len(struct stream_ctx *ctx)
182 if (stream_tcp_is_ended(ctx))
184 else if (ctx->tcp_state != ESTABLISHED) {
185 if (ctx->tcp_state == SYN_SENT || ctx->tcp_state == LISTEN) {
186 /* First packet received is a SYN packet. In
187 the current implementation this packet
188 contains the TCP option field to set the
189 MSS. For this, add 4 bytes. */
190 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr) + 4;
192 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr);
/* Current action is ours to send: the reply is normally a bare ACK. */
194 else if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
195 /* The reply _could_ (due to races, still possibly
196 receive an old ack) contain data. This means that
197 in some cases, the prediction of the reply size
198 will be an overestimate. */
199 uint32_t data_beg = ctx->next_seq - ctx->seq_first_byte;
200 const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
202 uint32_t remaining_len = act->len - (data_beg - act->beg);
204 if (remaining_len == 0) {
/* Our action is fully sent: peek at the next action to see who talks
   next and size the reply accordingly. */
205 if (ctx->cur_action + 1 != ctx->stream_cfg->n_actions) {
206 if (ctx->stream_cfg->actions[ctx->cur_action + 1].peer == ctx->peer)
207 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr);
209 uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
210 uint32_t end = ctx->stream_cfg->actions[ctx->cur_action + 1].beg +
211 ctx->stream_cfg->actions[ctx->cur_action + 1].len;
212 uint32_t remaining = end - seq_beg;
213 uint16_t data_len = remaining > 1460? 1460: remaining;
215 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr) + data_len;
219 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr);
223 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr);
/* Current action belongs to the other peer: expect a data segment. */
226 else if (ctx->stream_cfg->actions[ctx->cur_action].peer != ctx->peer) {
227 uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
228 uint32_t end = ctx->stream_cfg->actions[ctx->cur_action].beg +
229 ctx->stream_cfg->actions[ctx->cur_action].len;
230 uint32_t remaining = end - seq_beg;
231 uint16_t data_len = remaining > 1460? 1460: remaining;
233 return ctx->stream_cfg->data[!ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr) + data_len;
236 return ctx->stream_cfg->data[ctx->peer].hdr_len + sizeof(prox_rte_tcp_hdr);
/* Consume an in-order data segment: compare its payload against the
   expected bytes of the current action's configured content, advance
   recv_seq/cur_pos on a match, and set *progress_seq (elided lines
   presumably do so — confirm). On mismatch or overlong data, log
   diagnostics. Caller guarantees l4_meta's seq == ctx->recv_seq. */
239 static void stream_tcp_proc_in_order_data(struct stream_ctx *ctx, struct l4_meta *l4_meta, int *progress_seq)
241 plogx_dbg("Got data with seq %d (as expected), with len %d\n", ctx->recv_seq, l4_meta->len);
246 const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
247 enum l4gen_peer peer = act->peer;
248 /* Since we have received the expected sequence number, the start address will not exceed the cfg memory buffer. */
249 uint8_t *content = ctx->stream_cfg->data[peer].content;
/* Offset of this segment within the peer's configured byte stream. */
250 uint32_t seq_beg = ctx->recv_seq - ctx->other_seq_first_byte;
251 uint32_t end = ctx->stream_cfg->actions[ctx->cur_action].beg + ctx->stream_cfg->actions[ctx->cur_action].len;
252 uint32_t remaining = end - seq_beg;
254 if (l4_meta->len > remaining) {
255 plogx_err("Provided data is too long:\n");
256 plogx_err("action.beg = %d, action.len = %d", act->beg, act->len);
257 plogx_err("tcp seq points at %d in action, l4_meta->len = %d\n", seq_beg, l4_meta->len);
/* Payload must match the configured content byte-for-byte. */
260 if (memcmp(content + seq_beg, l4_meta->payload, l4_meta->len) == 0) {
261 plogx_dbg("Good payload in %d: %u -> %u\n", ctx->cur_action, ctx->recv_seq, l4_meta->len);
262 ctx->recv_seq += l4_meta->len;
263 ctx->cur_pos[peer] += l4_meta->len;
264 /* Move forward only when this was the last piece of data within current action (i.e. end of received data == end of action data). */
265 if (seq_beg + l4_meta->len == act->beg + act->len) {
266 plogx_dbg("Got last piece in action %d\n", ctx->cur_action);
270 plogx_dbg("Got data from %d with len %d, but waiting for more (tot len = %d)!\n", seq_beg, l4_meta->len, act->len);
273 ctx->flags |= STREAM_CTX_F_NEW_DATA;
/* Mismatch diagnostics below: dump first 4 bytes of got vs expected. */
276 plogx_err("ackable = %d, ackd = %d\n", ctx->ackable_data_seq ,ctx->ackd_seq);
277 plogx_err("Bad payload action[%d]{.len = %d, .peer = %s}\n", ctx->cur_action, act->len, peer == PEER_SERVER? "s" : "c");
278 plogx_err("  pkt payload len = %d, beginning at %u\n", l4_meta->len, seq_beg);
279 /* plogx_err("  Payload starts %zu bytes after beginning of l4_hdr\n", l4_meta->payload - l4_meta->l4_hdr); */
281 plogx_err("  payload[0-3] = %02x %02x %02x %02x\n",
285 l4_meta->payload[3]);
286 plogx_err("  expect[0-3] = %02x %02x %02x %02x\n",
287 content[seq_beg + 0],
288 content[seq_beg + 1],
289 content[seq_beg + 2],
290 content[seq_beg + 3]);
/* Process one received TCP segment (header in l4_meta): record
   SYN/FIN/RST flags, advance ackd_seq on new ACKs (possibly completing
   the current action), advance recv_seq for in-order data via
   stream_tcp_proc_in_order_data(), parse the MSS option, and set or
   clear STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS. Return value elided
   in this extract — confirm semantics against the full file. */
295 static int stream_tcp_proc_in(struct stream_ctx *ctx, struct l4_meta *l4_meta)
297 prox_rte_tcp_hdr *tcp = NULL;
303 tcp = (prox_rte_tcp_hdr *)l4_meta->l4_hdr;
305 got_syn = tcp->tcp_flags & PROX_RTE_TCP_SYN_FLAG;
306 got_ack = tcp->tcp_flags & PROX_RTE_TCP_ACK_FLAG;
307 got_fin = tcp->tcp_flags & PROX_RTE_TCP_FIN_FLAG;
308 got_rst = tcp->tcp_flags & PROX_RTE_TCP_RST_FLAG;
309 plogx_dbg("TCP, flags: %s%s%s, (len = %d, seq = %d, ack =%d)\n", got_syn? "SYN ":"", got_ack? "ACK ":"", got_fin? "FIN " : "", l4_meta->len, rte_bswap32(tcp->sent_seq), rte_bswap32(tcp->recv_ack));
312 ctx->flags |= STREAM_CTX_F_TCP_GOT_SYN;
314 ctx->flags |= STREAM_CTX_F_TCP_GOT_FIN;
316 int progress_ack = 0, progress_seq = 0;
318 /* RST => other side wants to terminate due to
319 inconsitent state (example: delay of retransmit of
320 last ACK while other side already closed the
321 connection. The other side will accept the packet
322 as a beginning of a new connection but there will
325 plogx_dbg("got rst\n");
326 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
/* --- ACK processing --- */
331 uint32_t ackd_seq = rte_bswap32(tcp->recv_ack);
/* NOTE(review): plain '>' comparison — no sequence-number wraparound
   handling visible; confirm streams never span a 2^32 seq wrap. */
333 if (ackd_seq > ctx->ackd_seq) {
334 plogx_dbg("Got ACK for outstanding data, from %d to %d\n", ctx->ackd_seq, ackd_seq);
335 ctx->ackd_seq = ackd_seq;
336 plogx_dbg("ackable data = %d\n", ctx->ackable_data_seq);
337 /* Ackable_data_seq set to byte after
339 if (ctx->ackable_data_seq == ctx->ackd_seq) {
340 /* Due to retransmit in
341 combination with late acks,
342 is is possible to ack
343 future data. In this case,
344 the assumption that data
345 was lost is not true and
346 the next seq is moved
348 if (ctx->next_seq < ctx->ackable_data_seq) {
349 ctx->next_seq = ctx->ackable_data_seq;
352 ctx->ackable_data_seq = 0;
353 const struct stream_cfg *stream_cfg = ctx->stream_cfg;
354 const struct peer_action *act = &stream_cfg->actions[ctx->cur_action];
/* Whole action acknowledged: advance position and move to next action. */
356 ctx->cur_pos[act->peer] += act->len;
358 plogx_dbg("Moving to next action %u\n", ctx->ackd_seq);
363 plogx_dbg("Old data acked: acked = %d, ackable =%d\n", ackd_seq, ctx->ackd_seq);
/* --- SEQ / payload processing --- */
367 uint32_t seq = rte_bswap32(tcp->sent_seq);
369 /* update recv_seq. */
371 /* When a syn is received, immediately reset recv_seq based on seq from packet. */
372 ctx->recv_seq = seq + 1;
373 /* Syn packets have length 1, so the first real data will start after that. */
374 ctx->other_seq_first_byte = seq + 1;
378 if (ctx->recv_seq == seq) {
379 plogx_dbg("Got fin with correct seq\n");
/* FIN consumes one sequence number. */
380 ctx->recv_seq = seq + 1;
384 plogx_dbg("Got fin but incorrect seq\n");
388 /* Only expect in-order packets. */
389 if (ctx->recv_seq == seq) {
390 stream_tcp_proc_in_order_data(ctx, l4_meta, &progress_seq);
392 else if (ctx->recv_seq < seq) {
393 plogx_dbg("Future data received (got = %d, expected = %d), missing data! (data ignored)\n", seq, ctx->recv_seq);
396 plogx_dbg("Old data received again (state = %s)\n", tcp_state_to_str(ctx->tcp_state));
397 plogx_dbg("expecting seq %d, got seq %d, len = %d\n",ctx->recv_seq, seq, l4_meta->len);
398 plogx_dbg("ackd_seq = %d, next_seq = %d, action = %d\n", ctx->ackd_seq, ctx->next_seq, ctx->cur_action);
/* --- TCP options: scan for MSS (kind 2, len 4) --- */
403 if (((tcp->data_off >> 4)*4) > sizeof(prox_rte_tcp_hdr)) {
404 struct tcp_option *tcp_op = (struct tcp_option *)(tcp + 1);
405 uint8_t *payload = (uint8_t *)tcp + ((tcp->data_off >> 4)*4);
408 if (tcp_op->kind == 2 && tcp_op->len == 4) {
409 uint16_t mss = rte_bswap16(*(uint16_t *)(tcp_op + 1));
410 ctx->other_mss = mss;
/* NOTE(review): options with len == 0 (or NOP/EOL, len-less kinds)
   would loop forever / misparse here — assumes well-formed options
   from our own generator; confirm if fed external traffic. */
413 tcp_op = (struct tcp_option *)(((uint8_t*)tcp_op) + tcp_op->len);
414 } while (((uint8_t*)tcp_op) < payload);
417 if (progress_ack || progress_seq) {
419 ctx->flags |= STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
422 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
/* CLOSED (client start state): if the TX token bucket is not yet full,
   just report the wait time; otherwise emit the initial SYN, move to
   SYN_SENT and arm the retransmit timeout. Also reused by SYN_SENT for
   SYN retransmission. */
427 static int stream_tcp_proc_out_closed(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
429 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
432 *next_tsc = wait_tsc;
436 /* create SYN packet in mbuf, return 0. goto SYN_SENT, set timeout */
437 ctx->tcp_state = SYN_SENT;
443 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_SYN_FLAG, 0, 0);
/* Charge the wire size of the SYN against the TX token bucket. */
444 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
445 *next_tsc = tcp_retx_timeout(ctx);
/* LISTEN (server start state): a non-SYN packet gets an RST and ends
   the stream; a SYN gets a SYN/ACK and moves us to SYN_RECEIVED.
   Early-out with the wait time when the TX token bucket is not full. */
449 static int stream_tcp_proc_out_listen(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
451 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
454 *next_tsc = wait_tsc;
458 if (!(ctx->flags & STREAM_CTX_F_TCP_GOT_SYN)) {
459 // TODO: keep connection around at end to catch retransmits from client
460 plogx_dbg("Got packet while listening without SYN (will send RST)\n");
461 pkt_tuple_debug(ctx->tuple);
463 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
464 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_RST_FLAG, 0, 0);
465 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
466 *next_tsc = tcp_retx_timeout(ctx);
470 /* if syn received _now_, send ack + syn. goto SYN_RECEIVED. */
471 plogx_dbg("Got packet while listen\n");
476 ctx->tcp_state = SYN_RECEIVED;
478 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_SYN_FLAG | PROX_RTE_TCP_ACK_FLAG, 0, 0);
479 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
480 *next_tsc = tcp_retx_timeout(ctx);
/* SYN_SENT (client): retransmit the SYN when our SYN is unacked or no
   SYN arrived from the peer; otherwise complete the handshake — either
   reschedule immediately so the handshake-ACK can piggyback on our
   first data packet, or send a bare ACK when the server talks first. */
484 static int stream_tcp_proc_out_syn_sent(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
486 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
489 *next_tsc = wait_tsc;
493 if (ctx->ackd_seq < ctx->next_seq || !(ctx->flags & STREAM_CTX_F_TCP_GOT_SYN)) {
494 plogx_dbg("Retransmit SYN\n");
495 /* Did not get packet, send syn again and keep state (waiting for ACK). */
/* Reuse the CLOSED handler: it rebuilds and resends the SYN. */
497 tcp_set_retransmit(ctx);
498 return stream_tcp_proc_out_closed(ctx, mbuf, next_tsc);
501 plogx_dbg("SYN_SENT and everything ACK'ed\n");
502 plogx_dbg("ackd_seq = %d, next_seq = %d\n", ctx->ackd_seq, ctx->next_seq);
504 /* If syn received for this stream, send ack and goto
505 ESTABLISHED. If first peer is this peer to send actual
506 data, schedule immediately. */
509 ctx->tcp_state = ESTABLISHED;
511 /* third packet of three-way handshake will also contain
512 data. Don't send separate ACK yet. TODO: only send ACK if
513 data has not yet been ACK'ed. */
514 if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
515 *next_tsc = tcp_resched_timeout(ctx);
516 plogx_dbg("immediately resched (%d)\n", ctx->cur_action);
520 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0);
521 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
522 *next_tsc = tcp_retx_timeout(ctx);
/* SYN_RECEIVED (server): once our SYN/ACK is fully acked, move to
   ESTABLISHED — sending a bare ACK only when the client speaks next
   (otherwise reschedule so the ACK piggybacks on our first data).
   If the SYN/ACK is still unacked, retransmit it after rewinding
   next_seq to ackd_seq. */
527 static int stream_tcp_proc_out_syn_recv(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
529 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
532 *next_tsc = wait_tsc;
536 if (ctx->ackd_seq == ctx->next_seq) {
537 /* Possible from server side with ctx->cur_action == 1
538 if the current packet received had ACK for syn from
539 server to client and also data completing the first
543 ctx->tcp_state = ESTABLISHED;
544 if (ctx->stream_cfg->actions[ctx->cur_action].peer != ctx->peer) {
545 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0);
546 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
547 *next_tsc = tcp_retx_timeout(ctx);
551 /* While at this point, an ACK without data
552 any could be sent by the server, it is not
553 really required because the next pacekt
554 after reschedule will also contain an ACK
557 In this implementation, if this is the
558 case, the client is not only expecting an
559 ACK, but also actual data. For this reason,
560 the empty ACK packet should not be sent,
561 otherwise the client will retransmit its
565 /* create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0); */
566 /* token_time_take(&ctx->token_time, mbuf_wire_size(mbuf)); */
567 *next_tsc = tcp_resched_timeout(ctx);
572 /* Either this portion is executed due to a time-out
573 or due to packet reception, the SYN that has been
574 sent is not yet ACK'ed. So, retransmit the SYN/ACK. */
575 plogx_dbg("Retransmit SYN/ACK\n");
577 tcp_set_retransmit(ctx);
/* Rewind so the retransmitted SYN/ACK reuses the original seq. */
578 ctx->next_seq = ctx->ackd_seq;
579 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_SYN_FLAG | PROX_RTE_TCP_ACK_FLAG, 0, 0);
580 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
581 *next_tsc = tcp_retx_timeout(ctx);
/* ESTABLISHED, our turn to transmit. Handles, in order: closing with
   FIN/ACK when all actions are done; sending the next MSS-sized chunk
   of the current action while within the (hardcoded, NOTE(review))
   300000-byte outstanding window; and, when nothing can be sent,
   retransmit-timeout bookkeeping — including rewinding next_seq to
   ackd_seq when the timeout really fired. */
586 static int stream_tcp_proc_out_estab_tx(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
588 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
591 *next_tsc = wait_tsc;
595 const struct peer_action *act = &ctx->stream_cfg->actions[ctx->cur_action];
598 plogx_dbg("Closing connection\n");
599 /* This would be an ACK combined with FIN. To
600 send a separate ack. keep the state in
601 established, put_ack and expire
603 plogx_dbg("Moving to FIN_WAIT\n");
604 ctx->tcp_state = FIN_WAIT;
606 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_FIN_FLAG | PROX_RTE_TCP_ACK_FLAG, 0, 0);
607 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
608 *next_tsc = tcp_retx_timeout(ctx);
611 /* remaining_len2 will be zero, while in case of
612 act->len == 0, the connection can be closed
615 plogx_dbg("This peer to send!\n");
/* Bytes sent but not yet acknowledged by the peer. */
616 uint32_t outstanding_bytes = ctx->next_seq - ctx->ackd_seq;
618 uint32_t data_beg2 = ctx->next_seq - ctx->seq_first_byte;
619 uint32_t remaining_len2 = act->len - (data_beg2 - act->beg);
/* NOTE(review): magic in-flight limit; candidate for stream config. */
621 const uint32_t rx_win = 300000;
622 /* If still data to be sent and allowed by outstanding amount */
623 if (outstanding_bytes <= rx_win && remaining_len2) {
624 plogx_dbg("Outstanding bytes = %d, and remaining_len = %d, next_seq = %d\n", outstanding_bytes, remaining_len2, ctx->next_seq);
/* First chunk of this action: remember the seq one past its end so a
   later ACK for it can be recognized (see stream_tcp_proc_in). */
626 if (ctx->ackable_data_seq == 0) {
627 PROX_ASSERT(outstanding_bytes == 0);
629 ctx->ackable_data_seq = ctx->next_seq + act->len;
632 plogx_dbg("This will not be the first part of the data within an action\n");
634 /* still data yet to be acked || still data to be sent but blocked by RX win. */
636 if (ctx->flags & STREAM_CTX_F_MORE_DATA) {
637 /* Don't send any packet. */
638 ctx->flags &= ~STREAM_CTX_F_MORE_DATA;
639 *next_tsc = tcp_retx_timeout(ctx);
640 ctx->sched_tsc = rte_rdtsc() + *next_tsc;
644 uint64_t now = rte_rdtsc();
/* RX made progress while the other direction is still throttled:
   (re)arm the timer instead of retransmitting. */
646 if ((ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) && token_time_tsc_until_full(&ctx->token_time_other) != 0) {
647 tcp_retx_timeout_start(ctx, next_tsc);
648 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
651 /* This function might be called due to packet
652 reception. In that case, cancel here and
653 wait until the timeout really occurs before
655 if (!tcp_retx_timeout_occured(ctx, now)) {
656 tcp_retx_timeout_resume(ctx, now, next_tsc);
661 tcp_set_retransmit(ctx);
662 /* This possibly means that now retransmit is resumed half-way in the action. */
663 plogx_dbg("Retransmit: outstanding = %d\n", outstanding_bytes);
664 plogx_dbg("Assuming %d->%d lost\n", ctx->ackd_seq, ctx->next_seq);
665 ctx->next_seq = ctx->ackd_seq;
666 plogx_dbg("highest seq from other side = %d\n", ctx->recv_seq);
668 /* When STREAM_CTX_F_MORE_DATA is set, real timeouts
669 can't occur. If this is needed, timeouts
670 need to carry additional information. */
673 /* The following code will retransmit the same data if next_seq is not moved forward. */
674 uint32_t data_beg = ctx->next_seq - ctx->seq_first_byte;
675 uint32_t remaining_len = act->len - (data_beg - act->beg);
/* Segment size capped at the MSS the peer advertised. */
676 uint32_t data_len = remaining_len > ctx->other_mss? ctx->other_mss: remaining_len;
678 plogx_warn("data_len == 0\n");
680 if (remaining_len > ctx->other_mss)
681 ctx->flags |= STREAM_CTX_F_MORE_DATA;
683 ctx->flags &= ~STREAM_CTX_F_MORE_DATA;
685 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, data_beg, data_len);
686 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
/* More segments pending: reschedule ASAP; else arm retransmit timer. */
687 if (ctx->flags & STREAM_CTX_F_MORE_DATA)
688 *next_tsc = tcp_resched_timeout(ctx);
690 tcp_retx_timeout_start(ctx, next_tsc);
/* ESTABLISHED, other peer's turn to transmit. If a FIN arrived, reply
   (FIN/ACK or plain FIN, per the elided condition) and advance towards
   teardown; otherwise ACK newly received data. */
695 static int stream_tcp_proc_out_estab_rx(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
697 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
700 *next_tsc = wait_tsc;
704 if (ctx->flags & STREAM_CTX_F_TCP_GOT_FIN) {
705 plogx_dbg("Got fin!\n");
706 /* The branch condition selecting between LAST_ACK and CLOSE_WAIT is
   elided in this extract — confirm against the full file. */
707 ctx->tcp_state = LAST_ACK;
708 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_FIN_FLAG | PROX_RTE_TCP_ACK_FLAG, 0, 0);
709 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
710 *next_tsc = tcp_retx_timeout(ctx);
714 ctx->tcp_state = CLOSE_WAIT;
715 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_FIN_FLAG, 0, 0);
716 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
717 *next_tsc = tcp_resched_timeout(ctx);
/* No FIN: acknowledge any new data received since the last ACK. */
722 if (ctx->flags & STREAM_CTX_F_NEW_DATA)
723 ctx->flags &= ~STREAM_CTX_F_NEW_DATA;
726 tcp_set_retransmit(ctx);
727 plogx_dbg("state++ (ack = %d)\n", ctx->recv_seq);
730 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0);
731 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
732 *next_tsc = tcp_retx_timeout(ctx);
/* ESTABLISHED dispatcher: delegate to the TX handler when the current
   action belongs to this peer, otherwise to the RX/ACK handler. */
736 static int stream_tcp_proc_out_estab(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
738 if (ctx->stream_cfg->actions[ctx->cur_action].peer == ctx->peer) {
739 return stream_tcp_proc_out_estab_tx(ctx, mbuf, next_tsc);
742 return stream_tcp_proc_out_estab_rx(ctx, mbuf, next_tsc);
/* CLOSE_WAIT: unconditionally send our FIN/ACK and move to LAST_ACK
   (see in-code comment: this state is only a brief intermediary).
   Early-out with the wait time when the TX token bucket is not full. */
746 static int stream_tcp_proc_out_close_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
748 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
751 *next_tsc = wait_tsc;
755 /* CLOSE_WAIT is an intermediary stage that is only visited
756 when the FIN is sent after ACK'ing the incoming FIN. In any
757 case, it does not matter if there was a packet or not. */
758 ctx->tcp_state = LAST_ACK;
759 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG | PROX_RTE_TCP_FIN_FLAG, 0, 0);
760 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
761 *next_tsc = tcp_retx_timeout(ctx);
/* LAST_ACK: when our FIN is fully acked, mark the stream ended.
   Otherwise either rearm the timer (if the last RX made progress) or
   retransmit the FIN/ACK after rewinding next_seq to ackd_seq. */
765 static int stream_tcp_proc_out_last_ack(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
767 if (ctx->ackd_seq == ctx->next_seq) {
768 plogx_dbg("Last ACK received\n");
769 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
773 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
776 *next_tsc = wait_tsc;
779 if (ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) {
780 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
781 *next_tsc = tcp_retx_timeout(ctx);
785 plogx_dbg("Retransmit!\n");
/* Rewind so the retransmitted FIN/ACK reuses the original seq. */
786 ctx->next_seq = ctx->ackd_seq;
788 tcp_set_retransmit(ctx);
789 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG | PROX_RTE_TCP_FIN_FLAG, 0, 0);
790 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
791 *next_tsc = tcp_retx_timeout(ctx);
/* FIN_WAIT: our FIN is acked and the peer's FIN arrived -> ACK it and
   enter TIME_WAIT with the configured time-wait timeout. If our FIN is
   acked but no peer FIN yet, just wait. Otherwise retransmit the
   FIN/ACK after rewinding next_seq. */
796 static int stream_tcp_proc_out_fin_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
798 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
801 *next_tsc = wait_tsc;
805 if (ctx->ackd_seq == ctx->next_seq) {
806 if (ctx->flags & STREAM_CTX_F_TCP_GOT_FIN) {
808 ctx->tcp_state = TIME_WAIT;
/* Absolute deadline for TIME_WAIT expiry (checked in _time_wait). */
809 ctx->sched_tsc = rte_rdtsc() + ctx->stream_cfg->tsc_timeout_time_wait;
810 plogx_dbg("from FIN_WAIT to TIME_WAIT\n");
811 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0);
812 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
813 *next_tsc = ctx->stream_cfg->tsc_timeout_time_wait;
817 /* FIN will still need to come */
818 *next_tsc = tcp_retx_timeout(ctx);
823 if (ctx->flags & STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS) {
824 ctx->flags &= ~STREAM_CTX_F_LAST_RX_PKT_MADE_PROGRESS;
825 *next_tsc = tcp_retx_timeout(ctx);
829 plogx_dbg("Retransmit!\n");
831 tcp_set_retransmit(ctx);
832 ctx->next_seq = ctx->ackd_seq;
833 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_FIN_FLAG | PROX_RTE_TCP_ACK_FLAG, 0, 0);
834 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
835 *next_tsc = tcp_retx_timeout(ctx);
/* TIME_WAIT: end the stream once the deadline set on entry expires.
   Any packet received meanwhile (presumably a retransmitted FIN from
   the peer) is answered with a fresh ACK and the deadline restarted. */
840 static int stream_tcp_proc_out_time_wait(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
842 if (ctx->sched_tsc < rte_rdtsc()) {
843 plogx_dbg("TIME_WAIT expired! for %#x\n", ctx->tuple->dst_addr);
844 ctx->flags |= STREAM_CTX_F_TCP_ENDED;
847 uint64_t wait_tsc = token_time_tsc_until_full(&ctx->token_time);
850 *next_tsc = wait_tsc;
854 plogx_dbg("Got packet while in TIME_WAIT (pkt ACK reTX)\n");
855 ctx->sched_tsc = rte_rdtsc() + ctx->stream_cfg->tsc_timeout_time_wait;
856 create_tcp_pkt(ctx, mbuf, PROX_RTE_TCP_ACK_FLAG, 0, 0);
857 token_time_take(&ctx->token_time, mbuf_wire_size(mbuf));
858 *next_tsc = ctx->stream_cfg->tsc_timeout_time_wait;
/* Output-side state machine dispatcher: expire a stream stuck in the
   same state too long, otherwise delegate to the per-state handler.
   NOTE(review): 10 is a magic stuck-state threshold — candidate for a
   named constant or stream config. */
862 static int stream_tcp_proc_out(struct stream_ctx *ctx, struct rte_mbuf *mbuf, uint64_t *next_tsc)
864 if (ctx->same_state == 10) {
865 ctx->flags |= STREAM_CTX_F_EXPIRED;
869 switch (ctx->tcp_state) {
870 case CLOSED: /* Client initial state */
871 return stream_tcp_proc_out_closed(ctx, mbuf, next_tsc);
872 case LISTEN: /* Server starts in this state. */
873 return stream_tcp_proc_out_listen(ctx, mbuf, next_tsc);
875 return stream_tcp_proc_out_syn_sent(ctx, mbuf, next_tsc);
877 return stream_tcp_proc_out_syn_recv(ctx, mbuf, next_tsc);
879 return stream_tcp_proc_out_estab(ctx, mbuf, next_tsc);
881 return stream_tcp_proc_out_close_wait(ctx, mbuf, next_tsc);
883 return stream_tcp_proc_out_last_ack(ctx, mbuf, next_tsc);
885 return stream_tcp_proc_out_fin_wait(ctx, mbuf, next_tsc);
887 return stream_tcp_proc_out_time_wait(ctx, mbuf, next_tsc);
893 /* Return: zero: packet in mbuf is the reply, non-zero: data consumed,
894 nothing to send. The latter case might mean that the connection has
895 ended, or that a future event has been scheduled. If l4_meta is
896 non-NULL, mbuf contains a received packet to be processed first. */
/* Main per-stream entry point: refresh both token buckets, process the
   received packet (if any) through the input state machine — charging
   its wire size against the RX token bucket — then run the output
   state machine which may overwrite mbuf with the reply. */
897 int stream_tcp_proc(struct stream_ctx *ctx, struct rte_mbuf *mbuf, struct l4_meta *l4_meta, uint64_t *next_tsc)
899 token_time_update(&ctx->token_time, rte_rdtsc());
900 token_time_update(&ctx->token_time_other, rte_rdtsc());
/* Clamp: never take more tokens than the bucket currently holds. */
904 token_time_take_clamp(&ctx->token_time_other, mbuf_wire_size(mbuf));
905 ret = stream_tcp_proc_in(ctx, l4_meta);
910 return stream_tcp_proc_out(ctx, mbuf, next_tsc);
/* Non-zero once the stream has terminated (RST received, TIME_WAIT
   expired, or last ACK seen — whoever set STREAM_CTX_F_TCP_ENDED). */
913 int stream_tcp_is_ended(struct stream_ctx *ctx)
915 return ctx->flags & STREAM_CTX_F_TCP_ENDED;
/* Accumulate one packet into the totals using its wire size: pad the
   frame to the 60-byte Ethernet minimum (pre-CRC), then add 20 bytes —
   presumably preamble(8) + inter-frame gap(12), confirm — plus CRC. */
918 static void add_pkt_bytes(uint32_t *n_pkts, uint32_t *n_bytes, uint32_t len)
920 len = (len < 60? 60 : len) + 20 + PROX_RTE_ETHER_CRC_LEN;
/* Estimate the total packet and byte counts of one full connection:
   3-way handshake, each action segmented at an assumed MSS, one ACK
   per action from the other peer, and a 3-packet teardown.
   NOTE(review): the 1440 MSS is hardcoded (cf. 1460 used elsewhere in
   this file) — see the TODO on that line. Loss-free, in-order traffic
   is assumed (no retransmits counted). */
926 void stream_tcp_calc_len(struct stream_cfg *cfg, uint32_t *n_pkts, uint32_t *n_bytes)
928 const uint32_t client_hdr_len = cfg->data[PEER_CLIENT].hdr_len;
929 const uint32_t server_hdr_len = cfg->data[PEER_SERVER].hdr_len;
934 /* Connection setup */
935 add_pkt_bytes(n_pkts, n_bytes, client_hdr_len + sizeof(prox_rte_tcp_hdr) + 4); /* SYN */
936 add_pkt_bytes(n_pkts, n_bytes, server_hdr_len + sizeof(prox_rte_tcp_hdr) + 4); /* SYN/ACK */
937 add_pkt_bytes(n_pkts, n_bytes, client_hdr_len + sizeof(prox_rte_tcp_hdr)); /* ACK */
939 for (uint32_t i = 0; i < cfg->n_actions; ++i) {
940 const uint32_t mss = 1440; /* TODO: should come from peer's own mss. */
941 uint32_t remaining = cfg->actions[i].len;
942 const uint32_t send_hdr_len = cfg->actions[i].peer == PEER_CLIENT? client_hdr_len : server_hdr_len;
943 const uint32_t reply_hdr_len = cfg->actions[i].peer == PEER_CLIENT? server_hdr_len : client_hdr_len;
/* One data segment per MSS-sized chunk of the action... */
949 uint32_t seg = remaining > mss? mss: remaining;
950 add_pkt_bytes(n_pkts, n_bytes, send_hdr_len + sizeof(prox_rte_tcp_hdr) + seg);
/* ...plus a single empty ACK from the receiving peer. */
954 add_pkt_bytes(n_pkts, n_bytes, reply_hdr_len + sizeof(prox_rte_tcp_hdr));
957 /* Connection Tear-down */
/* The peer that sent the last action initiates the close. */
958 enum l4gen_peer last_peer = cfg->actions[cfg->n_actions - 1].peer;
960 const uint32_t init_hdr_len = last_peer == PEER_CLIENT? client_hdr_len : server_hdr_len;
961 const uint32_t resp_hdr_len = last_peer == PEER_CLIENT? server_hdr_len : client_hdr_len;
963 add_pkt_bytes(n_pkts, n_bytes, init_hdr_len + sizeof(prox_rte_tcp_hdr)); /* FIN */
964 add_pkt_bytes(n_pkts, n_bytes, resp_hdr_len + sizeof(prox_rte_tcp_hdr)); /* FIN/ACK */
965 add_pkt_bytes(n_pkts, n_bytes, init_hdr_len + sizeof(prox_rte_tcp_hdr)); /* ACK */