Fix the offset bug for UDP dst port in the traffic profile
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
15
16 import array
17 import operator
18 import logging
19 import io
20 import os
21 import re
22 import select
23 import socket
24
25 from collections import OrderedDict, namedtuple
26 import time
27 from contextlib import contextmanager
28 from itertools import repeat, chain
29
30 import six
31 from six.moves import zip, StringIO
32 from six.moves import cStringIO
33
34 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
35 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
36 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
39
40
41 PROX_PORT = 8474
42
43 SECTION_NAME = 0
44 SECTION_CONTENTS = 1
45
46 LOG = logging.getLogger(__name__)
47 LOG.setLevel(logging.DEBUG)
48
49 TEN_GIGABIT = 1e10
50 BITS_PER_BYTE = 8
51 RETRY_SECONDS = 60
52 RETRY_INTERVAL = 1
53
54 CONFIGURATION_OPTIONS = (
55     # dict key           section     key               default value
56     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
57     ('testDuration', 'general', 'test_duration', 5.0),
58     ('testPrecision', 'general', 'test_precision', 1.0),
59     ('tests', 'general', 'tests', None),
60     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
61
62     ('logFile', 'logging', 'file', 'dats.log'),
63     ('logDateFormat', 'logging', 'datefmt', None),
64     ('logLevel', 'logging', 'level', 'INFO'),
65     ('logOverwrite', 'logging', 'overwrite', 1),
66
67     ('testerIp', 'tester', 'ip', None),
68     ('testerUser', 'tester', 'user', 'root'),
69     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
70     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
71     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
72     ('testerSocketId', 'tester', 'socket_id', 0),
73
74     ('sutIp', 'sut', 'ip', None),
75     ('sutUser', 'sut', 'user', 'root'),
76     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
77     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
78     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
79     ('sutSocketId', 'sut', 'socket_id', 0),
80 )
81
82
83 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
84
85     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
86
87     def __new__(cls, *args):
88         try:
89             matches = cls.CORE_RE.search(str(args[0]))
90             if matches:
91                 args = matches.groups()
92
93             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
94                                                        'h' if args[2] else '')
95
96         except (AttributeError, TypeError, IndexError, ValueError):
97             raise ValueError('Invalid core spec {}'.format(args))
98
99     def is_hyperthread(self):
100         return self.hyperthread == 'h'
101
102     @property
103     def index(self):
104         return int(self.is_hyperthread())
105
106     def find_in_topology(self, cpu_topology):
107         try:
108             socket_core_match = cpu_topology[self.socket_id][self.core_id]
109             sorted_match = sorted(socket_core_match.values())
110             return sorted_match[self.index][0]
111         except (KeyError, IndexError):
112             template = "Core {}{} on socket {} does not exist"
113             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
114
115
116 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
117
118     def __new__(cls, *args):
119         try:
120             assert args[0] is not str(args[0])
121             args = tuple(args[0])
122         except (AssertionError, IndexError, TypeError):
123             pass
124
125         return super(TotStatsTuple, cls).__new__(cls, *args)
126
127
128 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
129                                                         'delta_tx,delta_tsc,'
130                                                         'latency,rx_total,tx_total,pps')):
131
132     @property
133     def pkt_loss(self):
134         try:
135             return 1e2 * self.drop_total / float(self.tx_total)
136         except ZeroDivisionError:
137             return 100.0
138
139     @property
140     def mpps(self):
141         # calculate the effective throughput in Mpps
142         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
143
144     @property
145     def can_be_lost(self):
146         return int(self.tx_total * self.tolerated / 1e2)
147
148     @property
149     def drop_total(self):
150         return self.tx_total - self.rx_total
151
152     @property
153     def success(self):
154         return self.drop_total <= self.can_be_lost
155
156     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
157         if pkt_loss is None:
158             pkt_loss = self.pkt_loss
159
160         if port_samples is None:
161             port_samples = {}
162
163         latency_keys = [
164             "LatencyMin",
165             "LatencyMax",
166             "LatencyAvg",
167         ]
168
169         samples = {
170             "Throughput": self.mpps,
171             "DropPackets": pkt_loss,
172             "CurrentDropPackets": pkt_loss,
173             "TxThroughput": self.pps / 1e6,
174             "RxThroughput": self.mpps,
175             "PktSize": pkt_size,
176         }
177         if port_samples:
178             samples.update(port_samples)
179
180         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
181         return samples
182
183     def log_data(self, logger=None):
184         if logger is None:
185             logger = LOG
186
187         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
188         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
189         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
190
191
192 class PacketDump(object):
193
194     @staticmethod
195     def assert_func(func, value1, value2, template=None):
196         assert func(value1, value2), template.format(value1, value2)
197
198     def __init__(self, port_id, data_len, payload):
199         template = "Packet dump has specified length {}, but payload is {} bytes long"
200         self.assert_func(operator.eq, data_len, len(payload), template)
201         self._port_id = port_id
202         self._data_len = data_len
203         self._payload = payload
204
205     @property
206     def port_id(self):
207         """Get the port id of the packet dump"""
208         return self._port_id
209
210     @property
211     def data_len(self):
212         """Get the length of the data received"""
213         return self._data_len
214
215     def __str__(self):
216         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
217
218     def payload(self, start=None, end=None):
219         """Get part of the payload as a list of ordinals.
220
221         Returns a list of byte values, matching the contents of the packet dump.
222         Optional start and end parameters can be specified to retrieve only a
223         part of the packet contents.
224
225         The number of elements in the list is equal to end - start + 1, so end
226         is the offset of the last character.
227
228         Args:
229             start (pos. int): the starting offset in the payload. If it is not
230                 specified or None, offset 0 is assumed.
231             end (pos. int): the ending offset of the payload. If it is not
232                 specified or None, the contents until the end of the packet are
233                 returned.
234
235         Returns:
236             [int, int, ...]. Each int represents the ordinal value of a byte in
237             the packet payload.
238         """
239         if start is None:
240             start = 0
241
242         if end is None:
243             end = self.data_len - 1
244
245         # Bounds checking on offsets
246         template = "Start offset must be non-negative"
247         self.assert_func(operator.ge, start, 0, template)
248
249         template = "End offset must be less than {1}"
250         self.assert_func(operator.lt, end, self.data_len, template)
251
252         # Adjust for splice operation: end offset must be 1 more than the offset
253         # of the last desired character.
254         end += 1
255
256         return self._payload[start:end]
257
258
259 class ProxSocketHelper(object):
260
261     def __init__(self, sock=None):
262         """ creates new prox instance """
263         super(ProxSocketHelper, self).__init__()
264
265         if sock is None:
266             sock = socket.socket()
267
268         self._sock = sock
269         self._pkt_dumps = []
270
271     def connect(self, ip, port):
272         """Connect to the prox instance on the remote system"""
273         self._sock.connect((ip, port))
274
275     def get_socket(self):
276         """ get the socket connected to the remote instance """
277         return self._sock
278
279     def _parse_socket_data(self, decoded_data, pkt_dump_only):
280         def get_newline_index():
281             return decoded_data.find('\n', index)
282
283         ret_str = ''
284         index = 0
285         for newline_index in iter(get_newline_index, -1):
286             ret_str = decoded_data[index:newline_index]
287
288             try:
289                 mode, port_id, data_len = ret_str.split(',', 2)
290             except ValueError:
291                 mode, port_id, data_len = None, None, None
292
293             if mode != 'pktdump':
294                 # Regular 1-line message. Stop reading from the socket.
295                 LOG.debug("Regular response read")
296                 return ret_str
297
298             LOG.debug("Packet dump header read: [%s]", ret_str)
299
300             # The line is a packet dump header. Parse it, read the
301             # packet payload, store the dump for later retrieval.
302             # Skip over the packet dump and continue processing: a
303             # 1-line response may follow the packet dump.
304
305             data_len = int(data_len)
306             data_start = newline_index + 1  # + 1 to skip over \n
307             data_end = data_start + data_len
308             sub_data = decoded_data[data_start:data_end]
309             pkt_payload = array.array('B', (ord(v) for v in sub_data))
310             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
311             self._pkt_dumps.append(pkt_dump)
312
313             if pkt_dump_only:
314                 # Return boolean instead of string to signal
315                 # successful reception of the packet dump.
316                 LOG.debug("Packet dump stored, returning")
317                 return True
318
319             index = data_end + 1
320
321         return ret_str
322
323     def get_data(self, pkt_dump_only=False, timeout=1):
324         """ read data from the socket """
325         # This method behaves slightly differently depending on whether it is
326         # called to read the response to a command (pkt_dump_only = 0) or if
327         # it is called specifically to read a packet dump (pkt_dump_only = 1).
328         #
329         # Packet dumps look like:
330         #   pktdump,<port_id>,<data_len>\n
331         #   <packet contents as byte array>\n
332         # This means the total packet dump message consists of 2 lines instead
333         # of 1 line.
334         #
335         # - Response for a command (pkt_dump_only = 0):
336         #   1) Read response from the socket until \n (end of message)
337         #   2a) If the response is a packet dump header (starts with "pktdump,"):
338         #     - Read the packet payload and store the packet dump for later
339         #       retrieval.
340         #     - Reset the state and restart from 1). Eventually state 2b) will
341         #       be reached and the function will return.
342         #   2b) If the response is not a packet dump:
343         #     - Return the received message as a string
344         #
345         # - Explicit request to read a packet dump (pkt_dump_only = 1):
346         #   - Read the dump header and payload
347         #   - Store the packet dump for later retrieval
348         #   - Return True to signify a packet dump was successfully read
349
350         def is_ready():
351             # recv() is blocking, so avoid calling it when no data is waiting.
352             ready = select.select([self._sock], [], [], timeout)
353             return bool(ready[0])
354
355         status = False
356         ret_str = ""
357         for status in iter(is_ready, False):
358             decoded_data = self._sock.recv(256).decode('utf-8')
359             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
360
361         LOG.debug("Received data from socket: [%s]", ret_str)
362         return ret_str if status else ''
363
364     def put_command(self, to_send):
365         """ send data to the remote instance """
366         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
367         try:
368             self._sock.sendall(to_send.encode('utf-8'))
369         except:
370             pass
371
372     def get_packet_dump(self):
373         """ get the next packet dump """
374         if self._pkt_dumps:
375             return self._pkt_dumps.pop(0)
376         return None
377
378     def stop_all_reset(self):
379         """ stop the remote instance and reset stats """
380         LOG.debug("Stop all and reset stats")
381         self.stop_all()
382         self.reset_stats()
383
384     def stop_all(self):
385         """ stop all cores on the remote instance """
386         LOG.debug("Stop all")
387         self.put_command("stop all\n")
388         time.sleep(3)
389
390     def stop(self, cores, task=''):
391         """ stop specific cores on the remote instance """
392         LOG.debug("Stopping cores %s", cores)
393         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
394         time.sleep(3)
395
396     def start_all(self):
397         """ start all cores on the remote instance """
398         LOG.debug("Start all")
399         self.put_command("start all\n")
400
401     def start(self, cores):
402         """ start specific cores on the remote instance """
403         LOG.debug("Starting cores %s", cores)
404         self.put_command("start {}\n".format(join_non_strings(',', cores)))
405         time.sleep(3)
406
407     def reset_stats(self):
408         """ reset the statistics on the remote instance """
409         LOG.debug("Reset stats")
410         self.put_command("reset stats\n")
411         time.sleep(1)
412
413     def _run_template_over_cores(self, template, cores, *args):
414         for core in cores:
415             self.put_command(template.format(core, *args))
416
417     def set_pkt_size(self, cores, pkt_size):
418         """ set the packet size to generate on the remote instance """
419         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
420         pkt_size -= 4
421         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
422         time.sleep(1)
423
424     def set_value(self, cores, offset, value, length):
425         """ set value on the remote instance """
426         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
427         LOG.debug(msg, cores, value, length, offset)
428         template = "set value {} 0 {} {} {}\n"
429         self._run_template_over_cores(template, cores, offset, value, length)
430
431     def reset_values(self, cores):
432         """ reset values on the remote instance """
433         LOG.debug("Set value for core(s) %s", cores)
434         self._run_template_over_cores("reset values {} 0\n", cores)
435
436     def set_speed(self, cores, speed):
437         """ set speed on the remote instance """
438         LOG.debug("Set speed for core(s) %s to %g", cores, speed)
439         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
440
441     def slope_speed(self, cores_speed, duration, n_steps=0):
442         """will start to increase speed from 0 to N where N is taken from
443         a['speed'] for each a in cores_speed"""
444         # by default, each step will take 0.5 sec
445         if n_steps == 0:
446             n_steps = duration * 2
447
448         private_core_data = []
449         step_duration = float(duration) / n_steps
450         for core_data in cores_speed:
451             target = float(core_data['speed'])
452             private_core_data.append({
453                 'cores': core_data['cores'],
454                 'zero': 0,
455                 'delta': target / n_steps,
456                 'current': 0,
457                 'speed': target,
458             })
459
460         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
461         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
462             time.sleep(step_duration)
463             for core_data in private_core_data:
464                 core_data['current'] = core_data[key1] + core_data[key2]
465                 self.set_speed(core_data['cores'], core_data['current'])
466
467     def set_pps(self, cores, pps, pkt_size):
468         """ set packets per second for specific cores on the remote instance """
469         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
470         LOG.debug(msg, cores, pps, pkt_size)
471
472         # speed in percent of line-rate
473         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
474         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
475
476     def lat_stats(self, cores, task=0):
477         """Get the latency statistics from the remote system"""
478         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
479         lat_min = {}
480         lat_max = {}
481         lat_avg = {}
482         for core in cores:
483             self.put_command("lat stats {} {} \n".format(core, task))
484             ret = self.get_data()
485
486             try:
487                 lat_min[core], lat_max[core], lat_avg[core] = \
488                     tuple(int(n) for n in ret.split(",")[:3])
489
490             except (AttributeError, ValueError, TypeError):
491                 pass
492
493         return lat_min, lat_max, lat_avg
494
495     def get_all_tot_stats(self):
496         self.put_command("tot stats\n")
497         all_stats_str = self.get_data().split(",")
498         if len(all_stats_str) != 4:
499             all_stats = [0] * 4
500             return all_stats
501         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
502         self.master_stats = all_stats
503         return all_stats
504
505     def hz(self):
506         return self.get_all_tot_stats()[3]
507
508     # Deprecated
509     # TODO: remove
510     def rx_stats(self, cores, task=0):
511         return self.core_stats(cores, task)
512
513     def core_stats(self, cores, task=0):
514         """Get the receive statistics from the remote system"""
515         rx = tx = drop = tsc = 0
516         for core in cores:
517             self.put_command("core stats {} {}\n".format(core, task))
518             ret = self.get_data().split(",")
519             rx += int(ret[0])
520             tx += int(ret[1])
521             drop += int(ret[2])
522             tsc = int(ret[3])
523         return rx, tx, drop, tsc
524
525     def port_stats(self, ports):
526         """get counter values from a specific port"""
527         tot_result = [0] * 12
528         for port in ports:
529             self.put_command("port_stats {}\n".format(port))
530             ret = [try_int(s, 0) for s in self.get_data().split(",")]
531             tot_result = [sum(x) for x in zip(tot_result, ret)]
532         return tot_result
533
534     @contextmanager
535     def measure_tot_stats(self):
536         start = self.get_all_tot_stats()
537         container = {'start_tot': start}
538         try:
539             yield container
540         finally:
541             container['end_tot'] = end = self.get_all_tot_stats()
542
543         container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))
544
545     def tot_stats(self):
546         """Get the total statistics from the remote system"""
547         stats = self.get_all_tot_stats()
548         return stats[:3]
549
550     def tot_ierrors(self):
551         """Get the total ierrors from the remote system"""
552         self.put_command("tot ierrors tot\n")
553         recv = self.get_data().split(',')
554         tot_ierrors = int(recv[0])
555         tsc = int(recv[0])
556         return tot_ierrors, tsc
557
558     def set_count(self, count, cores):
559         """Set the number of packets to send on the specified core"""
560         self._run_template_over_cores("count {} 0 {}\n", cores, count)
561
562     def dump_rx(self, core_id, task_id=0, count=1):
563         """Activate dump on rx on the specified core"""
564         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
565         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
566         time.sleep(1.5)     # Give PROX time to set up packet dumping
567
568     def quit(self):
569         self.stop_all()
570         self._quit()
571         self.force_quit()
572
573     def _quit(self):
574         """ stop all cores on the remote instance """
575         LOG.debug("Quit prox")
576         self.put_command("quit\n")
577         time.sleep(3)
578
579     def force_quit(self):
580         """ stop all cores on the remote instance """
581         LOG.debug("Force Quit prox")
582         self.put_command("quit_force\n")
583         time.sleep(3)
584
585
586 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
587     # the actual app is lowercase
588     APP_NAME = 'prox'
589
590     LUA_PARAMETER_NAME = ""
591     LUA_PARAMETER_PEER = {
592         "gen": "sut",
593         "sut": "gen",
594     }
595
596     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
597         self.remote_path = None
598         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
599         self.remote_prox_file_name = None
600         self.prox_config_dict = None
601         self.additional_files = {}
602
603     def _build_pipeline_kwargs(self):
604         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
605         self.pipeline_kwargs = {
606             'tool_path': tool_path,
607             'tool_dir': os.path.dirname(tool_path),
608         }
609
610     def copy_to_target(self, config_file_path, prox_file):
611         remote_path = os.path.join("/tmp", prox_file)
612         self.ssh_helper.put(config_file_path, remote_path)
613         return remote_path
614
615     @staticmethod
616     def _get_tx_port(section, sections):
617         iface_port = [-1]
618         for item in sections[section]:
619             if item[0] == "tx port":
620                 iface_port = re.findall(r'\d+', item[1])
621                 # do we want the last one?
622                 #   if yes, then can we reverse?
623         return int(iface_port[0])
624
625     @staticmethod
626     def _replace_quoted_with_value(quoted, value, count=1):
627         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
628         return new_string
629
630     def _insert_additional_file(self, value):
631         file_str = value.split('"')
632         base_name = os.path.basename(file_str[1])
633         file_str[1] = self.additional_files[base_name]
634         return '"'.join(file_str)
635
636     def generate_prox_config_file(self, config_path):
637         sections = []
638         prox_config = ConfigParser(config_path, sections)
639         prox_config.parse()
640
641         # Ensure MAC is set "hardware"
642         all_ports = self.vnfd_helper.port_pairs.all_ports
643         # use dpdk port number
644         for port_name in all_ports:
645             port_num = self.vnfd_helper.port_num(port_name)
646             port_section_name = "port {}".format(port_num)
647             for section_name, section in sections:
648                 if port_section_name != section_name:
649                     continue
650
651                 for index, section_data in enumerate(section):
652                     if section_data[0] == "mac":
653                         section_data[1] = "hardware"
654
655         # search for dst mac
656         for _, section in sections:
657             # for index, (item_key, item_val) in enumerate(section):
658             for index, section_data in enumerate(section):
659                 item_key, item_val = section_data
660                 if item_val.startswith("@@dst_mac"):
661                     tx_port_iter = re.finditer(r'\d+', item_val)
662                     tx_port_no = int(next(tx_port_iter).group(0))
663                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
664                     mac = intf["virtual-interface"]["dst_mac"]
665                     section_data[1] = mac.replace(":", " ", 6)
666
667                 if item_key == "dst mac" and item_val.startswith("@@"):
668                     tx_port_iter = re.finditer(r'\d+', item_val)
669                     tx_port_no = int(next(tx_port_iter).group(0))
670                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
671                     mac = intf["virtual-interface"]["dst_mac"]
672                     section_data[1] = mac
673
674         # if addition file specified in prox config
675         if not self.additional_files:
676             return sections
677
678         for section_name, section in sections:
679             for index, section_data in enumerate(section):
680                 try:
681                     if section_data[0].startswith("dofile"):
682                         section_data[0] = self._insert_additional_file(section_data[0])
683
684                     if section_data[1].startswith("dofile"):
685                         section_data[1] = self._insert_additional_file(section_data[1])
686                 except:
687                     pass
688
689         return sections
690
691     @staticmethod
692     def write_prox_config(prox_config):
693         """
694         Write an .ini-format config file for PROX
695         PROX does not allow a space before/after the =, so we need
696         a custom method
697         """
698         out = []
699         for i, (section_name, section) in enumerate(prox_config):
700             out.append("[{}]".format(section_name))
701             for index, item in enumerate(section):
702                 key, value = item
703                 if key == "__name__":
704                     continue
705                 if value is not None and value != '@':
706                     key = "=".join((key, str(value).replace('\n', '\n\t')))
707                     out.append(key)
708                 else:
709                     key = str(key).replace('\n', '\n\t')
710                     out.append(key)
711         return os.linesep.join(out)
712
713     def put_string_to_file(self, s, remote_path):
714         file_obj = cStringIO(s)
715         self.ssh_helper.put_file_obj(file_obj, remote_path)
716         return remote_path
717
718     def generate_prox_lua_file(self):
719         p = OrderedDict()
720         all_ports = self.vnfd_helper.port_pairs.all_ports
721         lua_param = self.LUA_PARAMETER_NAME
722         for port_name in all_ports:
723             peer = self.LUA_PARAMETER_PEER[lua_param]
724             port_num = self.vnfd_helper.port_num(port_name)
725             intf = self.vnfd_helper.find_interface(name=port_name)
726             vintf = intf['virtual-interface']
727             local_ip = vintf["local_ip"]
728             dst_ip = vintf["dst_ip"]
729             local_ip_hex = ip_to_hex(local_ip, separator=' ')
730             dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
731             p.update([
732                 ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
733                 ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
734                 ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
735                 ("{}_ip_port_{}".format(peer, port_num), dst_ip),
736             ])
737         lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
738         return lua
739
740     def upload_prox_lua(self, config_dir, prox_config_dict):
741         # we could have multiple lua directives
742         lau_dict = prox_config_dict.get('lua', {})
743         find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
744         lua_file = next((found[0] for found in find_iter if found), None)
745         if not lua_file:
746             return ""
747
748         out = self.generate_prox_lua_file()
749         remote_path = os.path.join(config_dir, lua_file)
750         return self.put_string_to_file(out, remote_path)
751
752     def upload_prox_config(self, config_file, prox_config_dict):
753         # prox can't handle spaces around ' = ' so use custom method
754         out = StringIO(self.write_prox_config(prox_config_dict))
755         out.seek(0)
756         remote_path = os.path.join("/tmp", config_file)
757         self.ssh_helper.put_file_obj(out, remote_path)
758
759         return remote_path
760
761     def build_config_file(self):
762         task_path = self.scenario_helper.task_path
763         options = self.scenario_helper.options
764         config_path = options['prox_config']
765         config_file = os.path.basename(config_path)
766         config_path = find_relative_file(config_path, task_path)
767         self.additional_files = {}
768
769         prox_files = options.get('prox_files', [])
770         if isinstance(prox_files, six.string_types):
771             prox_files = [prox_files]
772         for key_prox_file in prox_files:
773             base_prox_file = os.path.basename(key_prox_file)
774             remote_prox_file = self.copy_to_target(key_prox_file, base_prox_file)
775             self.additional_files[base_prox_file] = remote_prox_file
776
777         self.prox_config_dict = self.generate_prox_config_file(config_path)
778         self.remote_path = self.upload_prox_config(config_file, self.prox_config_dict)
779
780     def build_config(self):
781
782         options = self.scenario_helper.options
783
784         prox_args = options['prox_args']
785         LOG.info("Provision and start the %s", self.APP_NAME)
786         self._build_pipeline_kwargs()
787         self.pipeline_kwargs["args"] = " ".join(
788             " ".join([k, v if v else ""]) for k, v in prox_args.items())
789         self.pipeline_kwargs["cfg_file"] = self.remote_path
790
791         cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
792         prox_cmd = cmd_template.format(**self.pipeline_kwargs)
793         return prox_cmd
794
795
796 class ProxResourceHelper(ClientResourceHelper):
797
798     RESOURCE_WORD = 'prox'
799     PROX_CORE_GEN_MODE = "gen"
800     PROX_CORE_LAT_MODE = "lat"
801     PROX_CORE_MPLS_TEST = "MPLS tag/untag"
802
803     PROX_MODE = ""
804
805     WAIT_TIME = 3
806
807     @staticmethod
808     def line_rate_to_pps(pkt_size, n_ports):
809         # FIXME Don't hardcode 10Gb/s
810         return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20)
811
812     @staticmethod
813     def find_pci(pci, bound_pci):
814         # we have to substring match PCI bus address from the end
815         return any(b.endswith(pci) for b in bound_pci)
816
817     def __init__(self, setup_helper):
818         super(ProxResourceHelper, self).__init__(setup_helper)
819         self.mgmt_interface = self.vnfd_helper.mgmt_interface
820         self._user = self.mgmt_interface["user"]
821         self._ip = self.mgmt_interface["ip"]
822
823         self.done = False
824         self._cpu_topology = None
825         self._vpci_to_if_name_map = None
826         self.additional_file = {}
827         self.remote_prox_file_name = None
828         self.lower = None
829         self.upper = None
830         self._test_cores = None
831         self._latency_cores = None
832         self._tagged_cores = None
833         self._plain_cores = None
834
835     @property
836     def sut(self):
837         if not self.client:
838             self.client = self._connect()
839         return self.client
840
841     @property
842     def cpu_topology(self):
843         if not self._cpu_topology:
844             stdout = io.BytesIO()
845             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
846             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
847         return self._cpu_topology
848
849     @property
850     def test_cores(self):
851         if not self._test_cores:
852             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
853         return self._test_cores
854
855     @property
856     def mpls_cores(self):
857         if not self._tagged_cores:
858             self._tagged_cores, self._plain_cores = self.get_cores_mpls(self.PROX_CORE_GEN_MODE)
859         return self._tagged_cores, self._plain_cores
860
861     @property
862     def tagged_cores(self):
863         return self.mpls_cores[0]
864
865     @property
866     def plain_cores(self):
867         return self.mpls_cores[1]
868
869     @property
870     def latency_cores(self):
871         if not self._latency_cores:
872             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
873         return self._latency_cores
874
875     def run_traffic(self, traffic_profile):
876         self.lower = 0.0
877         self.upper = 100.0
878
879         traffic_profile.init(self._queue)
880         # this frees up the run_traffic loop
881         self.client_started.value = 1
882
883         while not self._terminated.value:
884             # move it all to traffic_profile
885             self._run_traffic_once(traffic_profile)
886
887     def _run_traffic_once(self, traffic_profile):
888         traffic_profile.execute_traffic(self)
889         if traffic_profile.done:
890             self._queue.put({'done': True})
891             LOG.debug("tg_prox done")
892             self._terminated.value = 1
893
894     def start_collect(self):
895         pass
896
897     def terminate(self):
898         # should not be called, use VNF terminate
899         raise NotImplementedError()
900
901     def up_post(self):
902         return self.sut  # force connection
903
904     def execute(self, cmd, *args, **kwargs):
905         func = getattr(self.sut, cmd, None)
906         if func:
907             return func(*args, **kwargs)
908
909     @contextmanager
910     def traffic_context(self, pkt_size, value):
911         self.sut.stop_all()
912         self.sut.reset_stats()
913         if self.get_test_type() == self.PROX_CORE_MPLS_TEST:
914             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
915             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
916             self.sut.set_speed(self.tagged_cores, value)
917             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
918             self.sut.set_speed(self.plain_cores, value * ratio)
919         else:
920             self.sut.set_pkt_size(self.test_cores, pkt_size)
921             self.sut.set_speed(self.test_cores, value)
922
923         self.sut.start_all()
924         try:
925             yield
926         finally:
927             self.sut.stop_all()
928
929     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
930         # do this assert in init?  unless we expect interface count to
931         # change from one run to another run...
932         ports = self.vnfd_helper.port_pairs.all_ports
933         port_count = len(ports)
934         assert port_count in {1, 2, 4}, \
935             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
936
937         with self.traffic_context(pkt_size, value):
938             # Getting statistics to calculate PPS at right speed....
939             tsc_hz = float(self.sut.hz())
940             time.sleep(2)
941             with self.sut.measure_tot_stats() as data:
942                 time.sleep(duration)
943
944             # Get stats before stopping the cores. Stopping cores takes some time
945             # and might skew results otherwise.
946             latency = self.get_latency()
947
948         deltas = data['delta']
949         rx_total, tx_total = self.sut.port_stats(range(port_count))[6:8]
950         pps = value / 100.0 * self.line_rate_to_pps(pkt_size, port_count)
951
952         samples = {}
953         # we are currently using enumeration to map logical port num to interface
954         for port_name in ports:
955             port = self.vnfd_helper.port_num(port_name)
956             port_rx_total, port_tx_total = self.sut.port_stats([port])[6:8]
957             samples[port_name] = {
958                 "in_packets": port_rx_total,
959                 "out_packets": port_tx_total,
960             }
961
962         result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx,
963                                    deltas.tsc, latency, rx_total, tx_total, pps)
964         result.log_data()
965         return result, samples
966
967     def get_test_type(self):
968         test_type = None
969         for section_name, section in self.setup_helper.prox_config_dict:
970             if section_name != "global":
971                 continue
972
973             for key, value in section:
974                 if key == "name" and value == self.PROX_CORE_MPLS_TEST:
975                     test_type = self.PROX_CORE_MPLS_TEST
976
977         return test_type
978
979     def get_cores(self, mode):
980         cores = []
981
982         for section_name, section in self.setup_helper.prox_config_dict:
983             if not section_name.startswith("core"):
984                 continue
985
986             for key, value in section:
987                 if key == "mode" and value == mode:
988                     core_tuple = CoreSocketTuple(section_name)
989                     core = core_tuple.find_in_topology(self.cpu_topology)
990                     cores.append(core)
991
992         return cores
993
994     def get_cores_mpls(self, mode=PROX_CORE_GEN_MODE):
995         cores_tagged = []
996         cores_plain = []
997         for section_name, section in self.setup_helper.prox_config_dict:
998             if not section_name.startswith("core"):
999                 continue
1000
1001             if all(key != "mode" or value != mode for key, value in section):
1002                 continue
1003
1004             for item_key, item_value in section:
1005                 if item_key == "name" and item_value.startswith("tag"):
1006                     core_tuple = CoreSocketTuple(section_name)
1007                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1008                     cores_tagged.append(core_tag)
1009
1010                 elif item_key == "name" and item_value.startswith("udp"):
1011                     core_tuple = CoreSocketTuple(section_name)
1012                     core_udp = core_tuple.find_in_topology(self.cpu_topology)
1013                     cores_plain.append(core_udp)
1014
1015         return cores_tagged, cores_plain
1016
1017     def get_latency(self):
1018         """
1019         :return: return lat_min, lat_max, lat_avg
1020         :rtype: list
1021         """
1022         if self._latency_cores:
1023             return self.sut.lat_stats(self._latency_cores)
1024         return []
1025
1026     def _get_logical_if_name(self, vpci):
1027         return self._vpci_to_if_name_map[vpci]
1028
1029     def _connect(self, client=None):
1030         """Run and connect to prox on the remote system """
1031         # De-allocating a large amount of hugepages takes some time. If a new
1032         # PROX instance is started immediately after killing the previous one,
1033         # it might not be able to allocate hugepages, because they are still
1034         # being freed. Hence the -w switch.
1035         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
1036         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
1037         # -f ./handle_none-4.cfg"
1038         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
1039         #  "; " \
1040         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
1041         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
1042         # sudo " \
1043         #    + "./build/Prox " + prox_args
1044         # log.debug("Starting PROX with command [%s]", prox_cmd)
1045         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
1046         # self._ip, prox_cmd))
1047         if client is None:
1048             client = ProxSocketHelper()
1049
1050         # try connecting to Prox for 60s
1051         for _ in range(RETRY_SECONDS):
1052             time.sleep(RETRY_INTERVAL)
1053             try:
1054                 client.connect(self._ip, PROX_PORT)
1055             except (socket.gaierror, socket.error):
1056                 continue
1057             else:
1058                 return client
1059
1060         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
1061         raise Exception(msg.format(self._ip, PROX_PORT))