Merge "utils: create TASK_LOG_DIR if it doesn't exist"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
15
16 import array
17 import operator
18 import logging
19 import os
20 import re
21 import select
22 import socket
23 from collections import OrderedDict, namedtuple
24 import time
25 from contextlib import contextmanager
26 from itertools import repeat, chain
27
28 from six.moves import zip, StringIO
29
30 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
31 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings
32 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
33 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
34 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
35
36 PROX_PORT = 8474
37
38 LOG = logging.getLogger(__name__)
39
40 TEN_GIGABIT = 1e10
41 BITS_PER_BYTE = 8
42 RETRY_SECONDS = 60
43 RETRY_INTERVAL = 1
44
45 CONFIGURATION_OPTIONS = (
46     # dict key           section     key               default value
47     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
48     ('testDuration', 'general', 'test_duration', 5.0),
49     ('testPrecision', 'general', 'test_precision', 1.0),
50     ('tests', 'general', 'tests', None),
51     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
52
53     ('logFile', 'logging', 'file', 'dats.log'),
54     ('logDateFormat', 'logging', 'datefmt', None),
55     ('logLevel', 'logging', 'level', 'INFO'),
56     ('logOverwrite', 'logging', 'overwrite', 1),
57
58     ('testerIp', 'tester', 'ip', None),
59     ('testerUser', 'tester', 'user', 'root'),
60     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
61     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
62     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
63     ('testerSocketId', 'tester', 'socket_id', 0),
64
65     ('sutIp', 'sut', 'ip', None),
66     ('sutUser', 'sut', 'user', 'root'),
67     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
68     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
69     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
70     ('sutSocketId', 'sut', 'socket_id', 0),
71 )
72
73
74 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
75
76     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?")
77
78     def __new__(cls, *args):
79         try:
80             matches = cls.CORE_RE.search(str(args[0]))
81             if matches:
82                 args = matches.groups()
83
84             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), int(args[1]),
85                                                        'h' if args[2] else '')
86
87         except (AttributeError, TypeError, IndexError, ValueError):
88             raise ValueError('Invalid core spec {}'.format(args))
89
90     def is_hyperthread(self):
91         return self.hyperthread == 'h'
92
93     @property
94     def index(self):
95         return int(self.is_hyperthread())
96
97     def find_in_topology(self, cpu_topology):
98         try:
99             socket_core_match = cpu_topology[self.socket_id][self.core_id]
100             sorted_match = sorted(socket_core_match.values())
101             return sorted_match[self.index][0]
102         except (KeyError, IndexError):
103             template = "Core {}{} on socket {} does not exist"
104             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
105
106
107 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
108
109     def __new__(cls, *args):
110         try:
111             assert args[0] is not str(args[0])
112             args = tuple(args[0])
113         except (AssertionError, IndexError, TypeError):
114             pass
115
116         return super(TotStatsTuple, cls).__new__(cls, *args)
117
118
119 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
120                                                         'delta_tx,delta_tsc,'
121                                                         'latency,rx_total,tx_total,pps')):
122
123     @property
124     def pkt_loss(self):
125         try:
126             return 1e2 * self.drop_total / float(self.tx_total)
127         except ZeroDivisionError:
128             return 100.0
129
130     @property
131     def mpps(self):
132         # calculate the effective throughput in Mpps
133         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
134
135     @property
136     def can_be_lost(self):
137         return int(self.tx_total * self.tolerated / 1e2)
138
139     @property
140     def drop_total(self):
141         return self.tx_total - self.rx_total
142
143     @property
144     def success(self):
145         return self.drop_total <= self.can_be_lost
146
147     def get_samples(self, pkt_size, pkt_loss=None):
148         if pkt_loss is None:
149             pkt_loss = self.pkt_loss
150
151         latency_keys = [
152             "LatencyMin",
153             "LatencyMax",
154             "LatencyAvg",
155         ]
156
157         samples = {
158             "Throughput": self.mpps,
159             "DropPackets": pkt_loss,
160             "CurrentDropPackets": pkt_loss,
161             "TxThroughput": self.pps / 1e6,
162             "RxThroughput": self.mpps,
163             "PktSize": pkt_size,
164         }
165
166         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
167         return samples
168
169     def log_data(self, logger=None):
170         if logger is None:
171             logger = LOG
172
173         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
174         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
175         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
176
177
178 class PacketDump(object):
179
180     @staticmethod
181     def assert_func(func, value1, value2, template=None):
182         assert func(value1, value2), template.format(value1, value2)
183
184     def __init__(self, port_id, data_len, payload):
185         template = "Packet dump has specified length {}, but payload is {} bytes long"
186         self.assert_func(operator.eq, data_len, len(payload), template)
187         self._port_id = port_id
188         self._data_len = data_len
189         self._payload = payload
190
191     @property
192     def port_id(self):
193         """Get the port id of the packet dump"""
194         return self._port_id
195
196     @property
197     def data_len(self):
198         """Get the length of the data received"""
199         return self._data_len
200
201     def __str__(self):
202         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
203
204     def payload(self, start=None, end=None):
205         """Get part of the payload as a list of ordinals.
206
207         Returns a list of byte values, matching the contents of the packet dump.
208         Optional start and end parameters can be specified to retrieve only a
209         part of the packet contents.
210
211         The number of elements in the list is equal to end - start + 1, so end
212         is the offset of the last character.
213
214         Args:
215             start (pos. int): the starting offset in the payload. If it is not
216                 specified or None, offset 0 is assumed.
217             end (pos. int): the ending offset of the payload. If it is not
218                 specified or None, the contents until the end of the packet are
219                 returned.
220
221         Returns:
222             [int, int, ...]. Each int represents the ordinal value of a byte in
223             the packet payload.
224         """
225         if start is None:
226             start = 0
227
228         if end is None:
229             end = self.data_len - 1
230
231         # Bounds checking on offsets
232         template = "Start offset must be non-negative"
233         self.assert_func(operator.ge, start, 0, template)
234
235         template = "End offset must be less than {1}"
236         self.assert_func(operator.lt, end, self.data_len, template)
237
238         # Adjust for splice operation: end offset must be 1 more than the offset
239         # of the last desired character.
240         end += 1
241
242         return self._payload[start:end]
243
244
245 class ProxSocketHelper(object):
246
247     def __init__(self, sock=None):
248         """ creates new prox instance """
249         super(ProxSocketHelper, self).__init__()
250
251         if sock is None:
252             sock = socket.socket()
253
254         self._sock = sock
255         self._pkt_dumps = []
256
257     def connect(self, ip, port):
258         """Connect to the prox instance on the remote system"""
259         self._sock.connect((ip, port))
260
261     def get_socket(self):
262         """ get the socket connected to the remote instance """
263         return self._sock
264
265     def _parse_socket_data(self, decoded_data, pkt_dump_only):
266         def get_newline_index():
267             return decoded_data.find('\n', index)
268
269         ret_str = ''
270         index = 0
271         for newline_index in iter(get_newline_index, -1):
272             ret_str = decoded_data[index:newline_index]
273
274             try:
275                 mode, port_id, data_len = ret_str.split(',', 2)
276             except ValueError:
277                 mode, port_id, data_len = None, None, None
278
279             if mode != 'pktdump':
280                 # Regular 1-line message. Stop reading from the socket.
281                 LOG.debug("Regular response read")
282                 return ret_str
283
284             LOG.debug("Packet dump header read: [%s]", ret_str)
285
286             # The line is a packet dump header. Parse it, read the
287             # packet payload, store the dump for later retrieval.
288             # Skip over the packet dump and continue processing: a
289             # 1-line response may follow the packet dump.
290
291             data_len = int(data_len)
292             data_start = newline_index + 1  # + 1 to skip over \n
293             data_end = data_start + data_len
294             sub_data = decoded_data[data_start:data_end]
295             pkt_payload = array.array('B', (ord(v) for v in sub_data))
296             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
297             self._pkt_dumps.append(pkt_dump)
298
299             if pkt_dump_only:
300                 # Return boolean instead of string to signal
301                 # successful reception of the packet dump.
302                 LOG.debug("Packet dump stored, returning")
303                 return True
304
305             index = data_end + 1
306
307         return ret_str
308
309     def get_data(self, pkt_dump_only=False, timeout=1):
310         """ read data from the socket """
311         # This method behaves slightly differently depending on whether it is
312         # called to read the response to a command (pkt_dump_only = 0) or if
313         # it is called specifically to read a packet dump (pkt_dump_only = 1).
314         #
315         # Packet dumps look like:
316         #   pktdump,<port_id>,<data_len>\n
317         #   <packet contents as byte array>\n
318         # This means the total packet dump message consists of 2 lines instead
319         # of 1 line.
320         #
321         # - Response for a command (pkt_dump_only = 0):
322         #   1) Read response from the socket until \n (end of message)
323         #   2a) If the response is a packet dump header (starts with "pktdump,"):
324         #     - Read the packet payload and store the packet dump for later
325         #       retrieval.
326         #     - Reset the state and restart from 1). Eventually state 2b) will
327         #       be reached and the function will return.
328         #   2b) If the response is not a packet dump:
329         #     - Return the received message as a string
330         #
331         # - Explicit request to read a packet dump (pkt_dump_only = 1):
332         #   - Read the dump header and payload
333         #   - Store the packet dump for later retrieval
334         #   - Return True to signify a packet dump was successfully read
335
336         def is_ready():
337             # recv() is blocking, so avoid calling it when no data is waiting.
338             ready = select.select([self._sock], [], [], timeout)
339             return bool(ready[0])
340
341         status = False
342         ret_str = ""
343         for status in iter(is_ready, False):
344             LOG.debug("Reading from socket")
345             decoded_data = self._sock.recv(256).decode('utf-8')
346             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
347
348         LOG.debug("Received data from socket: [%s]", ret_str)
349         return ret_str if status else ''
350
351     def put_command(self, to_send):
352         """ send data to the remote instance """
353         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
354         self._sock.sendall(to_send.encode('utf-8'))
355
356     def get_packet_dump(self):
357         """ get the next packet dump """
358         if self._pkt_dumps:
359             return self._pkt_dumps.pop(0)
360         return None
361
362     def stop_all_reset(self):
363         """ stop the remote instance and reset stats """
364         LOG.debug("Stop all and reset stats")
365         self.stop_all()
366         self.reset_stats()
367
368     def stop_all(self):
369         """ stop all cores on the remote instance """
370         LOG.debug("Stop all")
371         self.put_command("stop all\n")
372         time.sleep(3)
373
374     def stop(self, cores, task=''):
375         """ stop specific cores on the remote instance """
376         LOG.debug("Stopping cores %s", cores)
377         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
378         time.sleep(3)
379
380     def start_all(self):
381         """ start all cores on the remote instance """
382         LOG.debug("Start all")
383         self.put_command("start all\n")
384
385     def start(self, cores):
386         """ start specific cores on the remote instance """
387         LOG.debug("Starting cores %s", cores)
388         self.put_command("start {}\n".format(join_non_strings(',', cores)))
389         time.sleep(3)
390
391     def reset_stats(self):
392         """ reset the statistics on the remote instance """
393         LOG.debug("Reset stats")
394         self.put_command("reset stats\n")
395         time.sleep(1)
396
397     def _run_template_over_cores(self, template, cores, *args):
398         for core in cores:
399             self.put_command(template.format(core, *args))
400
401     def set_pkt_size(self, cores, pkt_size):
402         """ set the packet size to generate on the remote instance """
403         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
404         pkt_size -= 4
405         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
406         time.sleep(1)
407
408     def set_value(self, cores, offset, value, length):
409         """ set value on the remote instance """
410         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
411         LOG.debug(msg, cores, value, length, offset)
412         template = "set value {} 0 {} {} {}\n"
413         self._run_template_over_cores(template, cores, offset, value, length)
414
415     def reset_values(self, cores):
416         """ reset values on the remote instance """
417         LOG.debug("Set value for core(s) %s", cores)
418         self._run_template_over_cores("reset values {} 0\n", cores)
419
420     def set_speed(self, cores, speed):
421         """ set speed on the remote instance """
422         LOG.debug("Set speed for core(s) %s to %g", cores, speed)
423         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
424
425     def slope_speed(self, cores_speed, duration, n_steps=0):
426         """will start to increase speed from 0 to N where N is taken from
427         a['speed'] for each a in cores_speed"""
428         # by default, each step will take 0.5 sec
429         if n_steps == 0:
430             n_steps = duration * 2
431
432         private_core_data = []
433         step_duration = float(duration) / n_steps
434         for core_data in cores_speed:
435             target = float(core_data['speed'])
436             private_core_data.append({
437                 'cores': core_data['cores'],
438                 'zero': 0,
439                 'delta': target / n_steps,
440                 'current': 0,
441                 'speed': target,
442             })
443
444         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
445         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
446             time.sleep(step_duration)
447             for core_data in private_core_data:
448                 core_data['current'] = core_data[key1] + core_data[key2]
449                 self.set_speed(core_data['cores'], core_data['current'])
450
451     def set_pps(self, cores, pps, pkt_size):
452         """ set packets per second for specific cores on the remote instance """
453         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
454         LOG.debug(msg, cores, pps, pkt_size)
455
456         # speed in percent of line-rate
457         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
458         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
459
460     def lat_stats(self, cores, task=0):
461         """Get the latency statistics from the remote system"""
462         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
463         lat_min = {}
464         lat_max = {}
465         lat_avg = {}
466         for core in cores:
467             self.put_command("lat stats {} {} \n".format(core, task))
468             ret = self.get_data()
469
470             try:
471                 lat_min[core], lat_max[core], lat_avg[core] = \
472                     tuple(int(n) for n in ret.split(",")[:3])
473
474             except (AttributeError, ValueError, TypeError):
475                 pass
476
477         return lat_min, lat_max, lat_avg
478
479     def get_all_tot_stats(self):
480         self.put_command("tot stats\n")
481         all_stats = TotStatsTuple(int(v) for v in self.get_data().split(","))
482         return all_stats
483
484     def hz(self):
485         return self.get_all_tot_stats().hz
486
487     # Deprecated
488     # TODO: remove
489     def rx_stats(self, cores, task=0):
490         return self.core_stats(cores, task)
491
492     def core_stats(self, cores, task=0):
493         """Get the receive statistics from the remote system"""
494         rx = tx = drop = tsc = 0
495         for core in cores:
496             self.put_command("core stats {} {}\n".format(core, task))
497             ret = self.get_data().split(",")
498             rx += int(ret[0])
499             tx += int(ret[1])
500             drop += int(ret[2])
501             tsc = int(ret[3])
502         return rx, tx, drop, tsc
503
504     def port_stats(self, ports):
505         """get counter values from a specific port"""
506         tot_result = list(repeat(0, 12))
507         for port in ports:
508             self.put_command("port_stats {}\n".format(port))
509             for index, n in enumerate(self.get_data().split(',')):
510                 tot_result[index] += int(n)
511         return tot_result
512
513     @contextmanager
514     def measure_tot_stats(self):
515         start = self.get_all_tot_stats()
516         container = {'start_tot': start}
517         try:
518             yield container
519         finally:
520             container['end_tot'] = end = self.get_all_tot_stats()
521
522         container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))
523
524     def tot_stats(self):
525         """Get the total statistics from the remote system"""
526         stats = self.get_all_tot_stats()
527         return stats[:3]
528
529     def tot_ierrors(self):
530         """Get the total ierrors from the remote system"""
531         self.put_command("tot ierrors tot\n")
532         recv = self.get_data().split(',')
533         tot_ierrors = int(recv[0])
534         tsc = int(recv[0])
535         return tot_ierrors, tsc
536
537     def set_count(self, count, cores):
538         """Set the number of packets to send on the specified core"""
539         self._run_template_over_cores("count {} 0 {}\n", cores, count)
540
541     def dump_rx(self, core_id, task_id=0, count=1):
542         """Activate dump on rx on the specified core"""
543         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
544         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
545         time.sleep(1.5)     # Give PROX time to set up packet dumping
546
547     def quit(self):
548         self.stop_all()
549         self._quit()
550         self.force_quit()
551
552     def _quit(self):
553         """ stop all cores on the remote instance """
554         LOG.debug("Quit prox")
555         self.put_command("quit\n")
556         time.sleep(3)
557
558     def force_quit(self):
559         """ stop all cores on the remote instance """
560         LOG.debug("Force Quit prox")
561         self.put_command("quit_force\n")
562         time.sleep(3)
563
564
565 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
566
567     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
568         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
569         self.dpdk_root = "/root/dpdk-17.02"
570
571     def setup_vnf_environment(self):
572         super(ProxDpdkVnfSetupEnvHelper, self).setup_vnf_environment()
573
574         # debug dump after binding
575         self.ssh_helper.execute("sudo {} -s".format(self.dpdk_nic_bind))
576
577     def rebind_drivers(self, force=True):
578         if force:
579             force = '--force '
580         else:
581             force = ''
582         cmd_template = "{} {}-b {} {}"
583         if not self.used_drivers:
584             self._find_used_drivers()
585         for vpci, (_, driver) in self.used_drivers.items():
586             self.ssh_helper.execute(cmd_template.format(self.dpdk_nic_bind, force, driver, vpci))
587
588     def _setup_dpdk(self):
589         self._setup_hugepages()
590
591         self.ssh_helper.execute("pkill prox")
592         self.ssh_helper.execute("sudo modprobe uio")
593
594         # for baremetal
595         self.ssh_helper.execute("sudo modprobe msr")
596
597         # why remove?, just keep it loaded
598         # self.connection.execute("sudo rmmod igb_uio")
599
600         igb_uio_path = os.path.join(self.dpdk_root, "x86_64-native-linuxapp-gcc/kmod/igb_uio.ko")
601         self.ssh_helper.execute("sudo insmod {}".format(igb_uio_path))
602
603         # quick hack to allow non-root copy
604         self.ssh_helper.execute("sudo chmod 0777 {}".format(self.ssh_helper.bin_path))
605
606
607 class ProxResourceHelper(ClientResourceHelper):
608
609     PROX_CORE_GEN_MODE = "gen"
610     PROX_CORE_LAT_MODE = "lat"
611
612     PROX_MODE = ""
613
614     LUA_PARAMETER_NAME = ""
615     LUA_PARAMETER_PEER = {
616         "gen": "sut",
617         "sut": "gen",
618     }
619
620     WAIT_TIME = 3
621
622     @staticmethod
623     def _replace_quoted_with_value(quoted, value, count=1):
624         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
625         return new_string
626
627     @staticmethod
628     def _get_tx_port(section, sections):
629         iface_port = [-1]
630         for item in sections[section]:
631             if item[0] == "tx port":
632                 iface_port = re.findall(r'\d+', item[1])
633                 # do we want the last one?
634                 #   if yes, then can we reverse?
635         return int(iface_port[0])
636
637     @staticmethod
638     def line_rate_to_pps(pkt_size, n_ports):
639         # FIXME Don't hardcode 10Gb/s
640         return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20)
641
642     @staticmethod
643     def find_pci(pci, bound_pci):
644         # we have to substring match PCI bus address from the end
645         return any(b.endswith(pci) for b in bound_pci)
646
647     @staticmethod
648     def write_prox_config(prox_config):
649         """
650         Write an .ini-format config file for PROX
651         PROX does not allow a space before/after the =, so we need
652         a custom method
653         """
654         out = []
655         for section_name, section_value in prox_config.items():
656             out.append("[{}]".format(section_name))
657             for key, value in section_value:
658                 if key == "__name__":
659                     continue
660                 if value is not None:
661                     key = "=".join((key, str(value).replace('\n', '\n\t')))
662                 out.append(key)
663         return os.linesep.join(out)
664
665     def __init__(self, setup_helper):
666         super(ProxResourceHelper, self).__init__(setup_helper)
667         self.mgmt_interface = self.vnfd_helper.mgmt_interface
668         self._user = self.mgmt_interface["user"]
669         self._ip = self.mgmt_interface["ip"]
670
671         self.done = False
672         self._cpu_topology = None
673         self._vpci_to_if_name_map = None
674         self.additional_file = False
675         self.remote_prox_file_name = None
676         self.prox_config_dict = None
677         self.lower = None
678         self.upper = None
679         self._test_cores = None
680         self._latency_cores = None
681
682     @property
683     def sut(self):
684         if not self.client:
685             self.client = ProxSocketHelper()
686         return self.client
687
688     @property
689     def cpu_topology(self):
690         if not self._cpu_topology:
691             stdout = self.ssh_helper.execute("cat /proc/cpuinfo")[1]
692             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout)
693         return self._cpu_topology
694
695     @property
696     def vpci_to_if_name_map(self):
697         if self._vpci_to_if_name_map is None:
698             self._vpci_to_if_name_map = {
699                 interface["virtual-interface"]["vpci"]: interface["name"]
700                 for interface in self.vnfd_helper.interfaces
701             }
702         return self._vpci_to_if_name_map
703
704     @property
705     def test_cores(self):
706         if not self._test_cores:
707             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
708         return self._test_cores
709
710     @property
711     def latency_cores(self):
712         if not self._latency_cores:
713             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
714         return self._latency_cores
715
716     def run_traffic(self, traffic_profile):
717         self.lower = 0.0
718         self.upper = 100.0
719
720         traffic_profile.init(self._queue)
721         # this frees up the run_traffic loop
722         self.client_started.value = 1
723
724         while not self._terminated.value:
725             # move it all to traffic_profile
726             self._run_traffic_once(traffic_profile)
727
728     def _run_traffic_once(self, traffic_profile):
729         traffic_profile.execute(self)
730         if traffic_profile.done:
731             self._queue.put({'done': True})
732             LOG.debug("tg_prox done")
733             self._terminated.value = 1
734
735     def start_collect(self):
736         pass
737
738     def terminate(self):
739         super(ProxResourceHelper, self).terminate()
740         self.ssh_helper.execute('sudo pkill prox')
741         self.setup_helper.rebind_drivers()
742
743     def get_process_args(self):
744         task_path = self.scenario_helper.task_path
745         options = self.scenario_helper.options
746
747         prox_args = options['prox_args']
748         prox_path = options['prox_path']
749         config_path = options['prox_config']
750
751         config_file = os.path.basename(config_path)
752         config_path = find_relative_file(config_path, task_path)
753
754         try:
755             prox_file_config_path = options['prox_files']
756             prox_file_file = os.path.basename(prox_file_config_path)
757             prox_file_config_path = find_relative_file(prox_file_config_path, task_path)
758             self.remote_prox_file_name = self.copy_to_target(prox_file_config_path, prox_file_file)
759             self.additional_file = True
760         except:
761             self.additional_file = False
762
763         self.prox_config_dict = self.generate_prox_config_file(config_path)
764
765         remote_path = self.upload_prox_config(config_file, self.prox_config_dict)
766         return prox_args, prox_path, remote_path
767
768     def up_post(self):
769         return self.sut  # force connection
770
771     def execute(self, cmd, *args, **kwargs):
772         func = getattr(self.sut, cmd, None)
773         if func:
774             return func(*args, **kwargs)
775
776     def copy_to_target(self, config_file_path, prox_file):
777         remote_path = os.path.join("/tmp", prox_file)
778         self.ssh_helper.put(config_file_path, remote_path)
779         return remote_path
780
781     def upload_prox_config(self, config_file, prox_config_dict):
782         # prox can't handle spaces around ' = ' so use custom method
783         out = StringIO(self.write_prox_config(prox_config_dict))
784         out.seek(0)
785         remote_path = os.path.join("/tmp", config_file)
786         self.ssh_helper.put_file_obj(out, remote_path)
787
788         return remote_path
789
790     @contextmanager
791     def traffic_context(self, pkt_size, value):
792         self.sut.stop_all()
793         self.sut.reset_stats()
794         self.sut.set_pkt_size(self.test_cores, pkt_size)
795         self.sut.set_speed(self.test_cores, value)
796         self.sut.start_all()
797         try:
798             yield
799         finally:
800             self.sut.stop_all()
801
802     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
803         # do this assert in init?  unless we expect interface count to
804         # change from one run to another run...
805         interfaces = self.vnfd_helper.interfaces
806         interface_count = len(interfaces)
807         assert interface_count in {2, 4}, \
808             "Invalid no of ports, 2 or 4 ports only supported at this time"
809
810         with self.traffic_context(pkt_size, value):
811             # Getting statistics to calculate PPS at right speed....
812             tsc_hz = float(self.sut.hz())
813             time.sleep(2)
814             with self.sut.measure_tot_stats() as data:
815                 time.sleep(duration)
816
817             # Get stats before stopping the cores. Stopping cores takes some time
818             # and might skew results otherwise.
819             latency = self.get_latency()
820
821         deltas = data['delta']
822         rx_total, tx_total = self.sut.port_stats(range(interface_count))[6:8]
823         pps = value / 100.0 * self.line_rate_to_pps(pkt_size, interface_count)
824
825         result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx,
826                                    deltas.tsc, latency, rx_total, tx_total, pps)
827
828         result.log_data()
829         return result
830
831     def get_cores(self, mode):
832         cores = []
833         for section_name, section_data in self.prox_config_dict.items():
834             if section_name.startswith("core"):
835                 for index, item in enumerate(section_data):
836                     if item[0] == "mode" and item[1] == mode:
837                         core = CoreSocketTuple(section_name).find_in_topology(self.cpu_topology)
838                         cores.append(core)
839         return cores
840
841     def upload_prox_lua(self, config_dir, prox_config_dict):
842         # we could have multiple lua directives
843         lau_dict = prox_config_dict.get('lua', {})
844         find_iter = (re.findall('\("([^"]+)"\)', k) for k in lau_dict)
845         lua_file = next((found[0] for found in find_iter if found), None)
846         if not lua_file:
847             return ""
848
849         out = self.generate_prox_lua_file()
850         remote_path = os.path.join(config_dir, lua_file)
851         return self.put_string_to_file(out, remote_path)
852
853     def put_string_to_file(self, s, remote_path):
854         self.ssh_helper.run("cat > '{}'".format(remote_path), stdin=s)
855         return remote_path
856
857     def generate_prox_lua_file(self):
858         p = OrderedDict()
859         ext_intf = self.vnfd_helper.interfaces
860         lua_param = self.LUA_PARAMETER_NAME
861         for intf in ext_intf:
862             peer = self.LUA_PARAMETER_PEER[lua_param]
863             port_num = intf["virtual-interface"]["dpdk_port_num"]
864             local_ip = intf["local_ip"]
865             dst_ip = intf["dst_ip"]
866             local_ip_hex = ip_to_hex(local_ip, separator=' ')
867             dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
868             p.update([
869                 ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
870                 ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
871                 ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
872                 ("{}_ip_port_{}".format(peer, port_num), dst_ip),
873             ])
874         lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
875         return lua
876
877     def generate_prox_config_file(self, config_path):
878         sections = {}
879         prox_config = ConfigParser(config_path, sections)
880         prox_config.parse()
881
882         # Ensure MAC is set "hardware"
883         ext_intf = self.vnfd_helper.interfaces
884         for intf in ext_intf:
885             port_num = intf["virtual-interface"]["dpdk_port_num"]
886             section_name = "port {}".format(port_num)
887             for index, section_data in enumerate(sections.get(section_name, [])):
888                 if section_data[0] == "mac":
889                     sections[section_name][index][1] = "hardware"
890
891         # search for dest mac
892         for section_name, section_data in sections.items():
893             for index, section_attr in enumerate(section_data):
894                 if section_attr[0] != "dst mac":
895                     continue
896
897                 tx_port_no = self._get_tx_port(section_name, sections)
898                 if tx_port_no == -1:
899                     raise Exception("Failed ..destination MAC undefined")
900
901                 dst_mac = ext_intf[tx_port_no]["virtual-interface"]["dst_mac"]
902                 section_attr[1] = dst_mac
903
904         # if addition file specified in prox config
905         if self.additional_file:
906             remote_name = self.remote_prox_file_name
907             for section_data in sections.values():
908                 for index, section_attr in enumerate(section_data):
909                     try:
910                         if section_attr[1].startswith("dofile"):
911                             new_string = self._replace_quoted_with_value(section_attr[1],
912                                                                          remote_name)
913                             section_attr[1] = new_string
914                     except:
915                         pass
916
917         return sections
918
919     def get_latency(self):
920         """
921         :return: return lat_min, lat_max, lat_avg
922         :rtype: list
923         """
924         if self._latency_cores:
925             return self.sut.lat_stats(self._latency_cores)
926         return []
927
928     def _get_logical_if_name(self, vpci):
929         return self._vpci_to_if_name_map[vpci]
930
931     def _connect(self, client=None):
932         """Run and connect to prox on the remote system """
933         # De-allocating a large amount of hugepages takes some time. If a new
934         # PROX instance is started immediately after killing the previous one,
935         # it might not be able to allocate hugepages, because they are still
936         # being freed. Hence the -w switch.
937         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
938         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
939         # -f ./handle_none-4.cfg"
940         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
941         #  "; " \
942         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
943         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
944         # sudo " \
945         #    + "./build/Prox " + prox_args
946         # log.debug("Starting PROX with command [%s]", prox_cmd)
947         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
948         # self._ip, prox_cmd))
949         if client is None:
950             client = ProxSocketHelper()
951
952         # try connecting to Prox for 60s
953         for _ in range(RETRY_SECONDS):
954             time.sleep(RETRY_INTERVAL)
955             try:
956                 client.connect(self._ip, PROX_PORT)
957             except (socket.gaierror, socket.error):
958                 continue
959             else:
960                 return client
961
962         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
963         raise Exception(msg.format(self._ip, PROX_PORT))