Support packets in flight
[samplevnf.git] / VNFs / DPPD-PROX / handle_gen.c
1 /*
2 // Copyright (c) 2010-2020 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_common.h>
18 #ifndef __rte_cache_aligned
19 #include <rte_memory.h>
20 #endif
21
22 #include <rte_mbuf.h>
23 #include <pcap.h>
24 #include <string.h>
25 #include <stdlib.h>
26 #include <rte_cycles.h>
27 #include <rte_version.h>
28 #include <rte_byteorder.h>
29 #include <rte_ether.h>
30 #include <rte_hash.h>
31 #include <rte_hash_crc.h>
32 #include <rte_malloc.h>
33
34 #include "prox_shared.h"
35 #include "random.h"
36 #include "prox_malloc.h"
37 #include "handle_gen.h"
38 #include "handle_lat.h"
39 #include "task_init.h"
40 #include "task_base.h"
41 #include "prox_port_cfg.h"
42 #include "lconf.h"
43 #include "log.h"
44 #include "quit.h"
45 #include "prox_cfg.h"
46 #include "mbuf_utils.h"
47 #include "qinq.h"
48 #include "prox_cksum.h"
49 #include "etypes.h"
50 #include "prox_assert.h"
51 #include "prefetch.h"
52 #include "token_time.h"
53 #include "local_mbuf.h"
54 #include "arp.h"
55 #include "tx_pkt.h"
56 #include "handle_master.h"
57 #include "defines.h"
58 #include "prox_ipv6.h"
59 #include "handle_lb_5tuple.h"
60
61 struct pkt_template {
62         uint16_t len;
63         uint16_t l2_len;
64         uint16_t l3_len;
65         uint8_t  *buf;
66 };
67
68 #define MAX_STORE_PKT_SIZE      2048
69
70 struct packet {
71         unsigned int len;
72         unsigned char buf[MAX_STORE_PKT_SIZE];
73 };
74
75 #define IP4(x) x & 0xff, (x >> 8) & 0xff, (x >> 16) & 0xff, x >> 24
76
77 #define DO_PANIC        1
78 #define DO_NOT_PANIC    0
79
80 #define FROM_PCAP       1
81 #define NOT_FROM_PCAP   0
82
83 #define MAX_RANGES      64
84
85 #define TASK_OVERWRITE_SRC_MAC_WITH_PORT_MAC 1
86
87 static void pkt_template_init_mbuf(struct pkt_template *pkt_template, struct rte_mbuf *mbuf, uint8_t *pkt)
88 {
89         const uint32_t pkt_size = pkt_template->len;
90
91         rte_pktmbuf_pkt_len(mbuf) = pkt_size;
92         rte_pktmbuf_data_len(mbuf) = pkt_size;
93         init_mbuf_seg(mbuf);
94         rte_memcpy(pkt, pkt_template->buf, pkt_template->len);
95 }
96
97 struct task_gen_pcap {
98         struct task_base base;
99         uint64_t hz;
100         struct local_mbuf local_mbuf;
101         uint32_t pkt_idx;
102         struct pkt_template *proto;
103         uint32_t loop;
104         uint32_t n_pkts;
105         uint64_t last_tsc;
106         uint64_t *proto_tsc;
107         uint32_t socket_id;
108 };
109
110 struct flows {
111         uint32_t packet_id;
112 };
113
114 struct task_gen {
115         struct task_base base;
116         uint64_t hz;
117         struct token_time token_time;
118         struct local_mbuf local_mbuf;
119         struct pkt_template *pkt_template; /* packet templates used at runtime */
120         uint64_t write_duration_estimate; /* how long it took previously to write the time stamps in the packets */
121         uint64_t earliest_tsc_next_pkt;
122         uint64_t new_rate_bps;
123         uint64_t pkt_queue_index;
124         uint32_t n_pkts; /* number of packets in pcap */
125         uint32_t orig_n_pkts; /* number of packets in pcap */
126         uint32_t pkt_idx; /* current packet from pcap */
127         uint32_t pkt_count; /* how many pakets to generate */
128         uint32_t max_frame_size;
129         uint32_t runtime_flags;
130         uint16_t lat_pos;
131         uint16_t packet_id_pos;
132         uint16_t accur_pos;
133         uint16_t sig_pos;
134         uint16_t flow_id_pos;
135         uint16_t packet_id_in_flow_pos;
136         uint32_t sig;
137         uint32_t socket_id;
138         uint8_t generator_id;
139         uint8_t n_rands; /* number of randoms */
140         uint8_t n_ranges; /* number of ranges */
141         uint8_t min_bulk_size;
142         uint8_t max_bulk_size;
143         uint8_t lat_enabled;
144         uint8_t runtime_checksum_needed;
145         struct {
146                 struct random state;
147                 uint32_t rand_mask; /* since the random vals are uniform, masks don't introduce bias  */
148                 uint32_t fixed_bits; /* length of each random (max len = 4) */
149                 uint16_t rand_offset; /* each random has an offset*/
150                 uint8_t rand_len; /* # bytes to take from random (no bias introduced) */
151         } rand[64];
152         struct range ranges[MAX_RANGES];
153         uint64_t accur[ACCURACY_WINDOW];
154         uint64_t pkt_tsc_offset[64];
155         struct pkt_template *pkt_template_orig; /* packet templates (from inline or from pcap) */
156         prox_rte_ether_addr  src_mac;
157         uint8_t flags;
158         uint8_t cksum_offload;
159         struct prox_port_cfg *port;
160         uint64_t *bytes_to_tsc;
161         uint32_t imix_pkt_sizes[MAX_IMIX_PKTS];
162         uint32_t imix_nb_pkts;
163         uint32_t new_imix_nb_pkts;
164         uint32_t store_pkt_id;
165         uint32_t store_msk;
166         struct packet *store_buf;
167         FILE *fp;
168         struct rte_hash *flow_id_table;
169         struct flows*flows;
170 } __rte_cache_aligned;
171
172 static void task_gen_set_pkt_templates_len(struct task_gen *task, uint32_t *pkt_sizes);
173 static void task_gen_reset_pkt_templates_content(struct task_gen *task);
174 static void task_gen_pkt_template_recalc_metadata(struct task_gen *task);
175 static int check_all_pkt_size(struct task_gen *task, int do_panic);
176 static int check_all_fields_in_bounds(struct task_gen *task, int do_panic);
177
178 static inline uint8_t ipv4_get_hdr_len(prox_rte_ipv4_hdr *ip)
179 {
180         /* Optimize for common case of IPv4 header without options. */
181         if (ip->version_ihl == 0x45)
182                 return sizeof(prox_rte_ipv4_hdr);
183         if (unlikely(ip->version_ihl >> 4 != 4)) {
184                 plog_warn("IPv4 ether_type but IP version = %d != 4", ip->version_ihl >> 4);
185                 return 0;
186         }
187         return (ip->version_ihl & 0xF) * 4;
188 }
189
190 static void parse_l2_l3_len(uint8_t *pkt, uint16_t *l2_len, uint16_t *l3_len, uint16_t len)
191 {
192         *l2_len = sizeof(prox_rte_ether_hdr);
193         *l3_len = 0;
194         prox_rte_vlan_hdr *vlan_hdr;
195         prox_rte_ether_hdr *eth_hdr = (prox_rte_ether_hdr*)pkt;
196         prox_rte_ipv4_hdr *ip;
197         uint16_t ether_type = eth_hdr->ether_type;
198
199         // Unstack VLAN tags
200         while (((ether_type == ETYPE_8021ad) || (ether_type == ETYPE_VLAN)) && (*l2_len + sizeof(prox_rte_vlan_hdr) < len)) {
201                 vlan_hdr = (prox_rte_vlan_hdr *)(pkt + *l2_len);
202                 *l2_len +=4;
203                 ether_type = vlan_hdr->eth_proto;
204         }
205
206         // No L3 cksum offload for IPv6, but TODO L4 offload
207         // ETYPE_EoGRE CRC not implemented yet
208
209         switch (ether_type) {
210         case ETYPE_MPLSU:
211         case ETYPE_MPLSM:
212                 *l2_len +=4;
213                 break;
214         case ETYPE_IPv6:
215         case ETYPE_IPv4:
216                 break;
217         case ETYPE_EoGRE:
218         case ETYPE_ARP:
219                 *l2_len = 0;
220                 break;
221         default:
222                 *l2_len = 0;
223                 plog_warn("Unsupported packet type %x - CRC might be wrong\n", ether_type);
224                 break;
225         }
226
227         if (*l2_len) {
228                 prox_rte_ipv4_hdr *ip = (prox_rte_ipv4_hdr *)(pkt + *l2_len);
229                 if (ip->version_ihl >> 4 == 4)
230                         *l3_len = ipv4_get_hdr_len(ip);
231         }
232 }
233
234 static void checksum_packet(uint8_t *hdr, struct rte_mbuf *mbuf, struct pkt_template *pkt_template, int cksum_offload)
235 {
236         uint16_t l2_len = pkt_template->l2_len;
237         uint16_t l3_len = pkt_template->l3_len;
238
239         prox_rte_ipv4_hdr *ip = (prox_rte_ipv4_hdr*)(hdr + l2_len);
240         if (l3_len) {
241                 prox_ip_udp_cksum(mbuf, ip, l2_len, l3_len, cksum_offload);
242         } else if (ip->version_ihl >> 4 == 6) {
243                 prox_rte_ipv6_hdr *ip6 = (prox_rte_ipv6_hdr *)(hdr + l2_len);
244                 if (ip6->proto == IPPROTO_UDP) {
245                         prox_rte_udp_hdr *udp = (prox_rte_udp_hdr *)(ip6 + 1);
246                         udp->dgram_cksum = 0;
247                         udp->dgram_cksum = rte_ipv6_udptcp_cksum(ip6, udp);
248                 } else if (ip6->proto == IPPROTO_TCP) {
249                         prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)(ip6 + 1);
250                         tcp->cksum = 0;
251                         tcp->cksum = rte_ipv6_udptcp_cksum(ip6, tcp);
252                 }
253         }
254 }
255
256 static void task_gen_reset_token_time(struct task_gen *task)
257 {
258         token_time_set_bpp(&task->token_time, task->new_rate_bps);
259         token_time_reset(&task->token_time, rte_rdtsc(), 0);
260 }
261
262 static void task_gen_take_count(struct task_gen *task, uint32_t send_bulk)
263 {
264         if (task->pkt_count == (uint32_t)-1)
265                 return ;
266         else {
267                 if (task->pkt_count >= send_bulk)
268                         task->pkt_count -= send_bulk;
269                 else
270                         task->pkt_count = 0;
271         }
272 }
273
274 static int handle_gen_pcap_bulk(struct task_base *tbase, struct rte_mbuf **mbuf, uint16_t n_pkts)
275 {
276         struct task_gen_pcap *task = (struct task_gen_pcap *)tbase;
277         uint64_t now = rte_rdtsc();
278         uint64_t send_bulk = 0;
279         uint32_t pkt_idx_tmp = task->pkt_idx;
280
281         if (pkt_idx_tmp == task->n_pkts) {
282                 PROX_ASSERT(task->loop);
283                 return 0;
284         }
285
286         for (uint16_t j = 0; j < 64; ++j) {
287                 uint64_t tsc = task->proto_tsc[pkt_idx_tmp];
288                 if (task->last_tsc + tsc <= now) {
289                         task->last_tsc += tsc;
290                         send_bulk++;
291                         pkt_idx_tmp++;
292                         if (pkt_idx_tmp == task->n_pkts) {
293                                 if (task->loop)
294                                         pkt_idx_tmp = 0;
295                                 else
296                                         break;
297                         }
298                 }
299                 else
300                         break;
301         }
302
303         struct rte_mbuf **new_pkts = local_mbuf_refill_and_take(&task->local_mbuf, send_bulk);
304         if (new_pkts == NULL)
305                 return 0;
306
307         for (uint16_t j = 0; j < send_bulk; ++j) {
308                 struct rte_mbuf *next_pkt = new_pkts[j];
309                 struct pkt_template *pkt_template = &task->proto[task->pkt_idx];
310                 uint8_t *hdr = rte_pktmbuf_mtod(next_pkt, uint8_t *);
311
312                 pkt_template_init_mbuf(pkt_template, next_pkt, hdr);
313
314                 task->pkt_idx++;
315                 if (task->pkt_idx == task->n_pkts) {
316                         if (task->loop)
317                                 task->pkt_idx = 0;
318                         else
319                                 break;
320                 }
321         }
322
323         return task->base.tx_pkt(&task->base, new_pkts, send_bulk, NULL);
324 }
325
326 static inline uint64_t bytes_to_tsc(struct task_gen *task, uint32_t bytes)
327 {
328         return task->bytes_to_tsc[bytes];
329 }
330
331 static uint32_t task_gen_next_pkt_idx(const struct task_gen *task, uint32_t pkt_idx)
332 {
333         return pkt_idx + 1 >= task->n_pkts? 0 : pkt_idx + 1;
334 }
335
336 static uint32_t task_gen_offset_pkt_idx(const struct task_gen *task, uint32_t offset)
337 {
338         return (task->pkt_idx + offset) % task->n_pkts;
339 }
340
341 static uint32_t task_gen_calc_send_bulk(const struct task_gen *task, uint32_t *total_bytes)
342 {
343         /* The biggest bulk we allow to send is task->max_bulk_size
344            packets. The max bulk size can also be limited by the
345            pkt_count field.  At the same time, we are rate limiting
346            based on the specified speed (in bytes per second) so token
347            bucket based rate limiting must also be applied. The
348            minimum bulk size is also constrained. If the calculated
349            bulk size is less then the minimum, then don't send
350            anything. */
351
352         const uint32_t min_bulk = task->min_bulk_size;
353         uint32_t max_bulk = task->max_bulk_size;
354
355         if (task->pkt_count != (uint32_t)-1 && task->pkt_count < max_bulk) {
356                 max_bulk = task->pkt_count;
357         }
358
359         uint32_t send_bulk = 0;
360         uint32_t pkt_idx_tmp = task->pkt_idx;
361         uint32_t would_send_bytes = 0;
362         uint32_t pkt_size;
363
364         /*
365          * TODO - this must be improved to take into account the fact that, after applying randoms
366          * The packet can be replaced by an ARP
367          */
368         for (uint16_t j = 0; j < max_bulk; ++j) {
369                 struct pkt_template *pktpl = &task->pkt_template[pkt_idx_tmp];
370                 pkt_size = pktpl->len;
371                 uint32_t pkt_len = pkt_len_to_wire_size(pkt_size);
372                 if (pkt_len + would_send_bytes > task->token_time.bytes_now)
373                         break;
374
375                 pkt_idx_tmp = task_gen_next_pkt_idx(task, pkt_idx_tmp);
376
377                 send_bulk++;
378                 would_send_bytes += pkt_len;
379         }
380
381         if (send_bulk < min_bulk)
382                 return 0;
383         *total_bytes = would_send_bytes;
384         return send_bulk;
385 }
386
387 static void task_gen_apply_random_fields(struct task_gen *task, uint8_t *hdr)
388 {
389         uint32_t ret, ret_tmp;
390
391         for (uint16_t i = 0; i < task->n_rands; ++i) {
392                 ret = random_next(&task->rand[i].state);
393                 ret_tmp = (ret & task->rand[i].rand_mask) | task->rand[i].fixed_bits;
394
395                 ret_tmp = rte_bswap32(ret_tmp);
396                 /* At this point, the lower order bytes (BE) contain
397                    the generated value. The address where the values
398                    of interest starts is at ret_tmp + 4 - rand_len. */
399                 uint8_t *pret_tmp = (uint8_t*)&ret_tmp;
400                 rte_memcpy(hdr + task->rand[i].rand_offset, pret_tmp + 4 - task->rand[i].rand_len, task->rand[i].rand_len);
401         }
402 }
403
404 static void task_gen_apply_all_random_fields(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
405 {
406         if (!task->n_rands)
407                 return;
408
409         for (uint16_t i = 0; i < count; ++i)
410                 task_gen_apply_random_fields(task, pkt_hdr[i]);
411 }
412
413 static void task_gen_apply_ranges(struct task_gen *task, uint8_t *pkt_hdr)
414 {
415         uint32_t ret;
416         if (!task->n_ranges)
417                 return;
418
419         for (uint16_t j = 0; j < task->n_ranges; ++j) {
420                 if (unlikely(task->ranges[j].value == task->ranges[j].max))
421                         task->ranges[j].value = task->ranges[j].min;
422                 else
423                         task->ranges[j].value++;
424                 ret = rte_bswap32(task->ranges[j].value);
425                 uint8_t *pret = (uint8_t*)&ret;
426                 rte_memcpy(pkt_hdr + task->ranges[j].offset, pret + 4 - task->ranges[j].range_len, task->ranges[j].range_len);
427         }
428 }
429
430 static void task_gen_apply_all_ranges(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
431 {
432         uint32_t ret;
433         if (!task->n_ranges)
434                 return;
435
436         for (uint16_t i = 0; i < count; ++i) {
437                 task_gen_apply_ranges(task, pkt_hdr[i]);
438         }
439 }
440
441 static inline uint32_t gcd(uint32_t a, uint32_t b)
442 {
443         // Euclidean algorithm
444         uint32_t t;
445         while (b != 0) {
446                 t = b;
447                 b = a % b;
448                 a = t;
449         }
450         return a;
451 }
452
453 static inline uint32_t lcm(uint32_t a, uint32_t b)
454 {
455         return ((a / gcd(a, b)) * b);
456 }
457
458 static uint32_t get_n_range_flows(struct task_gen *task)
459 {
460         uint32_t t = 1;
461         for (int i = 0; i < task->n_ranges; i++) {
462                 t = lcm((task->ranges[i].max - task->ranges[i].min) + 1, t);
463         }
464         return t;
465 }
466
467 static uint32_t get_n_rand_flows(struct task_gen *task)
468 {
469         uint32_t t = 0;
470         for (int i = 0; i < task->n_rands; i++) {
471                 t += __builtin_popcount(task->rand[i].rand_mask);
472         }
473         PROX_PANIC(t > 31, "Too many random bits - maximum 31 supported\n");
474         return 1 << t;
475 }
476
477 //void add_to_hash_table(struct task_gen *task, uint32_t *buffer, uint32_t *idx, uint32_t mask, uint32_t bit_pos, uint32_t val, uint32_t fixed_bits, uint32_t rand_offset) {
478 //              uint32_t ret_tmp = val | fixed_bits;
479 //              ret_tmp = rte_bswap32(ret_tmp);
480 //              uint8_t *pret_tmp = (uint8_t*)&ret_tmp;
481 //              rte_memcpy(buf + rand_offset, pret_tmp + 4 - rand_len, rand_len);
482 //
483 // init idx
484 // alloc buffer
485 // init/alloc hash_table
486 //void build_buffer(struct task_gen *task, uint32_t *buffer, uint32_t *idx, uint32_t mask, uint32_t bit_pos, uint32_t val)
487 //{
488 //      if (mask == 0) {
489 //              buffer[*idx] = val;
490 //              *idx = (*idx) + 1;
491 //              return;
492 //      }
493 //      build_buffer(task, but, mask >> 1, bit_pos + 1, val);
494 //      if (mask & 1) {
495 //              build_buffer(task, but, mask >> 1, bit_pos + 1, val | (1 << bit_pos));
496 //}
497
498 static void build_flow_table(struct task_gen *task)
499 {
500         uint8_t buf[2048], *key_fields;
501         union ipv4_5tuple_host key;
502         struct pkt_template *pkt_template;
503         uint32_t n_range_flows = get_n_range_flows(task);
504         // uint32_t n_rand_flows = get_n_rand_flows(task);
505         // uint32_t n_flows= n_range_flows * n_rand_flows * task->orig_n_pkts;
506         // for (int i = 0; i < task->n_rands; i++) {
507         //      build_buffer(task, task->values_buf[i], &task->values_idx[i], task->rand[i].rand_mask, 0, 0);
508         // }
509
510         uint32_t n_flows = n_range_flows * task->orig_n_pkts;
511
512         for (uint32_t k = 0; k < task->orig_n_pkts; k++) {
513                 memcpy(buf, task->pkt_template[k].buf, task->pkt_template[k].len);
514                 for (uint32_t j = 0; j < n_range_flows; j++) {
515                         task_gen_apply_ranges(task, buf);
516                         key_fields = buf + sizeof(prox_rte_ether_hdr) + offsetof(prox_rte_ipv4_hdr, time_to_live);
517                         key.xmm = _mm_loadu_si128((__m128i*)(key_fields));
518                         key.pad0 = key.pad1 = 0;
519                         int idx = rte_hash_add_key(task->flow_id_table, (const void *)&key);
520                         PROX_PANIC(idx < 0, "Unable to add key in table\n");
521                         if (idx >= 0)
522                                 plog_dbg("Added key %d, %x, %x, %x, %x\n", key.proto, key.ip_src, key.ip_dst, key.port_src, key.port_dst);
523                 }
524         }
525 }
526
527 static int32_t task_gen_get_flow_id(struct task_gen *task, uint8_t *pkt_hdr)
528 {
529         int ret = 0;
530         union ipv4_5tuple_host key;
531         uint8_t *hdr = pkt_hdr + sizeof(prox_rte_ether_hdr) + offsetof(prox_rte_ipv4_hdr, time_to_live);
532         // __m128i data = _mm_loadu_si128((__m128i*)(hdr));
533         // key.xmm = _mm_and_si128(data, mask0);
534         key.xmm = _mm_loadu_si128((__m128i*)(hdr));
535         key.pad0 = key.pad1 = 0;
536         ret = rte_hash_lookup(task->flow_id_table, (const void *)&key);
537         if (ret < 0) {
538                 plog_err("Flow not found: %d, %x, %x, %x, %x\n", key.proto, key.ip_src, key.ip_dst, key.port_src, key.port_dst);
539         }
540         return ret;
541 }
542
543 static void task_gen_apply_all_flow_id(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count, int32_t *flow_id)
544 {
545         if (task->flow_id_pos) {
546                 for (uint16_t j = 0; j < count; ++j) {
547                         flow_id[j] = task_gen_get_flow_id(task, pkt_hdr[j]);
548                         *(int32_t *)(pkt_hdr[j] + task->flow_id_pos) = flow_id[j];
549                 }
550         }
551 }
552
553 static void task_gen_apply_accur_pos(struct task_gen *task, uint8_t *pkt_hdr, uint32_t accuracy)
554 {
555         *(uint32_t *)(pkt_hdr + task->accur_pos) = accuracy;
556 }
557
558 static void task_gen_apply_sig(struct task_gen *task, struct pkt_template *dst)
559 {
560         if (task->sig_pos)
561                 *(uint32_t *)(dst->buf + task->sig_pos) = task->sig;
562 }
563
564 static void task_gen_apply_all_accur_pos(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
565 {
566         if (!task->accur_pos)
567                 return;
568
569         /* The accuracy of task->pkt_queue_index - ACCURACY_WINDOW is stored in
570            packet task->pkt_queue_index. The ID modulo ACCURACY_WINDOW is the
571            same. */
572         for (uint16_t j = 0; j < count; ++j) {
573                 uint32_t accuracy = task->accur[(task->pkt_queue_index + j) & (ACCURACY_WINDOW - 1)];
574                 task_gen_apply_accur_pos(task, pkt_hdr[j], accuracy);
575         }
576 }
577
578 static void task_gen_apply_unique_id(struct task_gen *task, uint8_t *pkt_hdr, const struct unique_id *id)
579 {
580         struct unique_id *dst = (struct unique_id *)(pkt_hdr + task->packet_id_pos);
581
582         *dst = *id;
583 }
584
585 static void task_gen_apply_all_unique_id(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
586 {
587         if (!task->packet_id_pos)
588                 return;
589
590         for (uint16_t i = 0; i < count; ++i) {
591                 struct unique_id id;
592                 unique_id_init(&id, task->generator_id, task->pkt_queue_index++);
593                 task_gen_apply_unique_id(task, pkt_hdr[i], &id);
594         }
595 }
596
597 static void task_gen_apply_id_in_flows(struct task_gen *task, uint8_t *pkt_hdr, const struct unique_id *id)
598 {
599         struct unique_id *dst = (struct unique_id *)(pkt_hdr + task->packet_id_in_flow_pos);
600         *dst = *id;
601 }
602
603 static void task_gen_apply_all_id_in_flows(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count, int32_t *idx)
604 {
605         if (!task->packet_id_in_flow_pos)
606                 return;
607
608         for (uint16_t i = 0; i < count; ++i) {
609                 struct unique_id id;
610                 if (idx[i] >= 0 ) {
611                         unique_id_init(&id, task->generator_id, task->flows[idx[i]].packet_id++);
612                         task_gen_apply_id_in_flows(task, pkt_hdr[i], &id);
613                 }
614         }
615 }
616
617 static void task_gen_checksum_packets(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count)
618 {
619         if (!(task->runtime_flags & TASK_TX_CRC))
620                 return;
621
622         if (!task->runtime_checksum_needed)
623                 return;
624
625         uint32_t pkt_idx = task_gen_offset_pkt_idx(task, - count);
626         for (uint16_t i = 0; i < count; ++i) {
627                 struct pkt_template *pkt_template = &task->pkt_template[pkt_idx];
628                 checksum_packet(pkt_hdr[i], mbufs[i], pkt_template, task->cksum_offload);
629                 pkt_idx = task_gen_next_pkt_idx(task, pkt_idx);
630         }
631 }
632
633 static void task_gen_consume_tokens(struct task_gen *task, uint32_t tokens, uint32_t send_count)
634 {
635         /* If max burst has been sent, we can't keep up so just assume
636            that we can (leaving a "gap" in the packet stream on the
637            wire) */
638         task->token_time.bytes_now -= tokens;
639         if (send_count == task->max_bulk_size && task->token_time.bytes_now > tokens) {
640                 task->token_time.bytes_now = tokens;
641         }
642 }
643
644 static uint64_t task_gen_calc_bulk_duration(struct task_gen *task, uint32_t count)
645 {
646         uint32_t pkt_idx = task_gen_offset_pkt_idx(task, - 1);
647         struct pkt_template *last_pkt_template = &task->pkt_template[pkt_idx];
648         uint32_t last_pkt_len = pkt_len_to_wire_size(last_pkt_template->len);
649 #ifdef NO_EXTRAPOLATION
650         uint64_t bulk_duration = task->pkt_tsc_offset[count - 1];
651 #else
652         uint64_t last_pkt_duration = bytes_to_tsc(task, last_pkt_len);
653         uint64_t bulk_duration = task->pkt_tsc_offset[count - 1] + last_pkt_duration;
654 #endif
655
656         return bulk_duration;
657 }
658
659 static uint64_t task_gen_write_latency(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
660 {
661         if (!task->lat_enabled)
662                 return 0;
663
664         uint64_t tx_tsc, delta_t;
665         uint64_t tsc_before_tx = 0;
666
667         /* Just before sending the packets, apply the time stamp
668            relative to when the first packet will be sent. The first
669            packet will be sent now. The time is read for each packet
670            to reduce the error towards the actual time the packet will
671            be sent. */
672         uint64_t write_tsc_after, write_tsc_before;
673
674         write_tsc_before = rte_rdtsc();
675
676         /* The time it took previously to write the time stamps in the
677            packets is used as an estimate for how long it will take to
678            write the time stamps now.  The estimated time at which the
679            packets will actually be sent will be at tx_tsc. */
680         tx_tsc = write_tsc_before + task->write_duration_estimate;
681
682         /* The offset delta_t tracks the difference between the actual
683            time and the time written in the packets. Adding the offset
684            to the actual time insures that the time written in the
685            packets is monotonically increasing. At the same time,
686            simply sleeping until delta_t is zero would leave a period
687            of silence on the line. The error has been introduced
688            earlier, but the packets have already been sent. */
689
690         /* This happens typically if previous bulk was delayed
691            by an interrupt e.g.  (with Time in nsec)
692            Time x: sleep 4 microsec
693            Time x+4000: send 64 packets (64 packets as 4000 nsec, w/ 10Gbps 64 bytes)
694            Time x+5000: send 16 packets (16 packets as 1000 nsec)
695            When we send the 16 packets, the 64 ealier packets are not yet
696            fully sent */
697         if (tx_tsc < task->earliest_tsc_next_pkt)
698                 delta_t = task->earliest_tsc_next_pkt - tx_tsc;
699         else
700                 delta_t = 0;
701
702         for (uint16_t i = 0; i < count; ++i) {
703                 uint32_t *pos = (uint32_t *)(pkt_hdr[i] + task->lat_pos);
704                 const uint64_t pkt_tsc = tx_tsc + delta_t + task->pkt_tsc_offset[i];
705                 *pos = pkt_tsc >> LATENCY_ACCURACY;
706         }
707
708         uint64_t bulk_duration = task_gen_calc_bulk_duration(task, count);
709         task->earliest_tsc_next_pkt = tx_tsc + delta_t + bulk_duration;
710         write_tsc_after = rte_rdtsc();
711         task->write_duration_estimate = write_tsc_after - write_tsc_before;
712
713         /* Make sure that the time stamps that were written
714            are valid. The offset must be taken into account */
715         do {
716                 tsc_before_tx = rte_rdtsc();
717         } while (tsc_before_tx < tx_tsc);
718
719         return tsc_before_tx;
720 }
721
722 static void task_gen_store_accuracy(struct task_gen *task, uint32_t count, uint64_t tsc_before_tx)
723 {
724         if (!task->accur_pos)
725                 return;
726
727         uint64_t accur = rte_rdtsc() - tsc_before_tx;
728         uint64_t first_accuracy_idx = task->pkt_queue_index - count;
729
730         for (uint32_t i = 0; i < count; ++i) {
731                 uint32_t accuracy_idx = (first_accuracy_idx + i) & (ACCURACY_WINDOW - 1);
732
733                 task->accur[accuracy_idx] = accur;
734         }
735 }
736
737 static void task_gen_load_and_prefetch(struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count)
738 {
739         for (uint16_t i = 0; i < count; ++i)
740                 rte_prefetch0(mbufs[i]);
741         for (uint16_t i = 0; i < count; ++i)
742                 pkt_hdr[i] = rte_pktmbuf_mtod(mbufs[i], uint8_t *);
743         for (uint16_t i = 0; i < count; ++i)
744                 rte_prefetch0(pkt_hdr[i]);
745 }
746
747 static void task_gen_build_packets(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count)
748 {
749         uint64_t will_send_bytes = 0;
750
751         for (uint16_t i = 0; i < count; ++i) {
752                 struct pkt_template *pktpl = &task->pkt_template[task->pkt_idx];
753                 struct pkt_template *pkt_template = &task->pkt_template[task->pkt_idx];
754                 pkt_template_init_mbuf(pkt_template, mbufs[i], pkt_hdr[i]);
755                 prox_rte_ether_hdr *hdr = (prox_rte_ether_hdr *)pkt_hdr[i];
756                 if (task->lat_enabled) {
757 #ifdef NO_EXTRAPOLATION
758                         task->pkt_tsc_offset[i] = 0;
759 #else
760                         task->pkt_tsc_offset[i] = bytes_to_tsc(task, will_send_bytes);
761 #endif
762                         will_send_bytes += pkt_len_to_wire_size(pkt_template->len);
763                 }
764                 task->pkt_idx = task_gen_next_pkt_idx(task, task->pkt_idx);
765         }
766 }
767
768 static int task_gen_allocate_templates(struct task_gen *task, uint32_t orig_nb_pkts, uint32_t nb_pkts, int do_panic, int pcap)
769 {
770         size_t mem_size = nb_pkts * sizeof(*task->pkt_template);
771         size_t orig_mem_size = orig_nb_pkts * sizeof(*task->pkt_template);
772         task->pkt_template = prox_zmalloc(mem_size, task->socket_id);
773         task->pkt_template_orig = prox_zmalloc(orig_mem_size, task->socket_id);
774
775         if (task->pkt_template == NULL || task->pkt_template_orig == NULL) {
776                 plog_err_or_panic(do_panic, "Failed to allocate %lu bytes (in huge pages) for %s\n", mem_size, pcap ? "pcap file":"packet template");
777                 return -1;
778         }
779
780         for (size_t i = 0; i < orig_nb_pkts; i++) {
781                 task->pkt_template_orig[i].buf = prox_zmalloc(task->max_frame_size, task->socket_id);
782                 if (task->pkt_template_orig[i].buf == NULL) {
783                         plog_err_or_panic(do_panic, "Failed to allocate %u bytes (in huge pages) for %s\n", task->max_frame_size, pcap ? "packet from pcap": "packet");
784                         return -1;
785                 }
786         }
787         for (size_t i = 0; i < nb_pkts; i++) {
788                 task->pkt_template[i].buf = prox_zmalloc(task->max_frame_size, task->socket_id);
789                 if (task->pkt_template[i].buf == NULL) {
790                         plog_err_or_panic(do_panic, "Failed to allocate %u bytes (in huge pages) for %s\n", task->max_frame_size, pcap ? "packet from pcap": "packet");
791                         return -1;
792                 }
793         }
794         return 0;
795 }
796
797 static int task_gen_reallocate_templates(struct task_gen *task, uint32_t nb_pkts, int do_panic)
798 {
799         // Need to free up bufs allocated in previous (longer) imix
800         for (size_t i = nb_pkts; i < task->n_pkts; i++) {
801                 if (task->pkt_template[i].buf) {
802                         rte_free(task->pkt_template[i].buf);
803                         task->pkt_template[i].buf = NULL;
804                 }
805         }
806
807         size_t mem_size = nb_pkts * sizeof(*task->pkt_template);
808         size_t old_mem_size = task->n_pkts * sizeof(*task->pkt_template);
809         if (old_mem_size > mem_size)
810                 old_mem_size = mem_size;
811
812         struct pkt_template *ptr;
813
814         // re-allocate memory for new pkt_template (this might allocate additional memory or free up some...)
815         if ((ptr = rte_malloc_socket(NULL, mem_size, RTE_CACHE_LINE_SIZE, task->socket_id)) != NULL) {
816                 memcpy(ptr, task->pkt_template, old_mem_size);
817                 rte_free(task->pkt_template);
818                 task->pkt_template = ptr;
819         } else {
820                 plog_err_or_panic(do_panic, "Failed to allocate %lu bytes (in huge pages) for packet template for IMIX\n", mem_size);
821                 return -1;
822         }
823
824         // Need to allocate bufs for new template but no need to reallocate for existing ones
825         for (size_t i = task->n_pkts; i < nb_pkts; ++i) {
826                 task->pkt_template[i].buf = prox_zmalloc(task->max_frame_size, task->socket_id);
827                 if (task->pkt_template[i].buf == NULL) {
828                         plog_err_or_panic(do_panic, "Failed to allocate %u bytes (in huge pages) for packet %zd in IMIX\n", task->max_frame_size, i);
829                         return -1;
830                 }
831         }
832         return 0;
833 }
834
835 static int check_pkt_size(struct task_gen *task, uint32_t pkt_size, int do_panic)
836 {
837         const uint16_t min_len = sizeof(prox_rte_ether_hdr) + sizeof(prox_rte_ipv4_hdr);
838         const uint16_t max_len = task->max_frame_size;
839
840         if (do_panic) {
841                 PROX_PANIC(pkt_size == 0, "Invalid packet size length (no packet defined?)\n");
842                 PROX_PANIC(pkt_size > max_len, "pkt_size out of range (must be <= %u)\n", max_len);
843                 PROX_PANIC(pkt_size < min_len, "pkt_size out of range (must be >= %u)\n", min_len);
844                 return 0;
845         } else {
846                 if (pkt_size == 0) {
847                         plog_err("Invalid packet size length (no packet defined?)\n");
848                         return -1;
849                 }
850                 if (pkt_size > max_len) {
851                         if (pkt_size >  PROX_RTE_ETHER_MAX_LEN + 2 * PROX_VLAN_TAG_SIZE - 4)
852                                 plog_err("pkt_size too high and jumbo frames disabled\n");
853                         else
854                                 plog_err("pkt_size out of range (must be <= (mtu=%u))\n", max_len);
855                         return -1;
856                 }
857                 if (pkt_size < min_len) {
858                         plog_err("pkt_size out of range (must be >= %u)\n", min_len);
859                         return -1;
860                 }
861                 return 0;
862         }
863 }
864
865 static int check_fields_in_bounds(struct task_gen *task, uint32_t pkt_size, int do_panic)
866 {
867         if (task->lat_enabled) {
868                 uint32_t pos_beg = task->lat_pos;
869                 uint32_t pos_end = task->lat_pos + 3U;
870
871                 if (do_panic)
872                         PROX_PANIC(pkt_size <= pos_end, "Writing latency at %u-%u, but packet size is %u bytes\n",
873                            pos_beg, pos_end, pkt_size);
874                 else if (pkt_size <= pos_end) {
875                         plog_err("Writing latency at %u-%u, but packet size is %u bytes\n", pos_beg, pos_end, pkt_size);
876                         return -1;
877                 }
878         }
879         if (task->packet_id_pos) {
880                 uint32_t pos_beg = task->packet_id_pos;
881                 uint32_t pos_end = task->packet_id_pos + 4U;
882
883                 if (do_panic)
884                         PROX_PANIC(pkt_size <= pos_end, "Writing packet at %u-%u, but packet size is %u bytes\n",
885                            pos_beg, pos_end, pkt_size);
886                 else if (pkt_size <= pos_end) {
887                         plog_err("Writing packet at %u-%u, but packet size is %u bytes\n", pos_beg, pos_end, pkt_size);
888                         return -1;
889                 }
890         }
891         if (task->accur_pos) {
892                 uint32_t pos_beg = task->accur_pos;
893                 uint32_t pos_end = task->accur_pos + 3U;
894
895                 if (do_panic)
896                         PROX_PANIC(pkt_size <= pos_end, "Writing accuracy at %u-%u, but packet size is %u bytes\n",
897                            pos_beg, pos_end, pkt_size);
898                 else if (pkt_size <= pos_end) {
899                         plog_err("Writing accuracy at %u-%u, but packet size is %u bytes\n", pos_beg, pos_end, pkt_size);
900                         return -1;
901                 }
902         }
903         return 0;
904 }
905
906 static int task_gen_set_eth_ip_udp_sizes(struct task_gen *task, uint32_t orig_n_pkts, uint32_t nb_pkt_sizes, uint32_t *pkt_sizes)
907 {
908         size_t k;
909         uint32_t l4_len;
910         prox_rte_ipv4_hdr *ip;
911         struct pkt_template *template;
912
913         for (size_t j = 0; j < nb_pkt_sizes; ++j) {
914                 for (size_t i = 0; i < orig_n_pkts; ++i) {
915                         k = j * orig_n_pkts + i;
916                         template = &task->pkt_template[k];
917                         if (template->l2_len == 0)
918                                 continue;
919                         ip = (prox_rte_ipv4_hdr *)(template->buf + template->l2_len);
920                         ip->total_length = rte_bswap16(pkt_sizes[j] - template->l2_len);
921                         l4_len = pkt_sizes[j] - template->l2_len - template->l3_len;
922                         ip->hdr_checksum = 0;
923                         prox_ip_cksum_sw(ip);
924
925                         if (ip->next_proto_id == IPPROTO_UDP) {
926                                 prox_rte_udp_hdr *udp = (prox_rte_udp_hdr *)(((uint8_t *)ip) + template->l3_len);
927                                 udp->dgram_len = rte_bswap16(l4_len);
928                                 prox_udp_cksum_sw(udp, l4_len, ip->src_addr, ip->dst_addr);
929                         } else if (ip->next_proto_id == IPPROTO_TCP) {
930                                 prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)(((uint8_t *)ip) + template->l3_len);
931                                 prox_tcp_cksum_sw(tcp, l4_len, ip->src_addr, ip->dst_addr);
932                         }
933                 }
934         }
935         return 0;
936 }
937
938 static int task_gen_apply_imix(struct task_gen *task, int do_panic)
939 {
940         struct pkt_template *ptr;
941         int rc;
942         task->imix_nb_pkts = task->new_imix_nb_pkts;
943         uint32_t n_pkts = task->imix_nb_pkts * task->orig_n_pkts;
944
945         if ((n_pkts != task->n_pkts) && ((rc = task_gen_reallocate_templates(task, n_pkts, do_panic)) < 0))
946                 return rc;
947
948         task->n_pkts = n_pkts;
949         if (task->pkt_idx >= n_pkts)
950                 task->pkt_idx = 0;
951         task_gen_set_pkt_templates_len(task, task->imix_pkt_sizes);
952         task_gen_reset_pkt_templates_content(task);
953         task_gen_pkt_template_recalc_metadata(task);
954         check_all_pkt_size(task, DO_NOT_PANIC);
955         check_all_fields_in_bounds(task, DO_NOT_PANIC);
956         task_gen_set_eth_ip_udp_sizes(task, task->orig_n_pkts, task->imix_nb_pkts, task->imix_pkt_sizes);
957         return 0;
958 }
959
960 static void task_gen_update_config(struct task_gen *task)
961 {
962         if (task->token_time.cfg.bpp != task->new_rate_bps)
963                 task_gen_reset_token_time(task);
964         if (task->new_imix_nb_pkts)
965                 task_gen_apply_imix(task, DO_NOT_PANIC);
966         task->new_imix_nb_pkts = 0;
967 }
968
969 static inline void build_value(struct task_gen *task, uint32_t mask, int bit_pos, uint32_t val, uint32_t fixed_bits)
970 {
971         struct task_base *tbase = (struct task_base *)task;
972         if (bit_pos < 32) {
973                 build_value(task, mask >> 1, bit_pos + 1, val, fixed_bits);
974                 if (mask & 1) {
975                         build_value(task, mask >> 1, bit_pos + 1, val | (1 << bit_pos), fixed_bits);
976                 }
977         } else {
978                 register_ip_to_ctrl_plane(tbase->l3.tmaster, rte_cpu_to_be_32(val | fixed_bits), tbase->l3.reachable_port_id, tbase->l3.core_id, tbase->l3.task_id);
979         }
980 }
981
982 static inline void build_value_ipv6(struct task_gen *task, uint32_t mask, int var_bit_pos, int init_var_bit_pos, struct ipv6_addr val, struct ipv6_addr fixed_bits)
983 {
984         struct task_base *tbase = (struct task_base *)task;
985         if (var_bit_pos < 32) {
986                 build_value_ipv6(task, mask >> 1, var_bit_pos + 1, init_var_bit_pos, val, fixed_bits);
987                 if (mask & 1) {
988                         int byte_pos = (var_bit_pos + init_var_bit_pos) / 8;
989                         int bit_pos = (var_bit_pos + init_var_bit_pos) % 8;
990                         val.bytes[byte_pos] = val.bytes[byte_pos] | (1 << bit_pos);
991                         build_value_ipv6(task, mask >> 1, var_bit_pos + 1, init_var_bit_pos, val, fixed_bits);
992                 }
993         } else {
994                 for (uint i = 0; i < sizeof(struct ipv6_addr) / 8; i++)
995                         val.bytes[i] = val.bytes[i] | fixed_bits.bytes[i];
996                 register_node_to_ctrl_plane(tbase->l3.tmaster, &null_addr, &val, tbase->l3.reachable_port_id, tbase->l3.core_id, tbase->l3.task_id);
997         }
998 }
999
1000 static inline void register_all_ip_to_ctrl_plane(struct task_gen *task)
1001 {
1002         struct task_base *tbase = (struct task_base *)task;
1003         int i, len, fixed;
1004         unsigned int offset;
1005         uint32_t mask, ip_len;
1006         struct ipv6_addr *ip6_src = NULL;
1007         uint32_t *ip_src;
1008
1009         for (uint32_t i = 0; i < task->n_pkts; ++i) {
1010                 struct pkt_template *pktpl = &task->pkt_template[i];
1011                 unsigned int ip_src_pos = 0;
1012                 int ipv4 = 0;
1013                 unsigned int l2_len = sizeof(prox_rte_ether_hdr);
1014
1015                 uint8_t *pkt = pktpl->buf;
1016                 prox_rte_ether_hdr *eth_hdr = (prox_rte_ether_hdr*)pkt;
1017                 uint16_t ether_type = eth_hdr->ether_type;
1018                 prox_rte_vlan_hdr *vlan_hdr;
1019                 prox_rte_ipv4_hdr *ip;
1020
1021                 // Unstack VLAN tags
1022                 while (((ether_type == ETYPE_8021ad) || (ether_type == ETYPE_VLAN)) && (l2_len + sizeof(prox_rte_vlan_hdr) < pktpl->len)) {
1023                         vlan_hdr = (prox_rte_vlan_hdr *)(pkt + l2_len);
1024                         l2_len +=4;
1025                         ether_type = vlan_hdr->eth_proto;
1026                 }
1027                 if ((ether_type == ETYPE_MPLSU) || (ether_type == ETYPE_MPLSM)) {
1028                         l2_len +=4;
1029                         ip = (prox_rte_ipv4_hdr *)(pkt + l2_len);
1030                         if (ip->version_ihl >> 4 == 4)
1031                                 ipv4 = 1;
1032                         else if (ip->version_ihl >> 4 != 6)     // Version field at same location for IPv4 and IPv6
1033                                 continue;
1034                 } else if (ether_type == ETYPE_IPv4) {
1035                         ip = (prox_rte_ipv4_hdr *)(pkt + l2_len);
1036                         PROX_PANIC(ip->version_ihl >> 4 != 4, "IPv4 ether_type but IP version = %d != 4", ip->version_ihl >> 4);        // Invalid Packet
1037                         ipv4 = 1;
1038                 } else if (ether_type == ETYPE_IPv6) {
1039                         ip = (prox_rte_ipv4_hdr *)(pkt + l2_len);
1040                         PROX_PANIC(ip->version_ihl >> 4 != 6, "IPv6 ether_type but IP version = %d != 6", ip->version_ihl >> 4);        // Invalid Packet
1041                 } else {
1042                         continue;
1043                 }
1044
1045                 PROX_PANIC(ipv4 && ((prox_cfg.flags & DSF_L3_ENABLED) == 0), "Trying to generate an IPv4 packet in NDP mode => not supported\n");
1046                 PROX_PANIC((ipv4 == 0) && ((prox_cfg.flags & DSF_NDP_ENABLED) == 0), "Trying to generate an IPv6 packet in L3 (IPv4) mode => not supported\n");
1047                 if (ipv4) {
1048                         // Even if IPv4 header contains options, options are after ip src and dst
1049                         ip_src_pos = l2_len + sizeof(prox_rte_ipv4_hdr) - 2 * sizeof(uint32_t);
1050                         ip_src = ((uint32_t *)(pktpl->buf + ip_src_pos));
1051                         plog_info("\tip_src_pos = %d, ip_src = %x\n", ip_src_pos, *ip_src);
1052                         register_ip_to_ctrl_plane(tbase->l3.tmaster, *ip_src, tbase->l3.reachable_port_id, tbase->l3.core_id, tbase->l3.task_id);
1053                         ip_len = sizeof(uint32_t);
1054                 } else {
1055                         ip_src_pos = l2_len + sizeof(prox_rte_ipv6_hdr) - 2 * sizeof(struct ipv6_addr);
1056                         ip6_src = ((struct ipv6_addr *)(pktpl->buf + ip_src_pos));
1057                         plog_info("\tip_src_pos = %d, ip6_src = "IPv6_BYTES_FMT"\n", ip_src_pos, IPv6_BYTES(ip6_src->bytes));
1058                         register_node_to_ctrl_plane(tbase->l3.tmaster, ip6_src, &null_addr, tbase->l3.reachable_port_id, tbase->l3.core_id, tbase->l3.task_id);
1059                         ip_len = sizeof(struct ipv6_addr);
1060                 }
1061
1062                 for (int j = 0; j < task->n_rands; j++) {
1063                         offset = task->rand[j].rand_offset;
1064                         len = task->rand[j].rand_len;
1065                         mask = task->rand[j].rand_mask;
1066                         fixed = task->rand[j].fixed_bits;
1067                         plog_info("offset = %d, len = %d, mask = %x, fixed = %x\n", offset, len, mask, fixed);
1068                         if (offset >= ip_src_pos + ip_len)      // First random bit after IP
1069                                 continue;
1070                         if (offset + len < ip_src_pos)          // Last random bit before IP
1071                                 continue;
1072
1073                         if (ipv4) {
1074                                 if (offset >= ip_src_pos) {
1075                                         int32_t ip_src_mask = (1 << (4 + ip_src_pos - offset) * 8) - 1;
1076                                         mask = mask & ip_src_mask;
1077                                         fixed = (fixed & ip_src_mask) | (rte_be_to_cpu_32(*ip_src) & ~ip_src_mask);
1078                                         build_value(task, mask, 0, 0, fixed);
1079                                 } else {
1080                                         int32_t bits = ((ip_src_pos + 4 - offset - len) * 8);
1081                                         mask = mask << bits;
1082                                         fixed = (fixed << bits) | (rte_be_to_cpu_32(*ip_src) & ((1 << bits) - 1));
1083                                         build_value(task, mask, 0, 0, fixed);
1084                                 }
1085                         } else {
1086                                 // We do not support when random partially covers IP - either starting before or finishing after
1087                                 if (offset + len >= ip_src_pos + ip_len) { // len over the ip
1088                                         plog_err("Not supported: random_offset = %d, random_len = %d, ip_src_pos = %d, ip_len = %d\n", offset, len, ip_src_pos, ip_len);
1089                                         continue;
1090                                 }
1091                                 if (offset < ip_src_pos) {
1092                                         plog_err("Not supported: random_offset = %d, random_len = %d, ip_src_pos = %d, ip_len = %d\n", offset, len, ip_src_pos, ip_len);
1093                                         continue;
1094                                 }
1095                                 // Even for IPv6 the random mask supported by PROX are 32 bits only
1096                                 struct ipv6_addr fixed_ipv6;
1097                                 uint init_var_byte_pos = (offset - ip_src_pos);
1098                                 for (uint i = 0; i < sizeof(struct ipv6_addr); i++) {
1099                                         if (i < init_var_byte_pos)
1100                                                 fixed_ipv6.bytes[i] = ip6_src->bytes[i];
1101                                         else if (i < init_var_byte_pos + len)
1102                                                 fixed_ipv6.bytes[i] = (fixed >> (i - init_var_byte_pos)) & 0xFF;
1103                                         else
1104                                                 fixed_ipv6.bytes[i] = ip6_src->bytes[i];
1105                                 }
1106                                 build_value_ipv6(task, mask, 0, init_var_byte_pos * 8, null_addr, fixed_ipv6);
1107                         }
1108                 }
1109         }
1110 }
1111
1112 static int handle_gen_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
1113 {
1114         struct task_gen *task = (struct task_gen *)tbase;
1115         uint8_t out[MAX_PKT_BURST] = {0};
1116         int ret;
1117
1118         int i, j;
1119
1120         task_gen_update_config(task);
1121
1122         if (task->pkt_count == 0) {
1123                 task_gen_reset_token_time(task);
1124                 return 0;
1125         }
1126         if (!task->token_time.cfg.bpp)
1127                 return 0;
1128
1129         token_time_update(&task->token_time, rte_rdtsc());
1130
1131         uint32_t would_send_bytes;
1132         uint32_t send_bulk = task_gen_calc_send_bulk(task, &would_send_bytes);
1133
1134         if (send_bulk == 0)
1135                 return 0;
1136         task_gen_take_count(task, send_bulk);
1137         task_gen_consume_tokens(task, would_send_bytes, send_bulk);
1138
1139         struct rte_mbuf **new_pkts = local_mbuf_refill_and_take(&task->local_mbuf, send_bulk);
1140         if (new_pkts == NULL)
1141                 return 0;
1142         uint8_t *pkt_hdr[MAX_RING_BURST];
1143         int32_t flow_id[MAX_RING_BURST];
1144         task_gen_load_and_prefetch(new_pkts, pkt_hdr, send_bulk);
1145         task_gen_build_packets(task, new_pkts, pkt_hdr, send_bulk);
1146         task_gen_apply_all_random_fields(task, pkt_hdr, send_bulk);
1147         task_gen_apply_all_ranges(task, pkt_hdr, send_bulk);
1148         task_gen_apply_all_accur_pos(task, pkt_hdr, send_bulk);
1149         task_gen_apply_all_flow_id(task, pkt_hdr, send_bulk, flow_id);
1150         task_gen_apply_all_unique_id(task, pkt_hdr, send_bulk);
1151         task_gen_apply_all_id_in_flows(task, pkt_hdr, send_bulk, flow_id);
1152
1153         uint64_t tsc_before_tx;
1154
1155         tsc_before_tx = task_gen_write_latency(task, pkt_hdr, send_bulk);
1156         task_gen_checksum_packets(task, new_pkts, pkt_hdr, send_bulk);
1157         if (task->store_msk) {
1158                 for (uint32_t i = 0; i < send_bulk; i++) {
1159                         if (out[i] != OUT_DISCARD) {
1160                                 uint8_t *hdr;
1161                                 hdr = (uint8_t *)rte_pktmbuf_mtod(new_pkts[i], prox_rte_ether_hdr *);
1162                                 memcpy(&task->store_buf[task->store_pkt_id & task->store_msk].buf, hdr, rte_pktmbuf_pkt_len(new_pkts[i]));
1163                                 task->store_buf[task->store_pkt_id & task->store_msk].len = rte_pktmbuf_pkt_len(new_pkts[i]);
1164                                 task->store_pkt_id++;
1165                         }
1166                 }
1167         }
1168         ret = task->base.tx_pkt(&task->base, new_pkts, send_bulk, out);
1169         task_gen_store_accuracy(task, send_bulk, tsc_before_tx);
1170
1171         // If we failed to send some packets, we need to do some clean-up:
1172
1173         if (unlikely(ret)) {
1174                 // We need re-use the packets indexes not being sent
1175                 // Hence non-sent packets will not be considered as lost by the receiver when it looks at
1176                 // packet ids. This should also increase the percentage of packets used for latency measurements
1177                 task->pkt_queue_index -= ret;
1178
1179                 // In case of failures, the estimate about when we can send next packet (earliest_tsc_next_pkt) is wrong
1180                 // This would result in under-estimated latency (up to 0 or negative)
1181                 uint64_t bulk_duration = task_gen_calc_bulk_duration(task, ret);
1182                 task->earliest_tsc_next_pkt -= bulk_duration;
1183         }
1184         return ret;
1185 }
1186
1187 static void init_task_gen_seeds(struct task_gen *task)
1188 {
1189         for (size_t i = 0; i < sizeof(task->rand)/sizeof(task->rand[0]); ++i)
1190                 random_init_seed(&task->rand[i].state);
1191 }
1192
1193 static uint32_t pcap_count_pkts(pcap_t *handle, uint32_t *max_frame_size)
1194 {
1195         struct pcap_pkthdr header;
1196         const uint8_t *buf;
1197         uint32_t ret = 0;
1198         *max_frame_size = 0;
1199         long pkt1_fpos = ftell(pcap_file(handle));
1200
1201         while ((buf = pcap_next(handle, &header))) {
1202                 if (header.len > *max_frame_size)
1203                         *max_frame_size = header.len;
1204                 ret++;
1205         }
1206         int ret2 = fseek(pcap_file(handle), pkt1_fpos, SEEK_SET);
1207         PROX_PANIC(ret2 != 0, "Failed to reset reading pcap file\n");
1208         return ret;
1209 }
1210
1211 static uint64_t avg_time_stamp(uint64_t *time_stamp, uint32_t n)
1212 {
1213         uint64_t tot_inter_pkt = 0;
1214
1215         for (uint32_t i = 0; i < n; ++i)
1216                 tot_inter_pkt += time_stamp[i];
1217         return (tot_inter_pkt + n / 2)/n;
1218 }
1219
1220 static int pcap_read_pkts(pcap_t *handle, const char *file_name, uint32_t n_pkts, struct pkt_template *proto, uint64_t *time_stamp, uint32_t max_frame_size)
1221 {
1222         struct pcap_pkthdr header;
1223         const uint8_t *buf;
1224         size_t len;
1225
1226         for (uint32_t i = 0; i < n_pkts; ++i) {
1227                 buf = pcap_next(handle, &header);
1228
1229                 PROX_PANIC(buf == NULL, "Failed to read packet %d from pcap %s\n", i, file_name);
1230                 proto[i].len = header.len;
1231                 len = RTE_MIN(header.len, max_frame_size);
1232                 if (header.len > len)
1233                         plogx_warn("Packet truncated from %u to %zu bytes\n", header.len, len);
1234
1235                 if (time_stamp) {
1236                         static struct timeval beg;
1237                         struct timeval tv;
1238
1239                         if (i == 0)
1240                                 beg = header.ts;
1241
1242                         tv = tv_diff(&beg, &header.ts);
1243                         tv_to_tsc(&tv, time_stamp + i);
1244                 }
1245                 rte_memcpy(proto[i].buf, buf, len);
1246         }
1247
1248         if (time_stamp && n_pkts) {
1249                 for (uint32_t i = n_pkts - 1; i > 0; --i)
1250                         time_stamp[i] -= time_stamp[i - 1];
1251                 /* Since the handle function will loop the packets,
1252                    there is one time-stamp that is not provided by the
1253                    pcap file. This is the time between the last and
1254                    the first packet. This implementation takes the
1255                    average of the inter-packet times here. */
1256                 if (n_pkts > 1)
1257                         time_stamp[0] = avg_time_stamp(time_stamp + 1, n_pkts - 1);
1258         }
1259
1260         return 0;
1261 }
1262
1263 static int check_all_pkt_size(struct task_gen *task, int do_panic)
1264 {
1265         int rc;
1266         for (uint32_t i = 0; i < task->n_pkts;++i) {
1267                 if ((rc = check_pkt_size(task, task->pkt_template[i].len, do_panic)) != 0)
1268                         return rc;
1269         }
1270         return 0;
1271 }
1272
1273 static int check_all_fields_in_bounds(struct task_gen *task, int do_panic)
1274 {
1275         int rc;
1276         for (uint32_t i = 0; i < task->n_pkts;++i) {
1277                 if ((rc = check_fields_in_bounds(task, task->pkt_template[i].len, do_panic)) != 0)
1278                         return rc;
1279         }
1280         return 0;
1281 }
1282
1283 static void task_gen_pkt_template_recalc_metadata(struct task_gen *task)
1284 {
1285         struct pkt_template *template;
1286
1287         for (size_t i = 0; i < task->n_pkts; ++i) {
1288                 template = &task->pkt_template[i];
1289                 parse_l2_l3_len(template->buf, &template->l2_len, &template->l3_len, template->len);
1290         }
1291 }
1292
1293 static void task_gen_pkt_template_recalc_checksum(struct task_gen *task)
1294 {
1295         struct pkt_template *template;
1296         prox_rte_ipv4_hdr *ip;
1297
1298         task->runtime_checksum_needed = 0;
1299         for (size_t i = 0; i < task->n_pkts; ++i) {
1300                 template = &task->pkt_template[i];
1301                 if (template->l2_len == 0)
1302                         continue;
1303                 ip = (prox_rte_ipv4_hdr *)(template->buf + template->l2_len);
1304                 if (ip->version_ihl >> 4 == 4) {
1305                         ip->hdr_checksum = 0;
1306                         prox_ip_cksum_sw(ip);
1307                         uint32_t l4_len = rte_bswap16(ip->total_length) - template->l3_len;
1308                         if (ip->next_proto_id == IPPROTO_UDP) {
1309                                 prox_rte_udp_hdr *udp = (prox_rte_udp_hdr *)(((uint8_t *)ip) + template->l3_len);
1310                                 prox_udp_cksum_sw(udp, l4_len, ip->src_addr, ip->dst_addr);
1311                         } else if (ip->next_proto_id == IPPROTO_TCP) {
1312                                 prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)(((uint8_t *)ip) + template->l3_len);
1313                                 prox_tcp_cksum_sw(tcp, l4_len, ip->src_addr, ip->dst_addr);
1314                         }
1315                 } else if (ip->version_ihl >> 4 == 6) {
1316                         prox_rte_ipv6_hdr *ip6;
1317                         ip6 = (prox_rte_ipv6_hdr *)(template->buf + template->l2_len);
1318                         if (ip6->proto == IPPROTO_UDP) {
1319                                 prox_rte_udp_hdr *udp = (prox_rte_udp_hdr *)(ip6 + 1);
1320                                 udp->dgram_cksum = 0;
1321                                 udp->dgram_cksum = rte_ipv6_udptcp_cksum(ip6, udp);
1322                         } else if (ip6->proto == IPPROTO_TCP) {
1323                                 prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)(ip6 + 1);
1324                                 tcp->cksum = 0;
1325                                 tcp->cksum = rte_ipv6_udptcp_cksum(ip6, tcp);
1326                         }
1327                 }
1328
1329                 /* The current implementation avoids checksum
1330                    calculation by determining that at packet
1331                    construction time, no fields are applied that would
1332                    require a recalculation of the checksum. */
1333                 if (task->lat_enabled && task->lat_pos > template->l2_len)
1334                         task->runtime_checksum_needed = 1;
1335                 if (task->accur_pos > template->l2_len)
1336                         task->runtime_checksum_needed = 1;
1337                 if (task->packet_id_pos > template->l2_len)
1338                         task->runtime_checksum_needed = 1;
1339         }
1340 }
1341
1342 static void task_gen_pkt_template_recalc_all(struct task_gen *task)
1343 {
1344         task_gen_pkt_template_recalc_metadata(task);
1345         task_gen_pkt_template_recalc_checksum(task);
1346 }
1347
1348 static void task_gen_set_pkt_templates_len(struct task_gen *task, uint32_t *pkt_sizes)
1349 {
1350         struct pkt_template *src, *dst;
1351
1352         for (size_t j = 0; j < task->n_pkts / task->orig_n_pkts; ++j) {
1353                 for (size_t i = 0; i < task->orig_n_pkts; ++i) {
1354                         dst = &task->pkt_template[j * task->orig_n_pkts + i];
1355                         dst->len = pkt_sizes[j];
1356                 }
1357         }
1358 }
1359
1360 static void task_gen_reset_pkt_templates_len(struct task_gen *task)
1361 {
1362         struct pkt_template *src, *dst;
1363
1364         for (size_t j = 0; j < task->n_pkts / task->orig_n_pkts; ++j) {
1365                 for (size_t i = 0; i < task->orig_n_pkts; ++i) {
1366                         src = &task->pkt_template_orig[i];
1367                         dst = &task->pkt_template[j * task->orig_n_pkts + i];
1368                         dst->len = src->len;
1369                 }
1370         }
1371 }
1372
1373 static void task_gen_reset_pkt_templates_content(struct task_gen *task)
1374 {
1375         struct pkt_template *src, *dst;
1376
1377         for (size_t j = 0; j < task->n_pkts / task->orig_n_pkts; ++j) {
1378                 for (size_t i = 0; i < task->orig_n_pkts; ++i) {
1379                         src = &task->pkt_template_orig[i];
1380                         dst = &task->pkt_template[j * task->orig_n_pkts + i];
1381                         memcpy(dst->buf, src->buf, RTE_MAX(src->len, dst->len));
1382                         if (task->flags & TASK_OVERWRITE_SRC_MAC_WITH_PORT_MAC) {
1383                                 rte_memcpy(&dst->buf[sizeof(prox_rte_ether_addr)], &task->src_mac, sizeof(prox_rte_ether_addr));
1384                         }
1385                         task_gen_apply_sig(task, dst);
1386                 }
1387         }
1388 }
1389
1390 static void task_gen_reset_pkt_templates(struct task_gen *task)
1391 {
1392         if (task->imix_nb_pkts)
1393                 task_gen_set_pkt_templates_len(task, task->imix_pkt_sizes);
1394         else
1395                 task_gen_reset_pkt_templates_len(task);
1396         task_gen_reset_pkt_templates_content(task);
1397         task_gen_pkt_template_recalc_all(task);
1398 }
1399
1400 static void task_init_gen_load_pkt_inline(struct task_gen *task, struct task_args *targ)
1401 {
1402         int rc;
1403
1404         task->orig_n_pkts = 1;
1405         if (task->imix_nb_pkts == 0) {
1406                 task->n_pkts = 1;
1407                 task->imix_pkt_sizes[0] = targ->pkt_size;
1408         } else {
1409                 task->n_pkts = task->imix_nb_pkts;
1410         }
1411         task_gen_allocate_templates(task, task->orig_n_pkts, task->n_pkts, DO_PANIC, NOT_FROM_PCAP);
1412
1413         rte_memcpy(task->pkt_template_orig[0].buf, targ->pkt_inline, task->max_frame_size);
1414         task->pkt_template_orig[0].len = task->imix_pkt_sizes[0];
1415         task_gen_reset_pkt_templates(task);
1416         check_all_pkt_size(task, DO_PANIC);
1417         check_all_fields_in_bounds(task, DO_PANIC);
1418
1419         // If IMIX was not specified then pkt_size is specified using pkt_size parameter or the length of pkt_inline
1420         // In that case, for backward compatibility, we do NOT adapt the length of IP and UDP to the length of the packet
1421         task_gen_set_eth_ip_udp_sizes(task, task->orig_n_pkts, task->imix_nb_pkts, task->imix_pkt_sizes);
1422 }
1423
1424 static void task_init_gen_load_pcap(struct task_gen *task, struct task_args *targ)
1425 {
1426         char err[PCAP_ERRBUF_SIZE];
1427         uint32_t max_frame_size;
1428         pcap_t *handle = pcap_open_offline(targ->pcap_file, err);
1429         PROX_PANIC(handle == NULL, "Failed to open PCAP file: %s\n", err);
1430
1431         task->orig_n_pkts = pcap_count_pkts(handle, &max_frame_size);
1432         plogx_info("%u packets in pcap file '%s'; max frame size=%d\n", task->orig_n_pkts, targ->pcap_file, max_frame_size);
1433         PROX_PANIC(max_frame_size > task->max_frame_size,
1434                 max_frame_size > PROX_RTE_ETHER_MAX_LEN + 2 * PROX_VLAN_TAG_SIZE -4 ?
1435                         "pkt_size too high and jumbo frames disabled" : "pkt_size > mtu");
1436
1437         if (targ->n_pkts)
1438                 task->orig_n_pkts = RTE_MIN(task->orig_n_pkts, targ->n_pkts);
1439         if (task->imix_nb_pkts == 0) {
1440                 task->n_pkts = task->orig_n_pkts;
1441         } else {
1442                 task->n_pkts = task->imix_nb_pkts * task->orig_n_pkts;
1443         }
1444         task_gen_allocate_templates(task, task->orig_n_pkts, task->n_pkts, DO_PANIC, FROM_PCAP);
1445         plogx_info("Loading %u packets from pcap\n", task->n_pkts);
1446
1447         pcap_read_pkts(handle, targ->pcap_file, task->orig_n_pkts, task->pkt_template_orig, NULL, max_frame_size);
1448         pcap_close(handle);
1449         task_gen_reset_pkt_templates(task);
1450         check_all_pkt_size(task, DO_PANIC);
1451         check_all_fields_in_bounds(task, DO_PANIC);
1452         task_gen_set_eth_ip_udp_sizes(task, task->orig_n_pkts, task->imix_nb_pkts, task->imix_pkt_sizes);
1453 }
1454
1455 static struct rte_mempool *task_gen_create_mempool(struct task_args *targ, uint16_t max_frame_size)
1456 {
1457         static char name[] = "gen_pool";
1458         struct rte_mempool *ret;
1459         const int sock_id = rte_lcore_to_socket_id(targ->lconf->id);
1460
1461         name[0]++;
1462         uint32_t mbuf_size = TX_MBUF_SIZE;
1463         if (max_frame_size + (unsigned)sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM > mbuf_size)
1464                 mbuf_size = max_frame_size + (unsigned)sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1465         plog_info("\t\tCreating mempool with name '%s'\n", name);
1466         ret = rte_mempool_create(name, targ->nb_mbuf - 1, mbuf_size,
1467                                  targ->nb_cache_mbuf, sizeof(struct rte_pktmbuf_pool_private),
1468                                  rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, 0,
1469                                  sock_id, 0);
1470         PROX_PANIC(ret == NULL, "Failed to allocate dummy memory pool on socket %u with %u elements\n",
1471                    sock_id, targ->nb_mbuf - 1);
1472
1473         plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", ret,
1474                 targ->nb_mbuf - 1, mbuf_size, targ->nb_cache_mbuf, sock_id);
1475
1476         return ret;
1477 }
1478
1479 void task_gen_set_pkt_count(struct task_base *tbase, uint32_t count)
1480 {
1481         struct task_gen *task = (struct task_gen *)tbase;
1482
1483         task->pkt_count = count;
1484 }
1485
1486 int task_gen_set_pkt_size(struct task_base *tbase, uint32_t pkt_size)
1487 {
1488         struct task_gen *task = (struct task_gen *)tbase;
1489         int rc;
1490
1491         for (size_t i = 0; i < task->n_pkts; ++i) {
1492                 if ((rc = check_pkt_size(task, pkt_size, 0)) != 0)
1493                         return rc;
1494                 if ((rc = check_fields_in_bounds(task, pkt_size, 0)) != 0)
1495                         return rc;
1496         }
1497         for (size_t i = 0; i < task->n_pkts; ++i) {
1498                 task->pkt_template[i].len = pkt_size;
1499         }
1500         return 0;
1501 }
1502
1503 int task_gen_set_imix(struct task_base *tbase, uint32_t nb_pkt_sizes, uint32_t *pkt_sizes)
1504 {
1505         struct task_gen *task = (struct task_gen *)tbase;
1506         int rc;
1507
1508         memcpy(task->imix_pkt_sizes, pkt_sizes, nb_pkt_sizes * sizeof(uint32_t));
1509         for (size_t i = 0; i < nb_pkt_sizes; ++i) {
1510                 if ((rc = check_pkt_size(task, pkt_sizes[i], DO_NOT_PANIC)) != 0)
1511                         return rc;
1512                 if ((rc = check_fields_in_bounds(task, pkt_sizes[i], DO_NOT_PANIC)) != 0)
1513                         return rc;
1514         }
1515         // only set new_imix_nb_pkts if checks of pkt sizes succeeded
1516         task->new_imix_nb_pkts = nb_pkt_sizes;
1517         return 0;
1518 }
1519
1520 void task_gen_set_rate(struct task_base *tbase, uint64_t bps)
1521 {
1522         struct task_gen *task = (struct task_gen *)tbase;
1523
1524         task->new_rate_bps = bps;
1525 }
1526
1527 void task_gen_reset_randoms(struct task_base *tbase)
1528 {
1529         struct task_gen *task = (struct task_gen *)tbase;
1530
1531         for (uint32_t i = 0; i < task->n_rands; ++i) {
1532                 task->rand[i].rand_mask = 0;
1533                 task->rand[i].fixed_bits = 0;
1534                 task->rand[i].rand_offset = 0;
1535         }
1536         task->n_rands = 0;
1537 }
1538
1539 void task_gen_reset_ranges(struct task_base *tbase)
1540 {
1541         struct task_gen *task = (struct task_gen *)tbase;
1542
1543         memset(task->ranges, 0, task->n_ranges * sizeof(struct range));
1544         task->n_ranges = 0;
1545 }
1546
1547 int task_gen_set_value(struct task_base *tbase, uint32_t value, uint32_t offset, uint32_t len)
1548 {
1549         struct task_gen *task = (struct task_gen *)tbase;
1550
1551         if (offset + len > task->max_frame_size)
1552                 return -1;
1553         for (size_t i = 0; i < task->n_pkts; ++i) {
1554                 uint32_t to_write = rte_cpu_to_be_32(value) >> ((4 - len) * 8);
1555                 uint8_t *dst = task->pkt_template[i].buf;
1556
1557                 rte_memcpy(dst + offset, &to_write, len);
1558         }
1559
1560         task_gen_pkt_template_recalc_all(task);
1561
1562         return 0;
1563 }
1564
1565 void task_gen_reset_values(struct task_base *tbase)
1566 {
1567         struct task_gen *task = (struct task_gen *)tbase;
1568
1569         task_gen_reset_pkt_templates_content(task);
1570         task_gen_pkt_template_recalc_metadata(task);
1571         check_all_pkt_size(task, DO_NOT_PANIC);
1572         check_all_fields_in_bounds(task, DO_NOT_PANIC);
1573         task_gen_set_eth_ip_udp_sizes(task, task->orig_n_pkts, task->imix_nb_pkts, task->imix_pkt_sizes);
1574
1575         if (task->flags & TASK_OVERWRITE_SRC_MAC_WITH_PORT_MAC) {
1576                 for (uint32_t i = 0; i < task->n_pkts; ++i) {
1577                         rte_memcpy(&task->pkt_template[i].buf[sizeof(prox_rte_ether_addr)], &task->src_mac, sizeof(prox_rte_ether_addr));
1578                 }
1579         }
1580 }
1581
1582 uint32_t task_gen_get_n_randoms(struct task_base *tbase)
1583 {
1584         struct task_gen *task = (struct task_gen *)tbase;
1585
1586         return task->n_rands;
1587 }
1588
1589 uint32_t task_gen_get_n_ranges(struct task_base *tbase)
1590 {
1591         struct task_gen *task = (struct task_gen *)tbase;
1592
1593         return task->n_ranges;
1594 }
1595
1596 static void init_task_gen_pcap(struct task_base *tbase, struct task_args *targ)
1597 {
1598         struct task_gen_pcap *task = (struct task_gen_pcap *)tbase;
1599         task->socket_id = rte_lcore_to_socket_id(targ->lconf->id);
1600         uint32_t max_frame_size;
1601
1602         task->loop = targ->loop;
1603         task->pkt_idx = 0;
1604         task->hz = rte_get_tsc_hz();
1605
1606         char err[PCAP_ERRBUF_SIZE];
1607         pcap_t *handle = pcap_open_offline(targ->pcap_file, err);
1608         PROX_PANIC(handle == NULL, "Failed to open PCAP file: %s\n", err);
1609
1610         task->n_pkts = pcap_count_pkts(handle, &max_frame_size);
1611         plogx_info("%u packets in pcap file '%s'\n", task->n_pkts, targ->pcap_file);
1612
1613         task->local_mbuf.mempool = task_gen_create_mempool(targ, max_frame_size);
1614
1615         PROX_PANIC(!strcmp(targ->pcap_file, ""), "No pcap file defined\n");
1616
1617         if (targ->n_pkts) {
1618                 plogx_info("Configured to load %u packets\n", targ->n_pkts);
1619                 if (task->n_pkts > targ->n_pkts)
1620                         task->n_pkts = targ->n_pkts;
1621         }
1622         plogx_info("Loading %u packets from pcap\n", task->n_pkts);
1623
1624         size_t mem_size = task->n_pkts * (sizeof(*task->proto) + sizeof(*task->proto_tsc));
1625         uint8_t *mem = prox_zmalloc(mem_size, task->socket_id);
1626
1627         PROX_PANIC(mem == NULL, "Failed to allocate %lu bytes (in huge pages) for pcap file\n", mem_size);
1628         task->proto = (struct pkt_template *) mem;
1629         task->proto_tsc = (uint64_t *)(mem + task->n_pkts * sizeof(*task->proto));
1630
1631         for (uint i = 0; i < targ->n_pkts; i++) {
1632                 task->proto[i].buf = prox_zmalloc(max_frame_size, task->socket_id);
1633                 PROX_PANIC(task->proto[i].buf == NULL, "Failed to allocate %u bytes (in huge pages) for pcap file\n", max_frame_size);
1634         }
1635
1636         pcap_read_pkts(handle, targ->pcap_file, task->n_pkts, task->proto, task->proto_tsc, max_frame_size);
1637         pcap_close(handle);
1638 }
1639
1640 static int task_gen_find_random_with_offset(struct task_gen *task, uint32_t offset)
1641 {
1642         for (uint32_t i = 0; i < task->n_rands; ++i) {
1643                 if (task->rand[i].rand_offset == offset) {
1644                         return i;
1645                 }
1646         }
1647
1648         return UINT32_MAX;
1649 }
1650
1651 int task_gen_add_range(struct task_base *tbase, struct range *range)
1652 {
1653         struct task_gen *task = (struct task_gen *)tbase;
1654         if (task->n_ranges == MAX_RANGES) {
1655                 plog_err("Too many ranges\n");
1656                 return -1;
1657         }
1658         task->ranges[task->n_ranges].min = range->min;
1659         task->ranges[task->n_ranges].value = range->min;
1660         uint32_t m = range->max;
1661         task->ranges[task->n_ranges].range_len = 0;
1662         while (m != 0) {
1663                 m >>= 8;
1664                 task->ranges[task->n_ranges].range_len++;
1665         }
1666         task->ranges[task->n_ranges].offset = range->offset;
1667         task->ranges[task->n_ranges++].max = range->max;
1668         return 0;
1669 }
1670
1671 int task_gen_add_rand(struct task_base *tbase, const char *rand_str, uint32_t offset, uint32_t rand_id)
1672 {
1673         struct task_gen *task = (struct task_gen *)tbase;
1674         uint32_t existing_rand;
1675
1676         if (rand_id == UINT32_MAX && task->n_rands == 64) {
1677                 plog_err("Too many randoms\n");
1678                 return -1;
1679         }
1680         uint32_t mask, fixed, len;
1681
1682         if (parse_random_str(&mask, &fixed, &len, rand_str)) {
1683                 plog_err("%s\n", get_parse_err());
1684                 return -1;
1685         }
1686         task->runtime_checksum_needed = 1;
1687
1688         existing_rand = task_gen_find_random_with_offset(task, offset);
1689         if (existing_rand != UINT32_MAX) {
1690                 plog_warn("Random at offset %d already set => overwriting len = %d %s\n", offset, len, rand_str);
1691                 rand_id = existing_rand;
1692                 task->rand[rand_id].rand_len = len;
1693                 task->rand[rand_id].rand_offset = offset;
1694                 task->rand[rand_id].rand_mask = mask;
1695                 task->rand[rand_id].fixed_bits = fixed;
1696                 return 0;
1697         }
1698
1699         task->rand[task->n_rands].rand_len = len;
1700         task->rand[task->n_rands].rand_offset = offset;
1701         task->rand[task->n_rands].rand_mask = mask;
1702         task->rand[task->n_rands].fixed_bits = fixed;
1703
1704         task->n_rands++;
1705         return 0;
1706 }
1707
1708 static void start(struct task_base *tbase)
1709 {
1710         struct task_gen *task = (struct task_gen *)tbase;
1711         task->pkt_queue_index = 0;
1712
1713         task_gen_reset_token_time(task);
1714         if (tbase->l3.tmaster) {
1715                 register_all_ip_to_ctrl_plane(task);
1716         }
1717
1718         /* TODO
1719            Handle the case when two tasks transmit to the same port
1720            and one of them is stopped. In that case ARP (requests or replies)
1721            might not be sent. Master will have to keep a list of rings.
1722            stop will have to de-register IP from ctrl plane.
1723            un-registration will remove the ring. when having more than
1724            one active rings, master can always use the first one
1725         */
1726 }
1727
1728 static void stop_gen(struct task_base *tbase)
1729 {
1730         uint32_t i, j;
1731         struct task_gen *task = (struct task_gen *)tbase;
1732         if (task->store_msk) {
1733                 for (i = task->store_pkt_id & task->store_msk; i < task->store_msk + 1; i++) {
1734                         if (task->store_buf[i].len) {
1735                                 fprintf(task->fp, "%06d: ", i);
1736                                 for (j = 0; j < task->store_buf[i].len; j++) {
1737                                         fprintf(task->fp, "%02x ", task->store_buf[i].buf[j]);
1738                                 }
1739                                 fprintf(task->fp, "\n");
1740                         }
1741                 }
1742                 for (i = 0; i < (task->store_pkt_id & task->store_msk); i++) {
1743                         if (task->store_buf[i].len) {
1744                                 fprintf(task->fp, "%06d: ", i);
1745                                 for (j = 0; j < task->store_buf[i].len; j++) {
1746                                         fprintf(task->fp, "%02x ", task->store_buf[i].buf[j]);
1747                                 }
1748                                 fprintf(task->fp, "\n");
1749                         }
1750                 }
1751         }
1752 }
1753 static void start_pcap(struct task_base *tbase)
1754 {
1755         struct task_gen_pcap *task = (struct task_gen_pcap *)tbase;
1756         /* When we start, the first packet is sent immediately. */
1757         task->last_tsc = rte_rdtsc() - task->proto_tsc[0];
1758         task->pkt_idx = 0;
1759 }
1760
1761 static void init_task_gen_early(struct task_args *targ)
1762 {
1763         uint8_t *generator_count = prox_sh_find_system("generator_count");
1764
1765         if (generator_count == NULL) {
1766                 generator_count = prox_zmalloc(sizeof(*generator_count), rte_lcore_to_socket_id(targ->lconf->id));
1767                 PROX_PANIC(generator_count == NULL, "Failed to allocate generator count\n");
1768                 prox_sh_add_system("generator_count", generator_count);
1769         }
1770         targ->generator_id = *generator_count;
1771         (*generator_count)++;
1772 }
1773
1774 static void init_task_gen(struct task_base *tbase, struct task_args *targ)
1775 {
1776         struct task_gen *task = (struct task_gen *)tbase;
1777         task->socket_id = rte_lcore_to_socket_id(targ->lconf->id);
1778
1779         task->packet_id_pos = targ->packet_id_pos;
1780
1781         struct prox_port_cfg *port = find_reachable_port(targ);
1782         // TODO: check that all reachable ports have the same mtu...
1783         if (port) {
1784                 task->cksum_offload = port->requested_tx_offload & (RTE_ETH_TX_OFFLOAD_IPV4_CKSUM | RTE_ETH_TX_OFFLOAD_UDP_CKSUM);
1785                 task->port = port;
1786                 task->max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + 2 * PROX_VLAN_TAG_SIZE;
1787         } else {
1788                 // Not generating to any port...
1789                 task->max_frame_size = PROX_RTE_ETHER_MAX_LEN;
1790         }
1791         task->local_mbuf.mempool = task_gen_create_mempool(targ, task->max_frame_size);
1792         PROX_PANIC(task->local_mbuf.mempool == NULL, "Failed to create mempool\n");
1793         task->pkt_idx = 0;
1794         task->hz = rte_get_tsc_hz();
1795         task->lat_pos = targ->lat_pos;
1796         task->accur_pos = targ->accur_pos;
1797         task->sig_pos = targ->sig_pos;
1798         task->flow_id_pos = targ->flow_id_pos;
1799         task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos;
1800         task->sig = targ->sig;
1801         task->new_rate_bps = targ->rate_bps;
1802
1803         /*
1804          * For tokens, use 10 Gbps as base rate
1805          * Scripts can then use speed command, with speed=100 as 10 Gbps and speed=400 as 40 Gbps
1806          * Script can query prox "port info" command to find out the port link speed to know
1807          * at which rate to start. Note that virtio running on OVS returns 10 Gbps, so a script has
1808          * probably also to check the driver (as returned by the same "port info" command.
1809          */
1810         struct token_time_cfg tt_cfg = token_time_cfg_create(1250000000, rte_get_tsc_hz(), -1);
1811         token_time_init(&task->token_time, &tt_cfg);
1812
1813         init_task_gen_seeds(task);
1814
1815         task->min_bulk_size = targ->min_bulk_size;
1816         task->max_bulk_size = targ->max_bulk_size;
1817         if (task->min_bulk_size < 1)
1818                 task->min_bulk_size = 1;
1819         if (task->max_bulk_size < 1)
1820                 task->max_bulk_size = 64;
1821         PROX_PANIC(task->max_bulk_size > 64, "max_bulk_size higher than 64\n");
1822         PROX_PANIC(task->max_bulk_size < task->min_bulk_size, "max_bulk_size must be > than min_bulk_size\n");
1823
1824         task->pkt_count = -1;
1825         task->lat_enabled = targ->lat_enabled;
1826         task->runtime_flags = targ->runtime_flags;
1827         PROX_PANIC((task->lat_pos || task->accur_pos) && !task->lat_enabled, "lat not enabled by lat pos or accur pos configured\n");
1828
1829         task->generator_id = targ->generator_id;
1830         plog_info("\t\tGenerator id = %d\n", task->generator_id);
1831
1832         // Allocate array holding bytes to tsc for supported frame sizes
1833         task->bytes_to_tsc = prox_zmalloc(task->max_frame_size * MAX_PKT_BURST * sizeof(task->bytes_to_tsc[0]), task->socket_id);
1834         PROX_PANIC(task->bytes_to_tsc == NULL,
1835                 "Failed to allocate %u bytes (in huge pages) for bytes_to_tsc\n", task->max_frame_size);
1836
1837         // task->port->max_link_speed reports the maximum, non negotiated ink speed in Mbps e.g. 40k for a 40 Gbps NIC.
1838         // It can be UINT32_MAX (virtual devices or not supported by DPDK < 16.04)
1839         uint64_t bytes_per_hz = UINT64_MAX;
1840         if ((task->port) && (task->port->max_link_speed != UINT32_MAX)) {
1841                 bytes_per_hz = task->port->max_link_speed * 125000L;
1842                 plog_info("\t\tPort %u: max link speed is %ld Mbps\n",
1843                         (uint8_t)(task->port - prox_port_cfg), 8 * bytes_per_hz / 1000000);
1844         }
1845         // There are cases where hz estimate might be slighly over-estimated
1846         // This results in too much extrapolation
1847         // Only account for 99% of extrapolation to handle cases with up to 1% error clocks
1848         for (unsigned int i = 0; i < task->max_frame_size * MAX_PKT_BURST ; i++) {
1849                 if (bytes_per_hz == UINT64_MAX)
1850                         task->bytes_to_tsc[i] = 0;
1851                 else
1852                         task->bytes_to_tsc[i] = (task->hz * i * 0.99) / bytes_per_hz;
1853         }
1854
1855         task->imix_nb_pkts = targ->imix_nb_pkts;
1856         for (uint32_t i = 0; i < targ->imix_nb_pkts; i++) {
1857                 task->imix_pkt_sizes[i] = targ->imix_pkt_sizes[i];
1858         }
1859         if (!strcmp(targ->pcap_file, "")) {
1860                 plog_info("\t\tUsing inline definition of a packet\n");
1861                 task_init_gen_load_pkt_inline(task, targ);
1862         } else {
1863                 plog_info("\t\tLoading from pcap %s\n", targ->pcap_file);
1864                 task_init_gen_load_pcap(task, targ);
1865         }
1866
1867         PROX_PANIC(((targ->nb_txrings == 0) && (targ->nb_txports == 0)), "Gen mode requires a tx ring or a tx port");
1868         if ((targ->flags & DSF_KEEP_SRC_MAC) == 0) {
1869                 task->flags |= TASK_OVERWRITE_SRC_MAC_WITH_PORT_MAC;
1870                 memcpy(&task->src_mac, &prox_port_cfg[task->base.tx_params_hw.tx_port_queue->port].eth_addr, sizeof(prox_rte_ether_addr));
1871                 for (uint32_t i = 0; i < task->n_pkts; ++i) {
1872                         rte_memcpy(&task->pkt_template[i].buf[sizeof(prox_rte_ether_addr)], &task->src_mac, sizeof(prox_rte_ether_addr));
1873                 }
1874         }
1875         for (uint32_t i = 0; i < targ->n_rand_str; ++i) {
1876                 PROX_PANIC(task_gen_add_rand(tbase, targ->rand_str[i], targ->rand_offset[i], UINT32_MAX),
1877                            "Failed to add random\n");
1878         }
1879         for (uint32_t i = 0; i < targ->n_ranges; ++i) {
1880                 PROX_PANIC(task_gen_add_range(tbase, &targ->range[i]), "Failed to add range\n");
1881         }
1882         if (targ->store_max) {
1883                 char filename[256];
1884                 sprintf(filename, "gen_buf_%02d_%02d", targ->lconf->id, targ->task);
1885
1886                 task->store_msk = targ->store_max - 1;
1887                 task->store_buf = (struct packet *)malloc(sizeof(struct packet) * targ->store_max);
1888                 task->fp = fopen(filename, "w+");
1889                 PROX_PANIC(task->fp == NULL, "Unable to open %s\n", filename);
1890         } else {
1891                 task->store_msk = 0;
1892         }
1893         uint32_t n_entries = get_n_range_flows(task) * task->orig_n_pkts * 4;
1894 #ifndef RTE_HASH_BUCKET_ENTRIES
1895 #define RTE_HASH_BUCKET_ENTRIES 8
1896 #endif
1897         // cuckoo hash requires at least RTE_HASH_BUCKET_ENTRIES (8) entries
1898         if (n_entries < RTE_HASH_BUCKET_ENTRIES)
1899                 n_entries = RTE_HASH_BUCKET_ENTRIES;
1900
1901         static char hash_name[30];
1902         sprintf(hash_name, "A%03d_hash_gen_table", targ->lconf->id);
1903         struct rte_hash_parameters hash_params = {
1904                 .name = hash_name,
1905                 .entries = n_entries,
1906                 .key_len = sizeof(union ipv4_5tuple_host),
1907                 .hash_func = rte_hash_crc,
1908                 .hash_func_init_val = 0,
1909                 .socket_id = task->socket_id,
1910         };
1911         plog_info("\t\thash table name = %s\n", hash_params.name);
1912         task->flow_id_table = rte_hash_create(&hash_params);
1913         PROX_PANIC(task->flow_id_table == NULL, "Failed to set up flow_id hash table for gen\n");
1914         plog_info("\t\tflow_id hash table allocated, with %d entries of size %d\n", hash_params.entries, hash_params.key_len);
1915         build_flow_table(task);
1916         task->flows = (struct flows *)prox_zmalloc(n_entries * sizeof(struct flows), task->socket_id);
1917         PROX_PANIC(task->flows == NULL, "Failed to allocate flows\n");
1918         plog_info("\t\t%d flows allocated\n", n_entries);
1919 }
1920
1921 static struct task_init task_init_gen = {
1922         .mode_str = "gen",
1923         .init = init_task_gen,
1924         .handle = handle_gen_bulk,
1925         .start = start,
1926         .early_init = init_task_gen_early,
1927 #ifdef SOFT_CRC
1928         // For SOFT_CRC, no offload is needed. If both NOOFFLOADS and NOMULTSEGS flags are set the
1929         // vector mode is used by DPDK, resulting (theoretically) in higher performance.
1930         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX | TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS,
1931 #else
1932         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX,
1933 #endif
1934         .size = sizeof(struct task_gen),
1935         .stop_last = stop_gen
1936 };
1937
1938 static struct task_init task_init_gen_l3 = {
1939         .mode_str = "gen",
1940         .sub_mode_str = "l3",
1941         .init = init_task_gen,
1942         .handle = handle_gen_bulk,
1943         .start = start,
1944         .early_init = init_task_gen_early,
1945 #ifdef SOFT_CRC
1946         // For SOFT_CRC, no offload is needed. If both NOOFFLOADS and NOMULTSEGS flags are set the
1947         // vector mode is used by DPDK, resulting (theoretically) in higher performance.
1948         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX | TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS,
1949 #else
1950         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX,
1951 #endif
1952         .size = sizeof(struct task_gen)
1953 };
1954
1955 /* This mode uses time stamps in the pcap file */
1956 static struct task_init task_init_gen_pcap = {
1957         .mode_str = "gen",
1958         .sub_mode_str = "pcap",
1959         .init = init_task_gen_pcap,
1960         .handle = handle_gen_pcap_bulk,
1961         .start = start_pcap,
1962         .early_init = init_task_gen_early,
1963 #ifdef SOFT_CRC
1964         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX | TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS,
1965 #else
1966         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX,
1967 #endif
1968         .size = sizeof(struct task_gen_pcap)
1969 };
1970
1971 __attribute__((constructor)) static void reg_task_gen(void)
1972 {
1973         reg_task(&task_init_gen);
1974         reg_task(&task_init_gen_l3);
1975         reg_task(&task_init_gen_pcap);
1976 }