Added scripting support for ranges
[samplevnf.git] / VNFs / DPPD-PROX / handle_gen.c
1 /*
2 // Copyright (c) 2010-2020 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16 #include <rte_mbuf.h>
17 #include <pcap.h>
18 #include <string.h>
19 #include <stdlib.h>
20 #include <rte_cycles.h>
21 #include <rte_version.h>
22 #include <rte_byteorder.h>
23 #include <rte_ether.h>
24 #include <rte_hash.h>
25 #include <rte_hash_crc.h>
26 #include <rte_malloc.h>
27
28 #include "prox_shared.h"
29 #include "random.h"
30 #include "prox_malloc.h"
31 #include "handle_gen.h"
32 #include "handle_lat.h"
33 #include "task_init.h"
34 #include "task_base.h"
35 #include "prox_port_cfg.h"
36 #include "lconf.h"
37 #include "log.h"
38 #include "quit.h"
39 #include "prox_cfg.h"
40 #include "mbuf_utils.h"
41 #include "qinq.h"
42 #include "prox_cksum.h"
43 #include "etypes.h"
44 #include "prox_assert.h"
45 #include "prefetch.h"
46 #include "token_time.h"
47 #include "local_mbuf.h"
48 #include "arp.h"
49 #include "tx_pkt.h"
50 #include "handle_master.h"
51 #include "defines.h"
52 #include "prox_ipv6.h"
53 #include "handle_lb_5tuple.h"
54
55 struct pkt_template {
56         uint16_t len;
57         uint16_t l2_len;
58         uint16_t l3_len;
59         uint8_t  *buf;
60 };
61
62 #define MAX_STORE_PKT_SIZE      2048
63
64 struct packet {
65         unsigned int len;
66         unsigned char buf[MAX_STORE_PKT_SIZE];
67 };
68
69 #define IP4(x) x & 0xff, (x >> 8) & 0xff, (x >> 16) & 0xff, x >> 24
70
71 #define DO_PANIC        1
72 #define DO_NOT_PANIC    0
73
74 #define FROM_PCAP       1
75 #define NOT_FROM_PCAP   0
76
77 #define MAX_RANGES      64
78
79 #define TASK_OVERWRITE_SRC_MAC_WITH_PORT_MAC 1
80
81 static void pkt_template_init_mbuf(struct pkt_template *pkt_template, struct rte_mbuf *mbuf, uint8_t *pkt)
82 {
83         const uint32_t pkt_size = pkt_template->len;
84
85         rte_pktmbuf_pkt_len(mbuf) = pkt_size;
86         rte_pktmbuf_data_len(mbuf) = pkt_size;
87         init_mbuf_seg(mbuf);
88         rte_memcpy(pkt, pkt_template->buf, pkt_template->len);
89 }
90
91 struct task_gen_pcap {
92         struct task_base base;
93         uint64_t hz;
94         struct local_mbuf local_mbuf;
95         uint32_t pkt_idx;
96         struct pkt_template *proto;
97         uint32_t loop;
98         uint32_t n_pkts;
99         uint64_t last_tsc;
100         uint64_t *proto_tsc;
101         uint32_t socket_id;
102 };
103
104 struct flows {
105         uint32_t packet_id;
106 };
107
108 struct task_gen {
109         struct task_base base;
110         uint64_t hz;
111         struct token_time token_time;
112         struct local_mbuf local_mbuf;
113         struct pkt_template *pkt_template; /* packet templates used at runtime */
114         uint64_t write_duration_estimate; /* how long it took previously to write the time stamps in the packets */
115         uint64_t earliest_tsc_next_pkt;
116         uint64_t new_rate_bps;
117         uint64_t pkt_queue_index;
118         uint32_t n_pkts; /* number of packets in pcap */
119         uint32_t orig_n_pkts; /* number of packets in pcap */
120         uint32_t pkt_idx; /* current packet from pcap */
121         uint32_t pkt_count; /* how many pakets to generate */
122         uint32_t max_frame_size;
123         uint32_t runtime_flags;
124         uint16_t lat_pos;
125         uint16_t packet_id_pos;
126         uint16_t accur_pos;
127         uint16_t sig_pos;
128         uint16_t flow_id_pos;
129         uint16_t packet_id_in_flow_pos;
130         uint32_t sig;
131         uint32_t socket_id;
132         uint8_t generator_id;
133         uint8_t n_rands; /* number of randoms */
134         uint8_t n_ranges; /* number of ranges */
135         uint8_t min_bulk_size;
136         uint8_t max_bulk_size;
137         uint8_t lat_enabled;
138         uint8_t runtime_checksum_needed;
139         struct {
140                 struct random state;
141                 uint32_t rand_mask; /* since the random vals are uniform, masks don't introduce bias  */
142                 uint32_t fixed_bits; /* length of each random (max len = 4) */
143                 uint16_t rand_offset; /* each random has an offset*/
144                 uint8_t rand_len; /* # bytes to take from random (no bias introduced) */
145         } rand[64];
146         struct range ranges[MAX_RANGES];
147         uint64_t accur[ACCURACY_WINDOW];
148         uint64_t pkt_tsc_offset[64];
149         struct pkt_template *pkt_template_orig; /* packet templates (from inline or from pcap) */
150         prox_rte_ether_addr  src_mac;
151         uint8_t flags;
152         uint8_t cksum_offload;
153         struct prox_port_cfg *port;
154         uint64_t *bytes_to_tsc;
155         uint32_t imix_pkt_sizes[MAX_IMIX_PKTS];
156         uint32_t imix_nb_pkts;
157         uint32_t new_imix_nb_pkts;
158         uint32_t store_pkt_id;
159         uint32_t store_msk;
160         struct packet *store_buf;
161         FILE *fp;
162         struct rte_hash *flow_id_table;
163         struct flows*flows;
164 } __rte_cache_aligned;
165
166 static void task_gen_set_pkt_templates_len(struct task_gen *task, uint32_t *pkt_sizes);
167 static void task_gen_reset_pkt_templates_content(struct task_gen *task);
168 static void task_gen_pkt_template_recalc_metadata(struct task_gen *task);
169 static int check_all_pkt_size(struct task_gen *task, int do_panic);
170 static int check_all_fields_in_bounds(struct task_gen *task, int do_panic);
171
172 static inline uint8_t ipv4_get_hdr_len(prox_rte_ipv4_hdr *ip)
173 {
174         /* Optimize for common case of IPv4 header without options. */
175         if (ip->version_ihl == 0x45)
176                 return sizeof(prox_rte_ipv4_hdr);
177         if (unlikely(ip->version_ihl >> 4 != 4)) {
178                 plog_warn("IPv4 ether_type but IP version = %d != 4", ip->version_ihl >> 4);
179                 return 0;
180         }
181         return (ip->version_ihl & 0xF) * 4;
182 }
183
184 static void parse_l2_l3_len(uint8_t *pkt, uint16_t *l2_len, uint16_t *l3_len, uint16_t len)
185 {
186         *l2_len = sizeof(prox_rte_ether_hdr);
187         *l3_len = 0;
188         prox_rte_vlan_hdr *vlan_hdr;
189         prox_rte_ether_hdr *eth_hdr = (prox_rte_ether_hdr*)pkt;
190         prox_rte_ipv4_hdr *ip;
191         uint16_t ether_type = eth_hdr->ether_type;
192
193         // Unstack VLAN tags
194         while (((ether_type == ETYPE_8021ad) || (ether_type == ETYPE_VLAN)) && (*l2_len + sizeof(prox_rte_vlan_hdr) < len)) {
195                 vlan_hdr = (prox_rte_vlan_hdr *)(pkt + *l2_len);
196                 *l2_len +=4;
197                 ether_type = vlan_hdr->eth_proto;
198         }
199
200         // No L3 cksum offload for IPv6, but TODO L4 offload
201         // ETYPE_EoGRE CRC not implemented yet
202
203         switch (ether_type) {
204         case ETYPE_MPLSU:
205         case ETYPE_MPLSM:
206                 *l2_len +=4;
207                 break;
208         case ETYPE_IPv6:
209         case ETYPE_IPv4:
210                 break;
211         case ETYPE_EoGRE:
212         case ETYPE_ARP:
213                 *l2_len = 0;
214                 break;
215         default:
216                 *l2_len = 0;
217                 plog_warn("Unsupported packet type %x - CRC might be wrong\n", ether_type);
218                 break;
219         }
220
221         if (*l2_len) {
222                 prox_rte_ipv4_hdr *ip = (prox_rte_ipv4_hdr *)(pkt + *l2_len);
223                 if (ip->version_ihl >> 4 == 4)
224                         *l3_len = ipv4_get_hdr_len(ip);
225         }
226 }
227
228 static void checksum_packet(uint8_t *hdr, struct rte_mbuf *mbuf, struct pkt_template *pkt_template, int cksum_offload)
229 {
230         uint16_t l2_len = pkt_template->l2_len;
231         uint16_t l3_len = pkt_template->l3_len;
232
233         prox_rte_ipv4_hdr *ip = (prox_rte_ipv4_hdr*)(hdr + l2_len);
234         if (l3_len) {
235                 prox_ip_udp_cksum(mbuf, ip, l2_len, l3_len, cksum_offload);
236         } else if (ip->version_ihl >> 4 == 6) {
237                 prox_rte_ipv6_hdr *ip6 = (prox_rte_ipv6_hdr *)(hdr + l2_len);
238                 if (ip6->proto == IPPROTO_UDP) {
239                         prox_rte_udp_hdr *udp = (prox_rte_udp_hdr *)(ip6 + 1);
240                         udp->dgram_cksum = 0;
241                         udp->dgram_cksum = rte_ipv6_udptcp_cksum(ip6, udp);
242                 } else if (ip6->proto == IPPROTO_TCP) {
243                         prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)(ip6 + 1);
244                         tcp->cksum = 0;
245                         tcp->cksum = rte_ipv6_udptcp_cksum(ip6, tcp);
246                 }
247         }
248 }
249
250 static void task_gen_reset_token_time(struct task_gen *task)
251 {
252         token_time_set_bpp(&task->token_time, task->new_rate_bps);
253         token_time_reset(&task->token_time, rte_rdtsc(), 0);
254 }
255
256 static void task_gen_take_count(struct task_gen *task, uint32_t send_bulk)
257 {
258         if (task->pkt_count == (uint32_t)-1)
259                 return ;
260         else {
261                 if (task->pkt_count >= send_bulk)
262                         task->pkt_count -= send_bulk;
263                 else
264                         task->pkt_count = 0;
265         }
266 }
267
268 static int handle_gen_pcap_bulk(struct task_base *tbase, struct rte_mbuf **mbuf, uint16_t n_pkts)
269 {
270         struct task_gen_pcap *task = (struct task_gen_pcap *)tbase;
271         uint64_t now = rte_rdtsc();
272         uint64_t send_bulk = 0;
273         uint32_t pkt_idx_tmp = task->pkt_idx;
274
275         if (pkt_idx_tmp == task->n_pkts) {
276                 PROX_ASSERT(task->loop);
277                 return 0;
278         }
279
280         for (uint16_t j = 0; j < 64; ++j) {
281                 uint64_t tsc = task->proto_tsc[pkt_idx_tmp];
282                 if (task->last_tsc + tsc <= now) {
283                         task->last_tsc += tsc;
284                         send_bulk++;
285                         pkt_idx_tmp++;
286                         if (pkt_idx_tmp == task->n_pkts) {
287                                 if (task->loop)
288                                         pkt_idx_tmp = 0;
289                                 else
290                                         break;
291                         }
292                 }
293                 else
294                         break;
295         }
296
297         struct rte_mbuf **new_pkts = local_mbuf_refill_and_take(&task->local_mbuf, send_bulk);
298         if (new_pkts == NULL)
299                 return 0;
300
301         for (uint16_t j = 0; j < send_bulk; ++j) {
302                 struct rte_mbuf *next_pkt = new_pkts[j];
303                 struct pkt_template *pkt_template = &task->proto[task->pkt_idx];
304                 uint8_t *hdr = rte_pktmbuf_mtod(next_pkt, uint8_t *);
305
306                 pkt_template_init_mbuf(pkt_template, next_pkt, hdr);
307
308                 task->pkt_idx++;
309                 if (task->pkt_idx == task->n_pkts) {
310                         if (task->loop)
311                                 task->pkt_idx = 0;
312                         else
313                                 break;
314                 }
315         }
316
317         return task->base.tx_pkt(&task->base, new_pkts, send_bulk, NULL);
318 }
319
320 static inline uint64_t bytes_to_tsc(struct task_gen *task, uint32_t bytes)
321 {
322         return task->bytes_to_tsc[bytes];
323 }
324
325 static uint32_t task_gen_next_pkt_idx(const struct task_gen *task, uint32_t pkt_idx)
326 {
327         return pkt_idx + 1 >= task->n_pkts? 0 : pkt_idx + 1;
328 }
329
330 static uint32_t task_gen_offset_pkt_idx(const struct task_gen *task, uint32_t offset)
331 {
332         return (task->pkt_idx + offset) % task->n_pkts;
333 }
334
335 static uint32_t task_gen_calc_send_bulk(const struct task_gen *task, uint32_t *total_bytes)
336 {
337         /* The biggest bulk we allow to send is task->max_bulk_size
338            packets. The max bulk size can also be limited by the
339            pkt_count field.  At the same time, we are rate limiting
340            based on the specified speed (in bytes per second) so token
341            bucket based rate limiting must also be applied. The
342            minimum bulk size is also constrained. If the calculated
343            bulk size is less then the minimum, then don't send
344            anything. */
345
346         const uint32_t min_bulk = task->min_bulk_size;
347         uint32_t max_bulk = task->max_bulk_size;
348
349         if (task->pkt_count != (uint32_t)-1 && task->pkt_count < max_bulk) {
350                 max_bulk = task->pkt_count;
351         }
352
353         uint32_t send_bulk = 0;
354         uint32_t pkt_idx_tmp = task->pkt_idx;
355         uint32_t would_send_bytes = 0;
356         uint32_t pkt_size;
357
358         /*
359          * TODO - this must be improved to take into account the fact that, after applying randoms
360          * The packet can be replaced by an ARP
361          */
362         for (uint16_t j = 0; j < max_bulk; ++j) {
363                 struct pkt_template *pktpl = &task->pkt_template[pkt_idx_tmp];
364                 pkt_size = pktpl->len;
365                 uint32_t pkt_len = pkt_len_to_wire_size(pkt_size);
366                 if (pkt_len + would_send_bytes > task->token_time.bytes_now)
367                         break;
368
369                 pkt_idx_tmp = task_gen_next_pkt_idx(task, pkt_idx_tmp);
370
371                 send_bulk++;
372                 would_send_bytes += pkt_len;
373         }
374
375         if (send_bulk < min_bulk)
376                 return 0;
377         *total_bytes = would_send_bytes;
378         return send_bulk;
379 }
380
381 static void task_gen_apply_random_fields(struct task_gen *task, uint8_t *hdr)
382 {
383         uint32_t ret, ret_tmp;
384
385         for (uint16_t i = 0; i < task->n_rands; ++i) {
386                 ret = random_next(&task->rand[i].state);
387                 ret_tmp = (ret & task->rand[i].rand_mask) | task->rand[i].fixed_bits;
388
389                 ret_tmp = rte_bswap32(ret_tmp);
390                 /* At this point, the lower order bytes (BE) contain
391                    the generated value. The address where the values
392                    of interest starts is at ret_tmp + 4 - rand_len. */
393                 uint8_t *pret_tmp = (uint8_t*)&ret_tmp;
394                 rte_memcpy(hdr + task->rand[i].rand_offset, pret_tmp + 4 - task->rand[i].rand_len, task->rand[i].rand_len);
395         }
396 }
397
398 static void task_gen_apply_all_random_fields(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
399 {
400         if (!task->n_rands)
401                 return;
402
403         for (uint16_t i = 0; i < count; ++i)
404                 task_gen_apply_random_fields(task, pkt_hdr[i]);
405 }
406
407 static void task_gen_apply_ranges(struct task_gen *task, uint8_t *pkt_hdr)
408 {
409         uint32_t ret;
410         if (!task->n_ranges)
411                 return;
412
413         for (uint16_t j = 0; j < task->n_ranges; ++j) {
414                 if (unlikely(task->ranges[j].value == task->ranges[j].max))
415                         task->ranges[j].value = task->ranges[j].min;
416                 else
417                         task->ranges[j].value++;
418                 ret = rte_bswap32(task->ranges[j].value);
419                 uint8_t *pret = (uint8_t*)&ret;
420                 rte_memcpy(pkt_hdr + task->ranges[j].offset, pret + 4 - task->ranges[j].range_len, task->ranges[j].range_len);
421         }
422 }
423
424 static void task_gen_apply_all_ranges(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
425 {
426         uint32_t ret;
427         if (!task->n_ranges)
428                 return;
429
430         for (uint16_t i = 0; i < count; ++i) {
431                 task_gen_apply_ranges(task, pkt_hdr[i]);
432         }
433 }
434
435 static inline uint32_t gcd(uint32_t a, uint32_t b)
436 {
437         // Euclidean algorithm
438         uint32_t t;
439         while (b != 0) {
440                 t = b;
441                 b = a % b;
442                 a = t;
443         }
444         return a;
445 }
446
447 static inline uint32_t lcm(uint32_t a, uint32_t b)
448 {
449         return ((a / gcd(a, b)) * b);
450 }
451
452 static uint32_t get_n_range_flows(struct task_gen *task)
453 {
454         uint32_t t = 1;
455         for (int i = 0; i < task->n_ranges; i++) {
456                 t = lcm((task->ranges[i].max - task->ranges[i].min) + 1, t);
457         }
458         return t;
459 }
460
461 static uint32_t get_n_rand_flows(struct task_gen *task)
462 {
463         uint32_t t = 0;
464         for (int i = 0; i < task->n_rands; i++) {
465                 t += __builtin_popcount(task->rand[i].rand_mask);
466         }
467         PROX_PANIC(t > 31, "Too many random bits - maximum 31 supported\n");
468         return 1 << t;
469 }
470
471 //void add_to_hash_table(struct task_gen *task, uint32_t *buffer, uint32_t *idx, uint32_t mask, uint32_t bit_pos, uint32_t val, uint32_t fixed_bits, uint32_t rand_offset) {
472 //              uint32_t ret_tmp = val | fixed_bits;
473 //              ret_tmp = rte_bswap32(ret_tmp);
474 //              uint8_t *pret_tmp = (uint8_t*)&ret_tmp;
475 //              rte_memcpy(buf + rand_offset, pret_tmp + 4 - rand_len, rand_len);
476 //
477 // init idx
478 // alloc buffer
479 // init/alloc hash_table
480 //void build_buffer(struct task_gen *task, uint32_t *buffer, uint32_t *idx, uint32_t mask, uint32_t bit_pos, uint32_t val)
481 //{
482 //      if (mask == 0) {
483 //              buffer[*idx] = val;
484 //              *idx = (*idx) + 1;
485 //              return;
486 //      }
487 //      build_buffer(task, but, mask >> 1, bit_pos + 1, val);
488 //      if (mask & 1) {
489 //              build_buffer(task, but, mask >> 1, bit_pos + 1, val | (1 << bit_pos));
490 //}
491
492 static void build_flow_table(struct task_gen *task)
493 {
494         uint8_t buf[2048], *key_fields;
495         union ipv4_5tuple_host key;
496         struct pkt_template *pkt_template;
497         uint32_t n_range_flows = get_n_range_flows(task);
498         // uint32_t n_rand_flows = get_n_rand_flows(task);
499         // uint32_t n_flows= n_range_flows * n_rand_flows * task->orig_n_pkts;
500         // for (int i = 0; i < task->n_rands; i++) {
501         //      build_buffer(task, task->values_buf[i], &task->values_idx[i], task->rand[i].rand_mask, 0, 0);
502         // }
503
504         uint32_t n_flows = n_range_flows * task->orig_n_pkts;
505
506         for (uint32_t k = 0; k < task->orig_n_pkts; k++) {
507                 memcpy(buf, task->pkt_template[k].buf, task->pkt_template[k].len);
508                 for (uint32_t j = 0; j < n_range_flows; j++) {
509                         task_gen_apply_ranges(task, buf);
510                         key_fields = buf + sizeof(prox_rte_ether_hdr) + offsetof(prox_rte_ipv4_hdr, time_to_live);
511                         key.xmm = _mm_loadu_si128((__m128i*)(key_fields));
512                         key.pad0 = key.pad1 = 0;
513                         int idx = rte_hash_add_key(task->flow_id_table, (const void *)&key);
514                         PROX_PANIC(idx < 0, "Unable to add key in table\n");
515                         if (idx >= 0)
516                                 plog_dbg("Added key %d, %x, %x, %x, %x\n", key.proto, key.ip_src, key.ip_dst, key.port_src, key.port_dst);
517                 }
518         }
519 }
520
521 static int32_t task_gen_get_flow_id(struct task_gen *task, uint8_t *pkt_hdr)
522 {
523         int ret = 0;
524         union ipv4_5tuple_host key;
525         uint8_t *hdr = pkt_hdr + sizeof(prox_rte_ether_hdr) + offsetof(prox_rte_ipv4_hdr, time_to_live);
526         // __m128i data = _mm_loadu_si128((__m128i*)(hdr));
527         // key.xmm = _mm_and_si128(data, mask0);
528         key.xmm = _mm_loadu_si128((__m128i*)(hdr));
529         key.pad0 = key.pad1 = 0;
530         ret = rte_hash_lookup(task->flow_id_table, (const void *)&key);
531         if (ret < 0) {
532                 plog_err("Flow not found: %d, %x, %x, %x, %x\n", key.proto, key.ip_src, key.ip_dst, key.port_src, key.port_dst);
533         }
534         return ret;
535 }
536
537 static void task_gen_apply_all_flow_id(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count, int32_t *flow_id)
538 {
539         if (task->flow_id_pos) {
540                 for (uint16_t j = 0; j < count; ++j) {
541                         flow_id[j] = task_gen_get_flow_id(task, pkt_hdr[j]);
542                         *(int32_t *)(pkt_hdr[j] + task->flow_id_pos) = flow_id[j];
543                 }
544         }
545 }
546
547 static void task_gen_apply_accur_pos(struct task_gen *task, uint8_t *pkt_hdr, uint32_t accuracy)
548 {
549         *(uint32_t *)(pkt_hdr + task->accur_pos) = accuracy;
550 }
551
552 static void task_gen_apply_sig(struct task_gen *task, struct pkt_template *dst)
553 {
554         if (task->sig_pos)
555                 *(uint32_t *)(dst->buf + task->sig_pos) = task->sig;
556 }
557
558 static void task_gen_apply_all_accur_pos(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
559 {
560         if (!task->accur_pos)
561                 return;
562
563         /* The accuracy of task->pkt_queue_index - ACCURACY_WINDOW is stored in
564            packet task->pkt_queue_index. The ID modulo ACCURACY_WINDOW is the
565            same. */
566         for (uint16_t j = 0; j < count; ++j) {
567                 uint32_t accuracy = task->accur[(task->pkt_queue_index + j) & (ACCURACY_WINDOW - 1)];
568                 task_gen_apply_accur_pos(task, pkt_hdr[j], accuracy);
569         }
570 }
571
572 static void task_gen_apply_unique_id(struct task_gen *task, uint8_t *pkt_hdr, const struct unique_id *id)
573 {
574         struct unique_id *dst = (struct unique_id *)(pkt_hdr + task->packet_id_pos);
575
576         *dst = *id;
577 }
578
579 static void task_gen_apply_all_unique_id(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
580 {
581         if (!task->packet_id_pos)
582                 return;
583
584         for (uint16_t i = 0; i < count; ++i) {
585                 struct unique_id id;
586                 unique_id_init(&id, task->generator_id, task->pkt_queue_index++);
587                 task_gen_apply_unique_id(task, pkt_hdr[i], &id);
588         }
589 }
590
591 static void task_gen_apply_id_in_flows(struct task_gen *task, uint8_t *pkt_hdr, const struct unique_id *id)
592 {
593         struct unique_id *dst = (struct unique_id *)(pkt_hdr + task->packet_id_in_flow_pos);
594         *dst = *id;
595 }
596
597 static void task_gen_apply_all_id_in_flows(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count, int32_t *idx)
598 {
599         if (!task->packet_id_in_flow_pos)
600                 return;
601
602         for (uint16_t i = 0; i < count; ++i) {
603                 struct unique_id id;
604                 if (idx[i] >= 0 ) {
605                         unique_id_init(&id, task->generator_id, task->flows[idx[i]].packet_id++);
606                         task_gen_apply_id_in_flows(task, pkt_hdr[i], &id);
607                 }
608         }
609 }
610
611 static void task_gen_checksum_packets(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count)
612 {
613         if (!(task->runtime_flags & TASK_TX_CRC))
614                 return;
615
616         if (!task->runtime_checksum_needed)
617                 return;
618
619         uint32_t pkt_idx = task_gen_offset_pkt_idx(task, - count);
620         for (uint16_t i = 0; i < count; ++i) {
621                 struct pkt_template *pkt_template = &task->pkt_template[pkt_idx];
622                 checksum_packet(pkt_hdr[i], mbufs[i], pkt_template, task->cksum_offload);
623                 pkt_idx = task_gen_next_pkt_idx(task, pkt_idx);
624         }
625 }
626
627 static void task_gen_consume_tokens(struct task_gen *task, uint32_t tokens, uint32_t send_count)
628 {
629         /* If max burst has been sent, we can't keep up so just assume
630            that we can (leaving a "gap" in the packet stream on the
631            wire) */
632         task->token_time.bytes_now -= tokens;
633         if (send_count == task->max_bulk_size && task->token_time.bytes_now > tokens) {
634                 task->token_time.bytes_now = tokens;
635         }
636 }
637
638 static uint64_t task_gen_calc_bulk_duration(struct task_gen *task, uint32_t count)
639 {
640         uint32_t pkt_idx = task_gen_offset_pkt_idx(task, - 1);
641         struct pkt_template *last_pkt_template = &task->pkt_template[pkt_idx];
642         uint32_t last_pkt_len = pkt_len_to_wire_size(last_pkt_template->len);
643 #ifdef NO_EXTRAPOLATION
644         uint64_t bulk_duration = task->pkt_tsc_offset[count - 1];
645 #else
646         uint64_t last_pkt_duration = bytes_to_tsc(task, last_pkt_len);
647         uint64_t bulk_duration = task->pkt_tsc_offset[count - 1] + last_pkt_duration;
648 #endif
649
650         return bulk_duration;
651 }
652
653 static uint64_t task_gen_write_latency(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
654 {
655         if (!task->lat_enabled)
656                 return 0;
657
658         uint64_t tx_tsc, delta_t;
659         uint64_t tsc_before_tx = 0;
660
661         /* Just before sending the packets, apply the time stamp
662            relative to when the first packet will be sent. The first
663            packet will be sent now. The time is read for each packet
664            to reduce the error towards the actual time the packet will
665            be sent. */
666         uint64_t write_tsc_after, write_tsc_before;
667
668         write_tsc_before = rte_rdtsc();
669
670         /* The time it took previously to write the time stamps in the
671            packets is used as an estimate for how long it will take to
672            write the time stamps now.  The estimated time at which the
673            packets will actually be sent will be at tx_tsc. */
674         tx_tsc = write_tsc_before + task->write_duration_estimate;
675
676         /* The offset delta_t tracks the difference between the actual
677            time and the time written in the packets. Adding the offset
678            to the actual time insures that the time written in the
679            packets is monotonically increasing. At the same time,
680            simply sleeping until delta_t is zero would leave a period
681            of silence on the line. The error has been introduced
682            earlier, but the packets have already been sent. */
683
684         /* This happens typically if previous bulk was delayed
685            by an interrupt e.g.  (with Time in nsec)
686            Time x: sleep 4 microsec
687            Time x+4000: send 64 packets (64 packets as 4000 nsec, w/ 10Gbps 64 bytes)
688            Time x+5000: send 16 packets (16 packets as 1000 nsec)
689            When we send the 16 packets, the 64 ealier packets are not yet
690            fully sent */
691         if (tx_tsc < task->earliest_tsc_next_pkt)
692                 delta_t = task->earliest_tsc_next_pkt - tx_tsc;
693         else
694                 delta_t = 0;
695
696         for (uint16_t i = 0; i < count; ++i) {
697                 uint32_t *pos = (uint32_t *)(pkt_hdr[i] + task->lat_pos);
698                 const uint64_t pkt_tsc = tx_tsc + delta_t + task->pkt_tsc_offset[i];
699                 *pos = pkt_tsc >> LATENCY_ACCURACY;
700         }
701
702         uint64_t bulk_duration = task_gen_calc_bulk_duration(task, count);
703         task->earliest_tsc_next_pkt = tx_tsc + delta_t + bulk_duration;
704         write_tsc_after = rte_rdtsc();
705         task->write_duration_estimate = write_tsc_after - write_tsc_before;
706
707         /* Make sure that the time stamps that were written
708            are valid. The offset must be taken into account */
709         do {
710                 tsc_before_tx = rte_rdtsc();
711         } while (tsc_before_tx < tx_tsc);
712
713         return tsc_before_tx;
714 }
715
716 static void task_gen_store_accuracy(struct task_gen *task, uint32_t count, uint64_t tsc_before_tx)
717 {
718         if (!task->accur_pos)
719                 return;
720
721         uint64_t accur = rte_rdtsc() - tsc_before_tx;
722         uint64_t first_accuracy_idx = task->pkt_queue_index - count;
723
724         for (uint32_t i = 0; i < count; ++i) {
725                 uint32_t accuracy_idx = (first_accuracy_idx + i) & (ACCURACY_WINDOW - 1);
726
727                 task->accur[accuracy_idx] = accur;
728         }
729 }
730
731 static void task_gen_load_and_prefetch(struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count)
732 {
733         for (uint16_t i = 0; i < count; ++i)
734                 rte_prefetch0(mbufs[i]);
735         for (uint16_t i = 0; i < count; ++i)
736                 pkt_hdr[i] = rte_pktmbuf_mtod(mbufs[i], uint8_t *);
737         for (uint16_t i = 0; i < count; ++i)
738                 rte_prefetch0(pkt_hdr[i]);
739 }
740
741 static void task_gen_build_packets(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count)
742 {
743         uint64_t will_send_bytes = 0;
744
745         for (uint16_t i = 0; i < count; ++i) {
746                 struct pkt_template *pktpl = &task->pkt_template[task->pkt_idx];
747                 struct pkt_template *pkt_template = &task->pkt_template[task->pkt_idx];
748                 pkt_template_init_mbuf(pkt_template, mbufs[i], pkt_hdr[i]);
749                 prox_rte_ether_hdr *hdr = (prox_rte_ether_hdr *)pkt_hdr[i];
750                 if (task->lat_enabled) {
751 #ifdef NO_EXTRAPOLATION
752                         task->pkt_tsc_offset[i] = 0;
753 #else
754                         task->pkt_tsc_offset[i] = bytes_to_tsc(task, will_send_bytes);
755 #endif
756                         will_send_bytes += pkt_len_to_wire_size(pkt_template->len);
757                 }
758                 task->pkt_idx = task_gen_next_pkt_idx(task, task->pkt_idx);
759         }
760 }
761
762 static int task_gen_allocate_templates(struct task_gen *task, uint32_t orig_nb_pkts, uint32_t nb_pkts, int do_panic, int pcap)
763 {
764         size_t mem_size = nb_pkts * sizeof(*task->pkt_template);
765         size_t orig_mem_size = orig_nb_pkts * sizeof(*task->pkt_template);
766         task->pkt_template = prox_zmalloc(mem_size, task->socket_id);
767         task->pkt_template_orig = prox_zmalloc(orig_mem_size, task->socket_id);
768
769         if (task->pkt_template == NULL || task->pkt_template_orig == NULL) {
770                 plog_err_or_panic(do_panic, "Failed to allocate %lu bytes (in huge pages) for %s\n", mem_size, pcap ? "pcap file":"packet template");
771                 return -1;
772         }
773
774         for (size_t i = 0; i < orig_nb_pkts; i++) {
775                 task->pkt_template_orig[i].buf = prox_zmalloc(task->max_frame_size, task->socket_id);
776                 if (task->pkt_template_orig[i].buf == NULL) {
777                         plog_err_or_panic(do_panic, "Failed to allocate %u bytes (in huge pages) for %s\n", task->max_frame_size, pcap ? "packet from pcap": "packet");
778                         return -1;
779                 }
780         }
781         for (size_t i = 0; i < nb_pkts; i++) {
782                 task->pkt_template[i].buf = prox_zmalloc(task->max_frame_size, task->socket_id);
783                 if (task->pkt_template[i].buf == NULL) {
784                         plog_err_or_panic(do_panic, "Failed to allocate %u bytes (in huge pages) for %s\n", task->max_frame_size, pcap ? "packet from pcap": "packet");
785                         return -1;
786                 }
787         }
788         return 0;
789 }
790
791 static int task_gen_reallocate_templates(struct task_gen *task, uint32_t nb_pkts, int do_panic)
792 {
793         // Need to free up bufs allocated in previous (longer) imix
794         for (size_t i = nb_pkts; i < task->n_pkts; i++) {
795                 if (task->pkt_template[i].buf) {
796                         rte_free(task->pkt_template[i].buf);
797                         task->pkt_template[i].buf = NULL;
798                 }
799         }
800
801         size_t mem_size = nb_pkts * sizeof(*task->pkt_template);
802         size_t old_mem_size = task->n_pkts * sizeof(*task->pkt_template);
803         if (old_mem_size > mem_size)
804                 old_mem_size = mem_size;
805
806         struct pkt_template *ptr;
807
808         // re-allocate memory for new pkt_template (this might allocate additional memory or free up some...)
809         if ((ptr = rte_malloc_socket(NULL, mem_size, RTE_CACHE_LINE_SIZE, task->socket_id)) != NULL) {
810                 memcpy(ptr, task->pkt_template, old_mem_size);
811                 rte_free(task->pkt_template);
812                 task->pkt_template = ptr;
813         } else {
814                 plog_err_or_panic(do_panic, "Failed to allocate %lu bytes (in huge pages) for packet template for IMIX\n", mem_size);
815                 return -1;
816         }
817
818         // Need to allocate bufs for new template but no need to reallocate for existing ones
819         for (size_t i = task->n_pkts; i < nb_pkts; ++i) {
820                 task->pkt_template[i].buf = prox_zmalloc(task->max_frame_size, task->socket_id);
821                 if (task->pkt_template[i].buf == NULL) {
822                         plog_err_or_panic(do_panic, "Failed to allocate %u bytes (in huge pages) for packet %zd in IMIX\n", task->max_frame_size, i);
823                         return -1;
824                 }
825         }
826         return 0;
827 }
828
829 static int check_pkt_size(struct task_gen *task, uint32_t pkt_size, int do_panic)
830 {
831         const uint16_t min_len = sizeof(prox_rte_ether_hdr) + sizeof(prox_rte_ipv4_hdr);
832         const uint16_t max_len = task->max_frame_size;
833
834         if (do_panic) {
835                 PROX_PANIC(pkt_size == 0, "Invalid packet size length (no packet defined?)\n");
836                 PROX_PANIC(pkt_size > max_len, "pkt_size out of range (must be <= %u)\n", max_len);
837                 PROX_PANIC(pkt_size < min_len, "pkt_size out of range (must be >= %u)\n", min_len);
838                 return 0;
839         } else {
840                 if (pkt_size == 0) {
841                         plog_err("Invalid packet size length (no packet defined?)\n");
842                         return -1;
843                 }
844                 if (pkt_size > max_len) {
845                         if (pkt_size >  PROX_RTE_ETHER_MAX_LEN + 2 * PROX_VLAN_TAG_SIZE - 4)
846                                 plog_err("pkt_size too high and jumbo frames disabled\n");
847                         else
848                                 plog_err("pkt_size out of range (must be <= (mtu=%u))\n", max_len);
849                         return -1;
850                 }
851                 if (pkt_size < min_len) {
852                         plog_err("pkt_size out of range (must be >= %u)\n", min_len);
853                         return -1;
854                 }
855                 return 0;
856         }
857 }
858
859 static int check_fields_in_bounds(struct task_gen *task, uint32_t pkt_size, int do_panic)
860 {
861         if (task->lat_enabled) {
862                 uint32_t pos_beg = task->lat_pos;
863                 uint32_t pos_end = task->lat_pos + 3U;
864
865                 if (do_panic)
866                         PROX_PANIC(pkt_size <= pos_end, "Writing latency at %u-%u, but packet size is %u bytes\n",
867                            pos_beg, pos_end, pkt_size);
868                 else if (pkt_size <= pos_end) {
869                         plog_err("Writing latency at %u-%u, but packet size is %u bytes\n", pos_beg, pos_end, pkt_size);
870                         return -1;
871                 }
872         }
873         if (task->packet_id_pos) {
874                 uint32_t pos_beg = task->packet_id_pos;
875                 uint32_t pos_end = task->packet_id_pos + 4U;
876
877                 if (do_panic)
878                         PROX_PANIC(pkt_size <= pos_end, "Writing packet at %u-%u, but packet size is %u bytes\n",
879                            pos_beg, pos_end, pkt_size);
880                 else if (pkt_size <= pos_end) {
881                         plog_err("Writing packet at %u-%u, but packet size is %u bytes\n", pos_beg, pos_end, pkt_size);
882                         return -1;
883                 }
884         }
885         if (task->accur_pos) {
886                 uint32_t pos_beg = task->accur_pos;
887                 uint32_t pos_end = task->accur_pos + 3U;
888
889                 if (do_panic)
890                         PROX_PANIC(pkt_size <= pos_end, "Writing accuracy at %u-%u, but packet size is %u bytes\n",
891                            pos_beg, pos_end, pkt_size);
892                 else if (pkt_size <= pos_end) {
893                         plog_err("Writing accuracy at %u-%u, but packet size is %u bytes\n", pos_beg, pos_end, pkt_size);
894                         return -1;
895                 }
896         }
897         return 0;
898 }
899
900 static int task_gen_set_eth_ip_udp_sizes(struct task_gen *task, uint32_t orig_n_pkts, uint32_t nb_pkt_sizes, uint32_t *pkt_sizes)
901 {
902         size_t k;
903         uint32_t l4_len;
904         prox_rte_ipv4_hdr *ip;
905         struct pkt_template *template;
906
907         for (size_t j = 0; j < nb_pkt_sizes; ++j) {
908                 for (size_t i = 0; i < orig_n_pkts; ++i) {
909                         k = j * orig_n_pkts + i;
910                         template = &task->pkt_template[k];
911                         if (template->l2_len == 0)
912                                 continue;
913                         ip = (prox_rte_ipv4_hdr *)(template->buf + template->l2_len);
914                         ip->total_length = rte_bswap16(pkt_sizes[j] - template->l2_len);
915                         l4_len = pkt_sizes[j] - template->l2_len - template->l3_len;
916                         ip->hdr_checksum = 0;
917                         prox_ip_cksum_sw(ip);
918
919                         if (ip->next_proto_id == IPPROTO_UDP) {
920                                 prox_rte_udp_hdr *udp = (prox_rte_udp_hdr *)(((uint8_t *)ip) + template->l3_len);
921                                 udp->dgram_len = rte_bswap16(l4_len);
922                                 prox_udp_cksum_sw(udp, l4_len, ip->src_addr, ip->dst_addr);
923                         } else if (ip->next_proto_id == IPPROTO_TCP) {
924                                 prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)(((uint8_t *)ip) + template->l3_len);
925                                 prox_tcp_cksum_sw(tcp, l4_len, ip->src_addr, ip->dst_addr);
926                         }
927                 }
928         }
929         return 0;
930 }
931
932 static int task_gen_apply_imix(struct task_gen *task, int do_panic)
933 {
934         struct pkt_template *ptr;
935         int rc;
936         task->imix_nb_pkts = task->new_imix_nb_pkts;
937         uint32_t n_pkts = task->imix_nb_pkts * task->orig_n_pkts;
938
939         if ((n_pkts != task->n_pkts) && ((rc = task_gen_reallocate_templates(task, n_pkts, do_panic)) < 0))
940                 return rc;
941
942         task->n_pkts = n_pkts;
943         if (task->pkt_idx >= n_pkts)
944                 task->pkt_idx = 0;
945         task_gen_set_pkt_templates_len(task, task->imix_pkt_sizes);
946         task_gen_reset_pkt_templates_content(task);
947         task_gen_pkt_template_recalc_metadata(task);
948         check_all_pkt_size(task, DO_NOT_PANIC);
949         check_all_fields_in_bounds(task, DO_NOT_PANIC);
950         task_gen_set_eth_ip_udp_sizes(task, task->orig_n_pkts, task->imix_nb_pkts, task->imix_pkt_sizes);
951         return 0;
952 }
953
954 static void task_gen_update_config(struct task_gen *task)
955 {
956         if (task->token_time.cfg.bpp != task->new_rate_bps)
957                 task_gen_reset_token_time(task);
958         if (task->new_imix_nb_pkts)
959                 task_gen_apply_imix(task, DO_NOT_PANIC);
960         task->new_imix_nb_pkts = 0;
961 }
962
963 static inline void build_value(struct task_gen *task, uint32_t mask, int bit_pos, uint32_t val, uint32_t fixed_bits)
964 {
965         struct task_base *tbase = (struct task_base *)task;
966         if (bit_pos < 32) {
967                 build_value(task, mask >> 1, bit_pos + 1, val, fixed_bits);
968                 if (mask & 1) {
969                         build_value(task, mask >> 1, bit_pos + 1, val | (1 << bit_pos), fixed_bits);
970                 }
971         } else {
972                 register_ip_to_ctrl_plane(tbase->l3.tmaster, rte_cpu_to_be_32(val | fixed_bits), tbase->l3.reachable_port_id, tbase->l3.core_id, tbase->l3.task_id);
973         }
974 }
975
976 static inline void build_value_ipv6(struct task_gen *task, uint32_t mask, int var_bit_pos, int init_var_bit_pos, struct ipv6_addr val, struct ipv6_addr fixed_bits)
977 {
978         struct task_base *tbase = (struct task_base *)task;
979         if (var_bit_pos < 32) {
980                 build_value_ipv6(task, mask >> 1, var_bit_pos + 1, init_var_bit_pos, val, fixed_bits);
981                 if (mask & 1) {
982                         int byte_pos = (var_bit_pos + init_var_bit_pos) / 8;
983                         int bit_pos = (var_bit_pos + init_var_bit_pos) % 8;
984                         val.bytes[byte_pos] = val.bytes[byte_pos] | (1 << bit_pos);
985                         build_value_ipv6(task, mask >> 1, var_bit_pos + 1, init_var_bit_pos, val, fixed_bits);
986                 }
987         } else {
988                 for (uint i = 0; i < sizeof(struct ipv6_addr) / 8; i++)
989                         val.bytes[i] = val.bytes[i] | fixed_bits.bytes[i];
990                 register_node_to_ctrl_plane(tbase->l3.tmaster, &null_addr, &val, tbase->l3.reachable_port_id, tbase->l3.core_id, tbase->l3.task_id);
991         }
992 }
993
994 static inline void register_all_ip_to_ctrl_plane(struct task_gen *task)
995 {
996         struct task_base *tbase = (struct task_base *)task;
997         int i, len, fixed;
998         unsigned int offset;
999         uint32_t mask, ip_len;
1000         struct ipv6_addr *ip6_src = NULL;
1001         uint32_t *ip_src;
1002
1003         for (uint32_t i = 0; i < task->n_pkts; ++i) {
1004                 struct pkt_template *pktpl = &task->pkt_template[i];
1005                 unsigned int ip_src_pos = 0;
1006                 int ipv4 = 0;
1007                 unsigned int l2_len = sizeof(prox_rte_ether_hdr);
1008
1009                 uint8_t *pkt = pktpl->buf;
1010                 prox_rte_ether_hdr *eth_hdr = (prox_rte_ether_hdr*)pkt;
1011                 uint16_t ether_type = eth_hdr->ether_type;
1012                 prox_rte_vlan_hdr *vlan_hdr;
1013                 prox_rte_ipv4_hdr *ip;
1014
1015                 // Unstack VLAN tags
1016                 while (((ether_type == ETYPE_8021ad) || (ether_type == ETYPE_VLAN)) && (l2_len + sizeof(prox_rte_vlan_hdr) < pktpl->len)) {
1017                         vlan_hdr = (prox_rte_vlan_hdr *)(pkt + l2_len);
1018                         l2_len +=4;
1019                         ether_type = vlan_hdr->eth_proto;
1020                 }
1021                 if ((ether_type == ETYPE_MPLSU) || (ether_type == ETYPE_MPLSM)) {
1022                         l2_len +=4;
1023                         ip = (prox_rte_ipv4_hdr *)(pkt + l2_len);
1024                         if (ip->version_ihl >> 4 == 4)
1025                                 ipv4 = 1;
1026                         else if (ip->version_ihl >> 4 != 6)     // Version field at same location for IPv4 and IPv6
1027                                 continue;
1028                 } else if (ether_type == ETYPE_IPv4) {
1029                         ip = (prox_rte_ipv4_hdr *)(pkt + l2_len);
1030                         PROX_PANIC(ip->version_ihl >> 4 != 4, "IPv4 ether_type but IP version = %d != 4", ip->version_ihl >> 4);        // Invalid Packet
1031                         ipv4 = 1;
1032                 } else if (ether_type == ETYPE_IPv6) {
1033                         ip = (prox_rte_ipv4_hdr *)(pkt + l2_len);
1034                         PROX_PANIC(ip->version_ihl >> 4 != 6, "IPv6 ether_type but IP version = %d != 6", ip->version_ihl >> 4);        // Invalid Packet
1035                 } else {
1036                         continue;
1037                 }
1038
1039                 PROX_PANIC(ipv4 && ((prox_cfg.flags & DSF_L3_ENABLED) == 0), "Trying to generate an IPv4 packet in NDP mode => not supported\n");
1040                 PROX_PANIC((ipv4 == 0) && ((prox_cfg.flags & DSF_NDP_ENABLED) == 0), "Trying to generate an IPv6 packet in L3 (IPv4) mode => not supported\n");
1041                 if (ipv4) {
1042                         // Even if IPv4 header contains options, options are after ip src and dst
1043                         ip_src_pos = l2_len + sizeof(prox_rte_ipv4_hdr) - 2 * sizeof(uint32_t);
1044                         ip_src = ((uint32_t *)(pktpl->buf + ip_src_pos));
1045                         plog_info("\tip_src_pos = %d, ip_src = %x\n", ip_src_pos, *ip_src);
1046                         register_ip_to_ctrl_plane(tbase->l3.tmaster, *ip_src, tbase->l3.reachable_port_id, tbase->l3.core_id, tbase->l3.task_id);
1047                         ip_len = sizeof(uint32_t);
1048                 } else {
1049                         ip_src_pos = l2_len + sizeof(prox_rte_ipv6_hdr) - 2 * sizeof(struct ipv6_addr);
1050                         ip6_src = ((struct ipv6_addr *)(pktpl->buf + ip_src_pos));
1051                         plog_info("\tip_src_pos = %d, ip6_src = "IPv6_BYTES_FMT"\n", ip_src_pos, IPv6_BYTES(ip6_src->bytes));
1052                         register_node_to_ctrl_plane(tbase->l3.tmaster, ip6_src, &null_addr, tbase->l3.reachable_port_id, tbase->l3.core_id, tbase->l3.task_id);
1053                         ip_len = sizeof(struct ipv6_addr);
1054                 }
1055
1056                 for (int j = 0; j < task->n_rands; j++) {
1057                         offset = task->rand[j].rand_offset;
1058                         len = task->rand[j].rand_len;
1059                         mask = task->rand[j].rand_mask;
1060                         fixed = task->rand[j].fixed_bits;
1061                         plog_info("offset = %d, len = %d, mask = %x, fixed = %x\n", offset, len, mask, fixed);
1062                         if (offset >= ip_src_pos + ip_len)      // First random bit after IP
1063                                 continue;
1064                         if (offset + len < ip_src_pos)          // Last random bit before IP
1065                                 continue;
1066
1067                         if (ipv4) {
1068                                 if (offset >= ip_src_pos) {
1069                                         int32_t ip_src_mask = (1 << (4 + ip_src_pos - offset) * 8) - 1;
1070                                         mask = mask & ip_src_mask;
1071                                         fixed = (fixed & ip_src_mask) | (rte_be_to_cpu_32(*ip_src) & ~ip_src_mask);
1072                                         build_value(task, mask, 0, 0, fixed);
1073                                 } else {
1074                                         int32_t bits = ((ip_src_pos + 4 - offset - len) * 8);
1075                                         mask = mask << bits;
1076                                         fixed = (fixed << bits) | (rte_be_to_cpu_32(*ip_src) & ((1 << bits) - 1));
1077                                         build_value(task, mask, 0, 0, fixed);
1078                                 }
1079                         } else {
1080                                 // We do not support when random partially covers IP - either starting before or finishing after
1081                                 if (offset + len >= ip_src_pos + ip_len) { // len over the ip
1082                                         plog_err("Not supported: random_offset = %d, random_len = %d, ip_src_pos = %d, ip_len = %d\n", offset, len, ip_src_pos, ip_len);
1083                                         continue;
1084                                 }
1085                                 if (offset < ip_src_pos) {
1086                                         plog_err("Not supported: random_offset = %d, random_len = %d, ip_src_pos = %d, ip_len = %d\n", offset, len, ip_src_pos, ip_len);
1087                                         continue;
1088                                 }
1089                                 // Even for IPv6 the random mask supported by PROX are 32 bits only
1090                                 struct ipv6_addr fixed_ipv6;
1091                                 uint init_var_byte_pos = (offset - ip_src_pos);
1092                                 for (uint i = 0; i < sizeof(struct ipv6_addr); i++) {
1093                                         if (i < init_var_byte_pos)
1094                                                 fixed_ipv6.bytes[i] = ip6_src->bytes[i];
1095                                         else if (i < init_var_byte_pos + len)
1096                                                 fixed_ipv6.bytes[i] = (fixed >> (i - init_var_byte_pos)) & 0xFF;
1097                                         else
1098                                                 fixed_ipv6.bytes[i] = ip6_src->bytes[i];
1099                                 }
1100                                 build_value_ipv6(task, mask, 0, init_var_byte_pos * 8, null_addr, fixed_ipv6);
1101                         }
1102                 }
1103         }
1104 }
1105
1106 static int handle_gen_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
1107 {
1108         struct task_gen *task = (struct task_gen *)tbase;
1109         uint8_t out[MAX_PKT_BURST] = {0};
1110         int ret;
1111
1112         int i, j;
1113
1114         task_gen_update_config(task);
1115
1116         if (task->pkt_count == 0) {
1117                 task_gen_reset_token_time(task);
1118                 return 0;
1119         }
1120         if (!task->token_time.cfg.bpp)
1121                 return 0;
1122
1123         token_time_update(&task->token_time, rte_rdtsc());
1124
1125         uint32_t would_send_bytes;
1126         uint32_t send_bulk = task_gen_calc_send_bulk(task, &would_send_bytes);
1127
1128         if (send_bulk == 0)
1129                 return 0;
1130         task_gen_take_count(task, send_bulk);
1131         task_gen_consume_tokens(task, would_send_bytes, send_bulk);
1132
1133         struct rte_mbuf **new_pkts = local_mbuf_refill_and_take(&task->local_mbuf, send_bulk);
1134         if (new_pkts == NULL)
1135                 return 0;
1136         uint8_t *pkt_hdr[MAX_RING_BURST];
1137         int32_t flow_id[MAX_RING_BURST];
1138         task_gen_load_and_prefetch(new_pkts, pkt_hdr, send_bulk);
1139         task_gen_build_packets(task, new_pkts, pkt_hdr, send_bulk);
1140         task_gen_apply_all_random_fields(task, pkt_hdr, send_bulk);
1141         task_gen_apply_all_ranges(task, pkt_hdr, send_bulk);
1142         task_gen_apply_all_accur_pos(task, pkt_hdr, send_bulk);
1143         task_gen_apply_all_flow_id(task, pkt_hdr, send_bulk, flow_id);
1144         task_gen_apply_all_unique_id(task, pkt_hdr, send_bulk);
1145         task_gen_apply_all_id_in_flows(task, pkt_hdr, send_bulk, flow_id);
1146
1147         uint64_t tsc_before_tx;
1148
1149         tsc_before_tx = task_gen_write_latency(task, pkt_hdr, send_bulk);
1150         task_gen_checksum_packets(task, new_pkts, pkt_hdr, send_bulk);
1151         if (task->store_msk) {
1152                 for (uint32_t i = 0; i < send_bulk; i++) {
1153                         if (out[i] != OUT_DISCARD) {
1154                                 uint8_t *hdr;
1155                                 hdr = (uint8_t *)rte_pktmbuf_mtod(new_pkts[i], prox_rte_ether_hdr *);
1156                                 memcpy(&task->store_buf[task->store_pkt_id & task->store_msk].buf, hdr, rte_pktmbuf_pkt_len(new_pkts[i]));
1157                                 task->store_buf[task->store_pkt_id & task->store_msk].len = rte_pktmbuf_pkt_len(new_pkts[i]);
1158                                 task->store_pkt_id++;
1159                         }
1160                 }
1161         }
1162         ret = task->base.tx_pkt(&task->base, new_pkts, send_bulk, out);
1163         task_gen_store_accuracy(task, send_bulk, tsc_before_tx);
1164
1165         // If we failed to send some packets, we need to do some clean-up:
1166
1167         if (unlikely(ret)) {
1168                 // We need re-use the packets indexes not being sent
1169                 // Hence non-sent packets will not be considered as lost by the receiver when it looks at
1170                 // packet ids. This should also increase the percentage of packets used for latency measurements
1171                 task->pkt_queue_index -= ret;
1172
1173                 // In case of failures, the estimate about when we can send next packet (earliest_tsc_next_pkt) is wrong
1174                 // This would result in under-estimated latency (up to 0 or negative)
1175                 uint64_t bulk_duration = task_gen_calc_bulk_duration(task, ret);
1176                 task->earliest_tsc_next_pkt -= bulk_duration;
1177         }
1178         return ret;
1179 }
1180
1181 static void init_task_gen_seeds(struct task_gen *task)
1182 {
1183         for (size_t i = 0; i < sizeof(task->rand)/sizeof(task->rand[0]); ++i)
1184                 random_init_seed(&task->rand[i].state);
1185 }
1186
1187 static uint32_t pcap_count_pkts(pcap_t *handle, uint32_t *max_frame_size)
1188 {
1189         struct pcap_pkthdr header;
1190         const uint8_t *buf;
1191         uint32_t ret = 0;
1192         *max_frame_size = 0;
1193         long pkt1_fpos = ftell(pcap_file(handle));
1194
1195         while ((buf = pcap_next(handle, &header))) {
1196                 if (header.len > *max_frame_size)
1197                         *max_frame_size = header.len;
1198                 ret++;
1199         }
1200         int ret2 = fseek(pcap_file(handle), pkt1_fpos, SEEK_SET);
1201         PROX_PANIC(ret2 != 0, "Failed to reset reading pcap file\n");
1202         return ret;
1203 }
1204
1205 static uint64_t avg_time_stamp(uint64_t *time_stamp, uint32_t n)
1206 {
1207         uint64_t tot_inter_pkt = 0;
1208
1209         for (uint32_t i = 0; i < n; ++i)
1210                 tot_inter_pkt += time_stamp[i];
1211         return (tot_inter_pkt + n / 2)/n;
1212 }
1213
1214 static int pcap_read_pkts(pcap_t *handle, const char *file_name, uint32_t n_pkts, struct pkt_template *proto, uint64_t *time_stamp, uint32_t max_frame_size)
1215 {
1216         struct pcap_pkthdr header;
1217         const uint8_t *buf;
1218         size_t len;
1219
1220         for (uint32_t i = 0; i < n_pkts; ++i) {
1221                 buf = pcap_next(handle, &header);
1222
1223                 PROX_PANIC(buf == NULL, "Failed to read packet %d from pcap %s\n", i, file_name);
1224                 proto[i].len = header.len;
1225                 len = RTE_MIN(header.len, max_frame_size);
1226                 if (header.len > len)
1227                         plogx_warn("Packet truncated from %u to %zu bytes\n", header.len, len);
1228
1229                 if (time_stamp) {
1230                         static struct timeval beg;
1231                         struct timeval tv;
1232
1233                         if (i == 0)
1234                                 beg = header.ts;
1235
1236                         tv = tv_diff(&beg, &header.ts);
1237                         tv_to_tsc(&tv, time_stamp + i);
1238                 }
1239                 rte_memcpy(proto[i].buf, buf, len);
1240         }
1241
1242         if (time_stamp && n_pkts) {
1243                 for (uint32_t i = n_pkts - 1; i > 0; --i)
1244                         time_stamp[i] -= time_stamp[i - 1];
1245                 /* Since the handle function will loop the packets,
1246                    there is one time-stamp that is not provided by the
1247                    pcap file. This is the time between the last and
1248                    the first packet. This implementation takes the
1249                    average of the inter-packet times here. */
1250                 if (n_pkts > 1)
1251                         time_stamp[0] = avg_time_stamp(time_stamp + 1, n_pkts - 1);
1252         }
1253
1254         return 0;
1255 }
1256
1257 static int check_all_pkt_size(struct task_gen *task, int do_panic)
1258 {
1259         int rc;
1260         for (uint32_t i = 0; i < task->n_pkts;++i) {
1261                 if ((rc = check_pkt_size(task, task->pkt_template[i].len, do_panic)) != 0)
1262                         return rc;
1263         }
1264         return 0;
1265 }
1266
1267 static int check_all_fields_in_bounds(struct task_gen *task, int do_panic)
1268 {
1269         int rc;
1270         for (uint32_t i = 0; i < task->n_pkts;++i) {
1271                 if ((rc = check_fields_in_bounds(task, task->pkt_template[i].len, do_panic)) != 0)
1272                         return rc;
1273         }
1274         return 0;
1275 }
1276
1277 static void task_gen_pkt_template_recalc_metadata(struct task_gen *task)
1278 {
1279         struct pkt_template *template;
1280
1281         for (size_t i = 0; i < task->n_pkts; ++i) {
1282                 template = &task->pkt_template[i];
1283                 parse_l2_l3_len(template->buf, &template->l2_len, &template->l3_len, template->len);
1284         }
1285 }
1286
1287 static void task_gen_pkt_template_recalc_checksum(struct task_gen *task)
1288 {
1289         struct pkt_template *template;
1290         prox_rte_ipv4_hdr *ip;
1291
1292         task->runtime_checksum_needed = 0;
1293         for (size_t i = 0; i < task->n_pkts; ++i) {
1294                 template = &task->pkt_template[i];
1295                 if (template->l2_len == 0)
1296                         continue;
1297                 ip = (prox_rte_ipv4_hdr *)(template->buf + template->l2_len);
1298                 if (ip->version_ihl >> 4 == 4) {
1299                         ip->hdr_checksum = 0;
1300                         prox_ip_cksum_sw(ip);
1301                         uint32_t l4_len = rte_bswap16(ip->total_length) - template->l3_len;
1302                         if (ip->next_proto_id == IPPROTO_UDP) {
1303                                 prox_rte_udp_hdr *udp = (prox_rte_udp_hdr *)(((uint8_t *)ip) + template->l3_len);
1304                                 prox_udp_cksum_sw(udp, l4_len, ip->src_addr, ip->dst_addr);
1305                         } else if (ip->next_proto_id == IPPROTO_TCP) {
1306                                 prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)(((uint8_t *)ip) + template->l3_len);
1307                                 prox_tcp_cksum_sw(tcp, l4_len, ip->src_addr, ip->dst_addr);
1308                         }
1309                 } else if (ip->version_ihl >> 4 == 6) {
1310                         prox_rte_ipv6_hdr *ip6;
1311                         ip6 = (prox_rte_ipv6_hdr *)(template->buf + template->l2_len);
1312                         if (ip6->proto == IPPROTO_UDP) {
1313                                 prox_rte_udp_hdr *udp = (prox_rte_udp_hdr *)(ip6 + 1);
1314                                 udp->dgram_cksum = 0;
1315                                 udp->dgram_cksum = rte_ipv6_udptcp_cksum(ip6, udp);
1316                         } else if (ip6->proto == IPPROTO_TCP) {
1317                                 prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)(ip6 + 1);
1318                                 tcp->cksum = 0;
1319                                 tcp->cksum = rte_ipv6_udptcp_cksum(ip6, tcp);
1320                         }
1321                 }
1322
1323                 /* The current implementation avoids checksum
1324                    calculation by determining that at packet
1325                    construction time, no fields are applied that would
1326                    require a recalculation of the checksum. */
1327                 if (task->lat_enabled && task->lat_pos > template->l2_len)
1328                         task->runtime_checksum_needed = 1;
1329                 if (task->accur_pos > template->l2_len)
1330                         task->runtime_checksum_needed = 1;
1331                 if (task->packet_id_pos > template->l2_len)
1332                         task->runtime_checksum_needed = 1;
1333         }
1334 }
1335
1336 static void task_gen_pkt_template_recalc_all(struct task_gen *task)
1337 {
1338         task_gen_pkt_template_recalc_metadata(task);
1339         task_gen_pkt_template_recalc_checksum(task);
1340 }
1341
1342 static void task_gen_set_pkt_templates_len(struct task_gen *task, uint32_t *pkt_sizes)
1343 {
1344         struct pkt_template *src, *dst;
1345
1346         for (size_t j = 0; j < task->n_pkts / task->orig_n_pkts; ++j) {
1347                 for (size_t i = 0; i < task->orig_n_pkts; ++i) {
1348                         dst = &task->pkt_template[j * task->orig_n_pkts + i];
1349                         dst->len = pkt_sizes[j];
1350                 }
1351         }
1352 }
1353
1354 static void task_gen_reset_pkt_templates_len(struct task_gen *task)
1355 {
1356         struct pkt_template *src, *dst;
1357
1358         for (size_t j = 0; j < task->n_pkts / task->orig_n_pkts; ++j) {
1359                 for (size_t i = 0; i < task->orig_n_pkts; ++i) {
1360                         src = &task->pkt_template_orig[i];
1361                         dst = &task->pkt_template[j * task->orig_n_pkts + i];
1362                         dst->len = src->len;
1363                 }
1364         }
1365 }
1366
1367 static void task_gen_reset_pkt_templates_content(struct task_gen *task)
1368 {
1369         struct pkt_template *src, *dst;
1370
1371         for (size_t j = 0; j < task->n_pkts / task->orig_n_pkts; ++j) {
1372                 for (size_t i = 0; i < task->orig_n_pkts; ++i) {
1373                         src = &task->pkt_template_orig[i];
1374                         dst = &task->pkt_template[j * task->orig_n_pkts + i];
1375                         memcpy(dst->buf, src->buf, RTE_MAX(src->len, dst->len));
1376                         if (task->flags & TASK_OVERWRITE_SRC_MAC_WITH_PORT_MAC) {
1377                                 rte_memcpy(&dst->buf[sizeof(prox_rte_ether_addr)], &task->src_mac, sizeof(prox_rte_ether_addr));
1378                         }
1379                         task_gen_apply_sig(task, dst);
1380                 }
1381         }
1382 }
1383
1384 static void task_gen_reset_pkt_templates(struct task_gen *task)
1385 {
1386         if (task->imix_nb_pkts)
1387                 task_gen_set_pkt_templates_len(task, task->imix_pkt_sizes);
1388         else
1389                 task_gen_reset_pkt_templates_len(task);
1390         task_gen_reset_pkt_templates_content(task);
1391         task_gen_pkt_template_recalc_all(task);
1392 }
1393
1394 static void task_init_gen_load_pkt_inline(struct task_gen *task, struct task_args *targ)
1395 {
1396         int rc;
1397
1398         task->orig_n_pkts = 1;
1399         if (task->imix_nb_pkts == 0) {
1400                 task->n_pkts = 1;
1401                 task->imix_pkt_sizes[0] = targ->pkt_size;
1402         } else {
1403                 task->n_pkts = task->imix_nb_pkts;
1404         }
1405         task_gen_allocate_templates(task, task->orig_n_pkts, task->n_pkts, DO_PANIC, NOT_FROM_PCAP);
1406
1407         rte_memcpy(task->pkt_template_orig[0].buf, targ->pkt_inline, task->max_frame_size);
1408         task->pkt_template_orig[0].len = task->imix_pkt_sizes[0];
1409         task_gen_reset_pkt_templates(task);
1410         check_all_pkt_size(task, DO_PANIC);
1411         check_all_fields_in_bounds(task, DO_PANIC);
1412
1413         // If IMIX was not specified then pkt_size is specified using pkt_size parameter or the length of pkt_inline
1414         // In that case, for backward compatibility, we do NOT adapt the length of IP and UDP to the length of the packet
1415         task_gen_set_eth_ip_udp_sizes(task, task->orig_n_pkts, task->imix_nb_pkts, task->imix_pkt_sizes);
1416 }
1417
1418 static void task_init_gen_load_pcap(struct task_gen *task, struct task_args *targ)
1419 {
1420         char err[PCAP_ERRBUF_SIZE];
1421         uint32_t max_frame_size;
1422         pcap_t *handle = pcap_open_offline(targ->pcap_file, err);
1423         PROX_PANIC(handle == NULL, "Failed to open PCAP file: %s\n", err);
1424
1425         task->orig_n_pkts = pcap_count_pkts(handle, &max_frame_size);
1426         plogx_info("%u packets in pcap file '%s'; max frame size=%d\n", task->orig_n_pkts, targ->pcap_file, max_frame_size);
1427         PROX_PANIC(max_frame_size > task->max_frame_size,
1428                 max_frame_size > PROX_RTE_ETHER_MAX_LEN + 2 * PROX_VLAN_TAG_SIZE -4 ?
1429                         "pkt_size too high and jumbo frames disabled" : "pkt_size > mtu");
1430
1431         if (targ->n_pkts)
1432                 task->orig_n_pkts = RTE_MIN(task->orig_n_pkts, targ->n_pkts);
1433         if (task->imix_nb_pkts == 0) {
1434                 task->n_pkts = task->orig_n_pkts;
1435         } else {
1436                 task->n_pkts = task->imix_nb_pkts * task->orig_n_pkts;
1437         }
1438         task_gen_allocate_templates(task, task->orig_n_pkts, task->n_pkts, DO_PANIC, FROM_PCAP);
1439         plogx_info("Loading %u packets from pcap\n", task->n_pkts);
1440
1441         pcap_read_pkts(handle, targ->pcap_file, task->orig_n_pkts, task->pkt_template_orig, NULL, max_frame_size);
1442         pcap_close(handle);
1443         task_gen_reset_pkt_templates(task);
1444         check_all_pkt_size(task, DO_PANIC);
1445         check_all_fields_in_bounds(task, DO_PANIC);
1446         task_gen_set_eth_ip_udp_sizes(task, task->orig_n_pkts, task->imix_nb_pkts, task->imix_pkt_sizes);
1447 }
1448
1449 static struct rte_mempool *task_gen_create_mempool(struct task_args *targ, uint16_t max_frame_size)
1450 {
1451         static char name[] = "gen_pool";
1452         struct rte_mempool *ret;
1453         const int sock_id = rte_lcore_to_socket_id(targ->lconf->id);
1454
1455         name[0]++;
1456         uint32_t mbuf_size = TX_MBUF_SIZE;
1457         if (max_frame_size + (unsigned)sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM > mbuf_size)
1458                 mbuf_size = max_frame_size + (unsigned)sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
1459         plog_info("\t\tCreating mempool with name '%s'\n", name);
1460         ret = rte_mempool_create(name, targ->nb_mbuf - 1, mbuf_size,
1461                                  targ->nb_cache_mbuf, sizeof(struct rte_pktmbuf_pool_private),
1462                                  rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, 0,
1463                                  sock_id, 0);
1464         PROX_PANIC(ret == NULL, "Failed to allocate dummy memory pool on socket %u with %u elements\n",
1465                    sock_id, targ->nb_mbuf - 1);
1466
1467         plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", ret,
1468                 targ->nb_mbuf - 1, mbuf_size, targ->nb_cache_mbuf, sock_id);
1469
1470         return ret;
1471 }
1472
1473 void task_gen_set_pkt_count(struct task_base *tbase, uint32_t count)
1474 {
1475         struct task_gen *task = (struct task_gen *)tbase;
1476
1477         task->pkt_count = count;
1478 }
1479
1480 int task_gen_set_pkt_size(struct task_base *tbase, uint32_t pkt_size)
1481 {
1482         struct task_gen *task = (struct task_gen *)tbase;
1483         int rc;
1484
1485         for (size_t i = 0; i < task->n_pkts; ++i) {
1486                 if ((rc = check_pkt_size(task, pkt_size, 0)) != 0)
1487                         return rc;
1488                 if ((rc = check_fields_in_bounds(task, pkt_size, 0)) != 0)
1489                         return rc;
1490         }
1491         for (size_t i = 0; i < task->n_pkts; ++i) {
1492                 task->pkt_template[i].len = pkt_size;
1493         }
1494         return 0;
1495 }
1496
1497 int task_gen_set_imix(struct task_base *tbase, uint32_t nb_pkt_sizes, uint32_t *pkt_sizes)
1498 {
1499         struct task_gen *task = (struct task_gen *)tbase;
1500         int rc;
1501
1502         memcpy(task->imix_pkt_sizes, pkt_sizes, nb_pkt_sizes * sizeof(uint32_t));
1503         for (size_t i = 0; i < nb_pkt_sizes; ++i) {
1504                 if ((rc = check_pkt_size(task, pkt_sizes[i], DO_NOT_PANIC)) != 0)
1505                         return rc;
1506                 if ((rc = check_fields_in_bounds(task, pkt_sizes[i], DO_NOT_PANIC)) != 0)
1507                         return rc;
1508         }
1509         // only set new_imix_nb_pkts if checks of pkt sizes succeeded
1510         task->new_imix_nb_pkts = nb_pkt_sizes;
1511         return 0;
1512 }
1513
1514 void task_gen_set_rate(struct task_base *tbase, uint64_t bps)
1515 {
1516         struct task_gen *task = (struct task_gen *)tbase;
1517
1518         task->new_rate_bps = bps;
1519 }
1520
1521 void task_gen_reset_randoms(struct task_base *tbase)
1522 {
1523         struct task_gen *task = (struct task_gen *)tbase;
1524
1525         for (uint32_t i = 0; i < task->n_rands; ++i) {
1526                 task->rand[i].rand_mask = 0;
1527                 task->rand[i].fixed_bits = 0;
1528                 task->rand[i].rand_offset = 0;
1529         }
1530         task->n_rands = 0;
1531 }
1532
1533 void task_gen_reset_ranges(struct task_base *tbase)
1534 {
1535         struct task_gen *task = (struct task_gen *)tbase;
1536
1537         memset(task->ranges, 0, task->n_ranges * sizeof(struct range));
1538         task->n_ranges = 0;
1539 }
1540
1541 int task_gen_set_value(struct task_base *tbase, uint32_t value, uint32_t offset, uint32_t len)
1542 {
1543         struct task_gen *task = (struct task_gen *)tbase;
1544
1545         if (offset + len > task->max_frame_size)
1546                 return -1;
1547         for (size_t i = 0; i < task->n_pkts; ++i) {
1548                 uint32_t to_write = rte_cpu_to_be_32(value) >> ((4 - len) * 8);
1549                 uint8_t *dst = task->pkt_template[i].buf;
1550
1551                 rte_memcpy(dst + offset, &to_write, len);
1552         }
1553
1554         task_gen_pkt_template_recalc_all(task);
1555
1556         return 0;
1557 }
1558
1559 void task_gen_reset_values(struct task_base *tbase)
1560 {
1561         struct task_gen *task = (struct task_gen *)tbase;
1562
1563         task_gen_reset_pkt_templates_content(task);
1564         task_gen_pkt_template_recalc_metadata(task);
1565         check_all_pkt_size(task, DO_NOT_PANIC);
1566         check_all_fields_in_bounds(task, DO_NOT_PANIC);
1567         task_gen_set_eth_ip_udp_sizes(task, task->orig_n_pkts, task->imix_nb_pkts, task->imix_pkt_sizes);
1568
1569         if (task->flags & TASK_OVERWRITE_SRC_MAC_WITH_PORT_MAC) {
1570                 for (uint32_t i = 0; i < task->n_pkts; ++i) {
1571                         rte_memcpy(&task->pkt_template[i].buf[sizeof(prox_rte_ether_addr)], &task->src_mac, sizeof(prox_rte_ether_addr));
1572                 }
1573         }
1574 }
1575
1576 uint32_t task_gen_get_n_randoms(struct task_base *tbase)
1577 {
1578         struct task_gen *task = (struct task_gen *)tbase;
1579
1580         return task->n_rands;
1581 }
1582
1583 uint32_t task_gen_get_n_ranges(struct task_base *tbase)
1584 {
1585         struct task_gen *task = (struct task_gen *)tbase;
1586
1587         return task->n_ranges;
1588 }
1589
1590 static void init_task_gen_pcap(struct task_base *tbase, struct task_args *targ)
1591 {
1592         struct task_gen_pcap *task = (struct task_gen_pcap *)tbase;
1593         task->socket_id = rte_lcore_to_socket_id(targ->lconf->id);
1594         uint32_t max_frame_size;
1595
1596         task->loop = targ->loop;
1597         task->pkt_idx = 0;
1598         task->hz = rte_get_tsc_hz();
1599
1600         char err[PCAP_ERRBUF_SIZE];
1601         pcap_t *handle = pcap_open_offline(targ->pcap_file, err);
1602         PROX_PANIC(handle == NULL, "Failed to open PCAP file: %s\n", err);
1603
1604         task->n_pkts = pcap_count_pkts(handle, &max_frame_size);
1605         plogx_info("%u packets in pcap file '%s'\n", task->n_pkts, targ->pcap_file);
1606
1607         task->local_mbuf.mempool = task_gen_create_mempool(targ, max_frame_size);
1608
1609         PROX_PANIC(!strcmp(targ->pcap_file, ""), "No pcap file defined\n");
1610
1611         if (targ->n_pkts) {
1612                 plogx_info("Configured to load %u packets\n", targ->n_pkts);
1613                 if (task->n_pkts > targ->n_pkts)
1614                         task->n_pkts = targ->n_pkts;
1615         }
1616         plogx_info("Loading %u packets from pcap\n", task->n_pkts);
1617
1618         size_t mem_size = task->n_pkts * (sizeof(*task->proto) + sizeof(*task->proto_tsc));
1619         uint8_t *mem = prox_zmalloc(mem_size, task->socket_id);
1620
1621         PROX_PANIC(mem == NULL, "Failed to allocate %lu bytes (in huge pages) for pcap file\n", mem_size);
1622         task->proto = (struct pkt_template *) mem;
1623         task->proto_tsc = (uint64_t *)(mem + task->n_pkts * sizeof(*task->proto));
1624
1625         for (uint i = 0; i < targ->n_pkts; i++) {
1626                 task->proto[i].buf = prox_zmalloc(max_frame_size, task->socket_id);
1627                 PROX_PANIC(task->proto[i].buf == NULL, "Failed to allocate %u bytes (in huge pages) for pcap file\n", max_frame_size);
1628         }
1629
1630         pcap_read_pkts(handle, targ->pcap_file, task->n_pkts, task->proto, task->proto_tsc, max_frame_size);
1631         pcap_close(handle);
1632 }
1633
1634 static int task_gen_find_random_with_offset(struct task_gen *task, uint32_t offset)
1635 {
1636         for (uint32_t i = 0; i < task->n_rands; ++i) {
1637                 if (task->rand[i].rand_offset == offset) {
1638                         return i;
1639                 }
1640         }
1641
1642         return UINT32_MAX;
1643 }
1644
1645 int task_gen_add_range(struct task_base *tbase, struct range *range)
1646 {
1647         struct task_gen *task = (struct task_gen *)tbase;
1648         if (task->n_ranges == MAX_RANGES) {
1649                 plog_err("Too many ranges\n");
1650                 return -1;
1651         }
1652         task->ranges[task->n_ranges].min = range->min;
1653         task->ranges[task->n_ranges].value = range->min;
1654         uint32_t m = range->max;
1655         task->ranges[task->n_ranges].range_len = 0;
1656         while (m != 0) {
1657                 m >>= 8;
1658                 task->ranges[task->n_ranges].range_len++;
1659         }
1660         task->ranges[task->n_ranges].offset = range->offset;
1661         task->ranges[task->n_ranges++].max = range->max;
1662         return 0;
1663 }
1664
1665 int task_gen_add_rand(struct task_base *tbase, const char *rand_str, uint32_t offset, uint32_t rand_id)
1666 {
1667         struct task_gen *task = (struct task_gen *)tbase;
1668         uint32_t existing_rand;
1669
1670         if (rand_id == UINT32_MAX && task->n_rands == 64) {
1671                 plog_err("Too many randoms\n");
1672                 return -1;
1673         }
1674         uint32_t mask, fixed, len;
1675
1676         if (parse_random_str(&mask, &fixed, &len, rand_str)) {
1677                 plog_err("%s\n", get_parse_err());
1678                 return -1;
1679         }
1680         task->runtime_checksum_needed = 1;
1681
1682         existing_rand = task_gen_find_random_with_offset(task, offset);
1683         if (existing_rand != UINT32_MAX) {
1684                 plog_warn("Random at offset %d already set => overwriting len = %d %s\n", offset, len, rand_str);
1685                 rand_id = existing_rand;
1686                 task->rand[rand_id].rand_len = len;
1687                 task->rand[rand_id].rand_offset = offset;
1688                 task->rand[rand_id].rand_mask = mask;
1689                 task->rand[rand_id].fixed_bits = fixed;
1690                 return 0;
1691         }
1692
1693         task->rand[task->n_rands].rand_len = len;
1694         task->rand[task->n_rands].rand_offset = offset;
1695         task->rand[task->n_rands].rand_mask = mask;
1696         task->rand[task->n_rands].fixed_bits = fixed;
1697
1698         task->n_rands++;
1699         return 0;
1700 }
1701
1702 static void start(struct task_base *tbase)
1703 {
1704         struct task_gen *task = (struct task_gen *)tbase;
1705         task->pkt_queue_index = 0;
1706
1707         task_gen_reset_token_time(task);
1708         if (tbase->l3.tmaster) {
1709                 register_all_ip_to_ctrl_plane(task);
1710         }
1711
1712         /* TODO
1713            Handle the case when two tasks transmit to the same port
1714            and one of them is stopped. In that case ARP (requests or replies)
1715            might not be sent. Master will have to keep a list of rings.
1716            stop will have to de-register IP from ctrl plane.
1717            un-registration will remove the ring. when having more than
1718            one active rings, master can always use the first one
1719         */
1720 }
1721
1722 static void stop_gen(struct task_base *tbase)
1723 {
1724         uint32_t i, j;
1725         struct task_gen *task = (struct task_gen *)tbase;
1726         if (task->store_msk) {
1727                 for (i = task->store_pkt_id & task->store_msk; i < task->store_msk + 1; i++) {
1728                         if (task->store_buf[i].len) {
1729                                 fprintf(task->fp, "%06d: ", i);
1730                                 for (j = 0; j < task->store_buf[i].len; j++) {
1731                                         fprintf(task->fp, "%02x ", task->store_buf[i].buf[j]);
1732                                 }
1733                                 fprintf(task->fp, "\n");
1734                         }
1735                 }
1736                 for (i = 0; i < (task->store_pkt_id & task->store_msk); i++) {
1737                         if (task->store_buf[i].len) {
1738                                 fprintf(task->fp, "%06d: ", i);
1739                                 for (j = 0; j < task->store_buf[i].len; j++) {
1740                                         fprintf(task->fp, "%02x ", task->store_buf[i].buf[j]);
1741                                 }
1742                                 fprintf(task->fp, "\n");
1743                         }
1744                 }
1745         }
1746 }
1747 static void start_pcap(struct task_base *tbase)
1748 {
1749         struct task_gen_pcap *task = (struct task_gen_pcap *)tbase;
1750         /* When we start, the first packet is sent immediately. */
1751         task->last_tsc = rte_rdtsc() - task->proto_tsc[0];
1752         task->pkt_idx = 0;
1753 }
1754
1755 static void init_task_gen_early(struct task_args *targ)
1756 {
1757         uint8_t *generator_count = prox_sh_find_system("generator_count");
1758
1759         if (generator_count == NULL) {
1760                 generator_count = prox_zmalloc(sizeof(*generator_count), rte_lcore_to_socket_id(targ->lconf->id));
1761                 PROX_PANIC(generator_count == NULL, "Failed to allocate generator count\n");
1762                 prox_sh_add_system("generator_count", generator_count);
1763         }
1764         targ->generator_id = *generator_count;
1765         (*generator_count)++;
1766 }
1767
1768 static void init_task_gen(struct task_base *tbase, struct task_args *targ)
1769 {
1770         struct task_gen *task = (struct task_gen *)tbase;
1771         task->socket_id = rte_lcore_to_socket_id(targ->lconf->id);
1772
1773         task->packet_id_pos = targ->packet_id_pos;
1774
1775         struct prox_port_cfg *port = find_reachable_port(targ);
1776         // TODO: check that all reachable ports have the same mtu...
1777         if (port) {
1778                 task->cksum_offload = port->requested_tx_offload & (DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM);
1779                 task->port = port;
1780                 task->max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + 2 * PROX_VLAN_TAG_SIZE;
1781         } else {
1782                 // Not generating to any port...
1783                 task->max_frame_size = PROX_RTE_ETHER_MAX_LEN;
1784         }
1785         task->local_mbuf.mempool = task_gen_create_mempool(targ, task->max_frame_size);
1786         PROX_PANIC(task->local_mbuf.mempool == NULL, "Failed to create mempool\n");
1787         task->pkt_idx = 0;
1788         task->hz = rte_get_tsc_hz();
1789         task->lat_pos = targ->lat_pos;
1790         task->accur_pos = targ->accur_pos;
1791         task->sig_pos = targ->sig_pos;
1792         task->flow_id_pos = targ->flow_id_pos;
1793         task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos;
1794         task->sig = targ->sig;
1795         task->new_rate_bps = targ->rate_bps;
1796
1797         /*
1798          * For tokens, use 10 Gbps as base rate
1799          * Scripts can then use speed command, with speed=100 as 10 Gbps and speed=400 as 40 Gbps
1800          * Script can query prox "port info" command to find out the port link speed to know
1801          * at which rate to start. Note that virtio running on OVS returns 10 Gbps, so a script has
1802          * probably also to check the driver (as returned by the same "port info" command.
1803          */
1804         struct token_time_cfg tt_cfg = token_time_cfg_create(1250000000, rte_get_tsc_hz(), -1);
1805         token_time_init(&task->token_time, &tt_cfg);
1806
1807         init_task_gen_seeds(task);
1808
1809         task->min_bulk_size = targ->min_bulk_size;
1810         task->max_bulk_size = targ->max_bulk_size;
1811         if (task->min_bulk_size < 1)
1812                 task->min_bulk_size = 1;
1813         if (task->max_bulk_size < 1)
1814                 task->max_bulk_size = 64;
1815         PROX_PANIC(task->max_bulk_size > 64, "max_bulk_size higher than 64\n");
1816         PROX_PANIC(task->max_bulk_size < task->min_bulk_size, "max_bulk_size must be > than min_bulk_size\n");
1817
1818         task->pkt_count = -1;
1819         task->lat_enabled = targ->lat_enabled;
1820         task->runtime_flags = targ->runtime_flags;
1821         PROX_PANIC((task->lat_pos || task->accur_pos) && !task->lat_enabled, "lat not enabled by lat pos or accur pos configured\n");
1822
1823         task->generator_id = targ->generator_id;
1824         plog_info("\t\tGenerator id = %d\n", task->generator_id);
1825
1826         // Allocate array holding bytes to tsc for supported frame sizes
1827         task->bytes_to_tsc = prox_zmalloc(task->max_frame_size * MAX_PKT_BURST * sizeof(task->bytes_to_tsc[0]), task->socket_id);
1828         PROX_PANIC(task->bytes_to_tsc == NULL,
1829                 "Failed to allocate %u bytes (in huge pages) for bytes_to_tsc\n", task->max_frame_size);
1830
1831         // task->port->max_link_speed reports the maximum, non negotiated ink speed in Mbps e.g. 40k for a 40 Gbps NIC.
1832         // It can be UINT32_MAX (virtual devices or not supported by DPDK < 16.04)
1833         uint64_t bytes_per_hz = UINT64_MAX;
1834         if ((task->port) && (task->port->max_link_speed != UINT32_MAX)) {
1835                 bytes_per_hz = task->port->max_link_speed * 125000L;
1836                 plog_info("\t\tPort %u: max link speed is %ld Mbps\n",
1837                         (uint8_t)(task->port - prox_port_cfg), 8 * bytes_per_hz / 1000000);
1838         }
1839         // There are cases where hz estimate might be slighly over-estimated
1840         // This results in too much extrapolation
1841         // Only account for 99% of extrapolation to handle cases with up to 1% error clocks
1842         for (unsigned int i = 0; i < task->max_frame_size * MAX_PKT_BURST ; i++) {
1843                 if (bytes_per_hz == UINT64_MAX)
1844                         task->bytes_to_tsc[i] = 0;
1845                 else
1846                         task->bytes_to_tsc[i] = (task->hz * i * 0.99) / bytes_per_hz;
1847         }
1848
1849         task->imix_nb_pkts = targ->imix_nb_pkts;
1850         for (uint32_t i = 0; i < targ->imix_nb_pkts; i++) {
1851                 task->imix_pkt_sizes[i] = targ->imix_pkt_sizes[i];
1852         }
1853         if (!strcmp(targ->pcap_file, "")) {
1854                 plog_info("\t\tUsing inline definition of a packet\n");
1855                 task_init_gen_load_pkt_inline(task, targ);
1856         } else {
1857                 plog_info("\t\tLoading from pcap %s\n", targ->pcap_file);
1858                 task_init_gen_load_pcap(task, targ);
1859         }
1860
1861         PROX_PANIC(((targ->nb_txrings == 0) && (targ->nb_txports == 0)), "Gen mode requires a tx ring or a tx port");
1862         if ((targ->flags & DSF_KEEP_SRC_MAC) == 0) {
1863                 task->flags |= TASK_OVERWRITE_SRC_MAC_WITH_PORT_MAC;
1864                 memcpy(&task->src_mac, &prox_port_cfg[task->base.tx_params_hw.tx_port_queue->port].eth_addr, sizeof(prox_rte_ether_addr));
1865                 for (uint32_t i = 0; i < task->n_pkts; ++i) {
1866                         rte_memcpy(&task->pkt_template[i].buf[sizeof(prox_rte_ether_addr)], &task->src_mac, sizeof(prox_rte_ether_addr));
1867                 }
1868         }
1869         for (uint32_t i = 0; i < targ->n_rand_str; ++i) {
1870                 PROX_PANIC(task_gen_add_rand(tbase, targ->rand_str[i], targ->rand_offset[i], UINT32_MAX),
1871                            "Failed to add random\n");
1872         }
1873         for (uint32_t i = 0; i < targ->n_ranges; ++i) {
1874                 PROX_PANIC(task_gen_add_range(tbase, &targ->range[i]), "Failed to add range\n");
1875         }
1876         if (targ->store_max) {
1877                 char filename[256];
1878                 sprintf(filename, "gen_buf_%02d_%02d", targ->lconf->id, targ->task);
1879
1880                 task->store_msk = targ->store_max - 1;
1881                 task->store_buf = (struct packet *)malloc(sizeof(struct packet) * targ->store_max);
1882                 task->fp = fopen(filename, "w+");
1883                 PROX_PANIC(task->fp == NULL, "Unable to open %s\n", filename);
1884         } else {
1885                 task->store_msk = 0;
1886         }
1887         uint32_t n_entries = get_n_range_flows(task) * task->orig_n_pkts * 4;
1888 #ifndef RTE_HASH_BUCKET_ENTRIES
1889 #define RTE_HASH_BUCKET_ENTRIES 8
1890 #endif
1891         // cuckoo hash requires at least RTE_HASH_BUCKET_ENTRIES (8) entries
1892         if (n_entries < RTE_HASH_BUCKET_ENTRIES)
1893                 n_entries = RTE_HASH_BUCKET_ENTRIES;
1894
1895         static char hash_name[30];
1896         sprintf(hash_name, "A%03d_hash_gen_table", targ->lconf->id);
1897         struct rte_hash_parameters hash_params = {
1898                 .name = hash_name,
1899                 .entries = n_entries,
1900                 .key_len = sizeof(union ipv4_5tuple_host),
1901                 .hash_func = rte_hash_crc,
1902                 .hash_func_init_val = 0,
1903         };
1904         plog_info("\t\thash table name = %s\n", hash_params.name);
1905         task->flow_id_table = rte_hash_create(&hash_params);
1906         PROX_PANIC(task->flow_id_table == NULL, "Failed to set up flow_id hash table for gen\n");
1907         plog_info("\t\tflow_id hash table allocated, with %d entries of size %d\n", hash_params.entries, hash_params.key_len);
1908         build_flow_table(task);
1909         task->flows = (struct flows *)prox_zmalloc(n_entries * sizeof(struct flows), task->socket_id);
1910         PROX_PANIC(task->flows == NULL, "Failed to allocate flows\n");
1911         plog_info("\t\t%d flows allocated\n", n_entries);
1912 }
1913
1914 static struct task_init task_init_gen = {
1915         .mode_str = "gen",
1916         .init = init_task_gen,
1917         .handle = handle_gen_bulk,
1918         .start = start,
1919         .early_init = init_task_gen_early,
1920 #ifdef SOFT_CRC
1921         // For SOFT_CRC, no offload is needed. If both NOOFFLOADS and NOMULTSEGS flags are set the
1922         // vector mode is used by DPDK, resulting (theoretically) in higher performance.
1923         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX | TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS,
1924 #else
1925         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX,
1926 #endif
1927         .size = sizeof(struct task_gen),
1928         .stop_last = stop_gen
1929 };
1930
1931 static struct task_init task_init_gen_l3 = {
1932         .mode_str = "gen",
1933         .sub_mode_str = "l3",
1934         .init = init_task_gen,
1935         .handle = handle_gen_bulk,
1936         .start = start,
1937         .early_init = init_task_gen_early,
1938 #ifdef SOFT_CRC
1939         // For SOFT_CRC, no offload is needed. If both NOOFFLOADS and NOMULTSEGS flags are set the
1940         // vector mode is used by DPDK, resulting (theoretically) in higher performance.
1941         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX | TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS,
1942 #else
1943         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX,
1944 #endif
1945         .size = sizeof(struct task_gen)
1946 };
1947
1948 /* This mode uses time stamps in the pcap file */
1949 static struct task_init task_init_gen_pcap = {
1950         .mode_str = "gen",
1951         .sub_mode_str = "pcap",
1952         .init = init_task_gen_pcap,
1953         .handle = handle_gen_pcap_bulk,
1954         .start = start_pcap,
1955         .early_init = init_task_gen_early,
1956 #ifdef SOFT_CRC
1957         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX | TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS,
1958 #else
1959         .flag_features = TASK_FEATURE_NEVER_DISCARDS | TASK_FEATURE_NO_RX,
1960 #endif
1961         .size = sizeof(struct task_gen_pcap)
1962 };
1963
1964 __attribute__((constructor)) static void reg_task_gen(void)
1965 {
1966         reg_task(&task_init_gen);
1967         reg_task(&task_init_gen_l3);
1968         reg_task(&task_init_gen_pcap);
1969 }