Merge "Addition of Prox NSB changes in yardstick"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
15
16 import array
17 import operator
18 import logging
19 import io
20 import os
21 import re
22 import select
23 import socket
24
25 from collections import OrderedDict, namedtuple
26 import time
27 from contextlib import contextmanager
28 from itertools import repeat, chain
29
30 import six
31 from six.moves import zip, StringIO
32 from six.moves import cStringIO
33
34 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
35 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
36 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
37 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
39
40
41 PROX_PORT = 8474
42
43 SECTION_NAME = 0
44 SECTION_CONTENTS = 1
45
46 LOG = logging.getLogger(__name__)
47 LOG.setLevel(logging.DEBUG)
48
49 TEN_GIGABIT = 1e10
50 BITS_PER_BYTE = 8
51 RETRY_SECONDS = 60
52 RETRY_INTERVAL = 1
53
54 CONFIGURATION_OPTIONS = (
55     # dict key           section     key               default value
56     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
57     ('testDuration', 'general', 'test_duration', 5.0),
58     ('testPrecision', 'general', 'test_precision', 1.0),
59     ('tests', 'general', 'tests', None),
60     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
61
62     ('logFile', 'logging', 'file', 'dats.log'),
63     ('logDateFormat', 'logging', 'datefmt', None),
64     ('logLevel', 'logging', 'level', 'INFO'),
65     ('logOverwrite', 'logging', 'overwrite', 1),
66
67     ('testerIp', 'tester', 'ip', None),
68     ('testerUser', 'tester', 'user', 'root'),
69     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
70     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
71     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
72     ('testerSocketId', 'tester', 'socket_id', 0),
73
74     ('sutIp', 'sut', 'ip', None),
75     ('sutUser', 'sut', 'user', 'root'),
76     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
77     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
78     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
79     ('sutSocketId', 'sut', 'socket_id', 0),
80 )
81
82
83 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
84
85     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
86
87     def __new__(cls, *args):
88         try:
89             matches = cls.CORE_RE.search(str(args[0]))
90             if matches:
91                 args = matches.groups()
92
93             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
94                                                        'h' if args[2] else '')
95
96         except (AttributeError, TypeError, IndexError, ValueError):
97             raise ValueError('Invalid core spec {}'.format(args))
98
99     def is_hyperthread(self):
100         return self.hyperthread == 'h'
101
102     @property
103     def index(self):
104         return int(self.is_hyperthread())
105
106     def find_in_topology(self, cpu_topology):
107         try:
108             socket_core_match = cpu_topology[self.socket_id][self.core_id]
109             sorted_match = sorted(socket_core_match.values())
110             return sorted_match[self.index][0]
111         except (KeyError, IndexError):
112             template = "Core {}{} on socket {} does not exist"
113             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
114
115
116 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
117
118     def __new__(cls, *args):
119         try:
120             assert args[0] is not str(args[0])
121             args = tuple(args[0])
122         except (AssertionError, IndexError, TypeError):
123             pass
124
125         return super(TotStatsTuple, cls).__new__(cls, *args)
126
127
128 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
129                                                         'delta_tx,delta_tsc,'
130                                                         'latency,rx_total,tx_total,pps')):
131
132     @property
133     def pkt_loss(self):
134         try:
135             return 1e2 * self.drop_total / float(self.tx_total)
136         except ZeroDivisionError:
137             return 100.0
138
139     @property
140     def mpps(self):
141         # calculate the effective throughput in Mpps
142         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
143
144     @property
145     def can_be_lost(self):
146         return int(self.tx_total * self.tolerated / 1e2)
147
148     @property
149     def drop_total(self):
150         return self.tx_total - self.rx_total
151
152     @property
153     def success(self):
154         return self.drop_total <= self.can_be_lost
155
156     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
157         if pkt_loss is None:
158             pkt_loss = self.pkt_loss
159
160         if port_samples is None:
161             port_samples = {}
162
163         latency_keys = [
164             "LatencyMin",
165             "LatencyMax",
166             "LatencyAvg",
167         ]
168
169         samples = {
170             "Throughput": self.mpps,
171             "DropPackets": pkt_loss,
172             "CurrentDropPackets": pkt_loss,
173             "TxThroughput": self.pps / 1e6,
174             "RxThroughput": self.mpps,
175             "PktSize": pkt_size,
176         }
177         if port_samples:
178             samples.update(port_samples)
179
180         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
181         return samples
182
183     def log_data(self, logger=None):
184         if logger is None:
185             logger = LOG
186
187         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
188         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
189         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
190
191
192 class PacketDump(object):
193
194     @staticmethod
195     def assert_func(func, value1, value2, template=None):
196         assert func(value1, value2), template.format(value1, value2)
197
198     def __init__(self, port_id, data_len, payload):
199         template = "Packet dump has specified length {}, but payload is {} bytes long"
200         self.assert_func(operator.eq, data_len, len(payload), template)
201         self._port_id = port_id
202         self._data_len = data_len
203         self._payload = payload
204
205     @property
206     def port_id(self):
207         """Get the port id of the packet dump"""
208         return self._port_id
209
210     @property
211     def data_len(self):
212         """Get the length of the data received"""
213         return self._data_len
214
215     def __str__(self):
216         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
217
218     def payload(self, start=None, end=None):
219         """Get part of the payload as a list of ordinals.
220
221         Returns a list of byte values, matching the contents of the packet dump.
222         Optional start and end parameters can be specified to retrieve only a
223         part of the packet contents.
224
225         The number of elements in the list is equal to end - start + 1, so end
226         is the offset of the last character.
227
228         Args:
229             start (pos. int): the starting offset in the payload. If it is not
230                 specified or None, offset 0 is assumed.
231             end (pos. int): the ending offset of the payload. If it is not
232                 specified or None, the contents until the end of the packet are
233                 returned.
234
235         Returns:
236             [int, int, ...]. Each int represents the ordinal value of a byte in
237             the packet payload.
238         """
239         if start is None:
240             start = 0
241
242         if end is None:
243             end = self.data_len - 1
244
245         # Bounds checking on offsets
246         template = "Start offset must be non-negative"
247         self.assert_func(operator.ge, start, 0, template)
248
249         template = "End offset must be less than {1}"
250         self.assert_func(operator.lt, end, self.data_len, template)
251
252         # Adjust for splice operation: end offset must be 1 more than the offset
253         # of the last desired character.
254         end += 1
255
256         return self._payload[start:end]
257
258
259 class ProxSocketHelper(object):
260
261     def __init__(self, sock=None):
262         """ creates new prox instance """
263         super(ProxSocketHelper, self).__init__()
264
265         if sock is None:
266             sock = socket.socket()
267
268         self._sock = sock
269         self._pkt_dumps = []
270
271     def connect(self, ip, port):
272         """Connect to the prox instance on the remote system"""
273         self._sock.connect((ip, port))
274
275     def get_socket(self):
276         """ get the socket connected to the remote instance """
277         return self._sock
278
279     def _parse_socket_data(self, decoded_data, pkt_dump_only):
280         def get_newline_index():
281             return decoded_data.find('\n', index)
282
283         ret_str = ''
284         index = 0
285         for newline_index in iter(get_newline_index, -1):
286             ret_str = decoded_data[index:newline_index]
287
288             try:
289                 mode, port_id, data_len = ret_str.split(',', 2)
290             except ValueError:
291                 mode, port_id, data_len = None, None, None
292
293             if mode != 'pktdump':
294                 # Regular 1-line message. Stop reading from the socket.
295                 LOG.debug("Regular response read")
296                 return ret_str
297
298             LOG.debug("Packet dump header read: [%s]", ret_str)
299
300             # The line is a packet dump header. Parse it, read the
301             # packet payload, store the dump for later retrieval.
302             # Skip over the packet dump and continue processing: a
303             # 1-line response may follow the packet dump.
304
305             data_len = int(data_len)
306             data_start = newline_index + 1  # + 1 to skip over \n
307             data_end = data_start + data_len
308             sub_data = decoded_data[data_start:data_end]
309             pkt_payload = array.array('B', (ord(v) for v in sub_data))
310             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
311             self._pkt_dumps.append(pkt_dump)
312
313             if pkt_dump_only:
314                 # Return boolean instead of string to signal
315                 # successful reception of the packet dump.
316                 LOG.debug("Packet dump stored, returning")
317                 return True
318
319             index = data_end + 1
320
321         return ret_str
322
323     def get_data(self, pkt_dump_only=False, timeout=1):
324         """ read data from the socket """
325         # This method behaves slightly differently depending on whether it is
326         # called to read the response to a command (pkt_dump_only = 0) or if
327         # it is called specifically to read a packet dump (pkt_dump_only = 1).
328         #
329         # Packet dumps look like:
330         #   pktdump,<port_id>,<data_len>\n
331         #   <packet contents as byte array>\n
332         # This means the total packet dump message consists of 2 lines instead
333         # of 1 line.
334         #
335         # - Response for a command (pkt_dump_only = 0):
336         #   1) Read response from the socket until \n (end of message)
337         #   2a) If the response is a packet dump header (starts with "pktdump,"):
338         #     - Read the packet payload and store the packet dump for later
339         #       retrieval.
340         #     - Reset the state and restart from 1). Eventually state 2b) will
341         #       be reached and the function will return.
342         #   2b) If the response is not a packet dump:
343         #     - Return the received message as a string
344         #
345         # - Explicit request to read a packet dump (pkt_dump_only = 1):
346         #   - Read the dump header and payload
347         #   - Store the packet dump for later retrieval
348         #   - Return True to signify a packet dump was successfully read
349
350         def is_ready():
351             # recv() is blocking, so avoid calling it when no data is waiting.
352             ready = select.select([self._sock], [], [], timeout)
353             return bool(ready[0])
354
355         status = False
356         ret_str = ""
357         for status in iter(is_ready, False):
358             decoded_data = self._sock.recv(256).decode('utf-8')
359             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
360
361         LOG.debug("Received data from socket: [%s]", ret_str)
362         return ret_str if status else ''
363
364     def put_command(self, to_send):
365         """ send data to the remote instance """
366         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
367         try:
368             self._sock.sendall(to_send.encode('utf-8'))
369         except:
370             pass
371
372     def get_packet_dump(self):
373         """ get the next packet dump """
374         if self._pkt_dumps:
375             return self._pkt_dumps.pop(0)
376         return None
377
378     def stop_all_reset(self):
379         """ stop the remote instance and reset stats """
380         LOG.debug("Stop all and reset stats")
381         self.stop_all()
382         self.reset_stats()
383
384     def stop_all(self):
385         """ stop all cores on the remote instance """
386         LOG.debug("Stop all")
387         self.put_command("stop all\n")
388         time.sleep(3)
389
390     def stop(self, cores, task=''):
391         """ stop specific cores on the remote instance """
392         LOG.debug("Stopping cores %s", cores)
393         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
394         time.sleep(3)
395
396     def start_all(self):
397         """ start all cores on the remote instance """
398         LOG.debug("Start all")
399         self.put_command("start all\n")
400
401     def start(self, cores):
402         """ start specific cores on the remote instance """
403         LOG.debug("Starting cores %s", cores)
404         self.put_command("start {}\n".format(join_non_strings(',', cores)))
405         time.sleep(3)
406
407     def reset_stats(self):
408         """ reset the statistics on the remote instance """
409         LOG.debug("Reset stats")
410         self.put_command("reset stats\n")
411         time.sleep(1)
412
413     def _run_template_over_cores(self, template, cores, *args):
414         for core in cores:
415             self.put_command(template.format(core, *args))
416
417     def set_pkt_size(self, cores, pkt_size):
418         """ set the packet size to generate on the remote instance """
419         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
420         pkt_size -= 4
421         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
422         time.sleep(1)
423
424     def set_value(self, cores, offset, value, length):
425         """ set value on the remote instance """
426         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
427         LOG.debug(msg, cores, value, length, offset)
428         template = "set value {} 0 {} {} {}\n"
429         self._run_template_over_cores(template, cores, offset, value, length)
430
431     def reset_values(self, cores):
432         """ reset values on the remote instance """
433         LOG.debug("Set value for core(s) %s", cores)
434         self._run_template_over_cores("reset values {} 0\n", cores)
435
436     def set_speed(self, cores, speed):
437         """ set speed on the remote instance """
438         LOG.debug("Set speed for core(s) %s to %g", cores, speed)
439         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
440
441     def slope_speed(self, cores_speed, duration, n_steps=0):
442         """will start to increase speed from 0 to N where N is taken from
443         a['speed'] for each a in cores_speed"""
444         # by default, each step will take 0.5 sec
445         if n_steps == 0:
446             n_steps = duration * 2
447
448         private_core_data = []
449         step_duration = float(duration) / n_steps
450         for core_data in cores_speed:
451             target = float(core_data['speed'])
452             private_core_data.append({
453                 'cores': core_data['cores'],
454                 'zero': 0,
455                 'delta': target / n_steps,
456                 'current': 0,
457                 'speed': target,
458             })
459
460         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
461         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
462             time.sleep(step_duration)
463             for core_data in private_core_data:
464                 core_data['current'] = core_data[key1] + core_data[key2]
465                 self.set_speed(core_data['cores'], core_data['current'])
466
467     def set_pps(self, cores, pps, pkt_size):
468         """ set packets per second for specific cores on the remote instance """
469         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
470         LOG.debug(msg, cores, pps, pkt_size)
471
472         # speed in percent of line-rate
473         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
474         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
475
476     def lat_stats(self, cores, task=0):
477         """Get the latency statistics from the remote system"""
478         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
479         lat_min = {}
480         lat_max = {}
481         lat_avg = {}
482         for core in cores:
483             self.put_command("lat stats {} {} \n".format(core, task))
484             ret = self.get_data()
485
486             try:
487                 lat_min[core], lat_max[core], lat_avg[core] = \
488                     tuple(int(n) for n in ret.split(",")[:3])
489
490             except (AttributeError, ValueError, TypeError):
491                 pass
492
493         return lat_min, lat_max, lat_avg
494
495     def get_all_tot_stats(self):
496         self.put_command("tot stats\n")
497         all_stats_str = self.get_data().split(",")
498         if len(all_stats_str) != 4:
499             all_stats = [0] * 4
500             return all_stats
501         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
502         self.master_stats = all_stats
503         return all_stats
504
505     def hz(self):
506         return self.get_all_tot_stats()[3]
507
508     # Deprecated
509     # TODO: remove
510     def rx_stats(self, cores, task=0):
511         return self.core_stats(cores, task)
512
513     def core_stats(self, cores, task=0):
514         """Get the receive statistics from the remote system"""
515         rx = tx = drop = tsc = 0
516         for core in cores:
517             self.put_command("core stats {} {}\n".format(core, task))
518             ret = self.get_data().split(",")
519             rx += int(ret[0])
520             tx += int(ret[1])
521             drop += int(ret[2])
522             tsc = int(ret[3])
523         return rx, tx, drop, tsc
524
525     def port_stats(self, ports):
526         """get counter values from a specific port"""
527         tot_result = [0] * 12
528         for port in ports:
529             self.put_command("port_stats {}\n".format(port))
530             ret = [try_int(s, 0) for s in self.get_data().split(",")]
531             tot_result = [sum(x) for x in zip(tot_result, ret)]
532         return tot_result
533
534     @contextmanager
535     def measure_tot_stats(self):
536         start = self.get_all_tot_stats()
537         container = {'start_tot': start}
538         try:
539             yield container
540         finally:
541             container['end_tot'] = end = self.get_all_tot_stats()
542
543         container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))
544
545     def tot_stats(self):
546         """Get the total statistics from the remote system"""
547         stats = self.get_all_tot_stats()
548         return stats[:3]
549
550     def tot_ierrors(self):
551         """Get the total ierrors from the remote system"""
552         self.put_command("tot ierrors tot\n")
553         recv = self.get_data().split(',')
554         tot_ierrors = int(recv[0])
555         tsc = int(recv[0])
556         return tot_ierrors, tsc
557
558     def set_count(self, count, cores):
559         """Set the number of packets to send on the specified core"""
560         self._run_template_over_cores("count {} 0 {}\n", cores, count)
561
562     def dump_rx(self, core_id, task_id=0, count=1):
563         """Activate dump on rx on the specified core"""
564         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
565         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
566         time.sleep(1.5)     # Give PROX time to set up packet dumping
567
568     def quit(self):
569         self.stop_all()
570         self._quit()
571         self.force_quit()
572
573     def _quit(self):
574         """ stop all cores on the remote instance """
575         LOG.debug("Quit prox")
576         self.put_command("quit\n")
577         time.sleep(3)
578
579     def force_quit(self):
580         """ stop all cores on the remote instance """
581         LOG.debug("Force Quit prox")
582         self.put_command("quit_force\n")
583         time.sleep(3)
584
585
586 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
587     # the actual app is lowercase
588     APP_NAME = 'prox'
589
590     LUA_PARAMETER_NAME = ""
591     LUA_PARAMETER_PEER = {
592         "gen": "sut",
593         "sut": "gen",
594     }
595
596     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
597         self.remote_path = None
598         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
599         self.remote_prox_file_name = None
600         self.prox_config_dict = None
601         self.additional_files = {}
602
603     def _build_pipeline_kwargs(self):
604         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
605         self.pipeline_kwargs = {
606             'tool_path': tool_path,
607             'tool_dir': os.path.dirname(tool_path),
608         }
609
610     def copy_to_target(self, config_file_path, prox_file):
611         remote_path = os.path.join("/tmp", prox_file)
612         self.ssh_helper.put(config_file_path, remote_path)
613         return remote_path
614
615     @staticmethod
616     def _get_tx_port(section, sections):
617         iface_port = [-1]
618         for item in sections[section]:
619             if item[0] == "tx port":
620                 iface_port = re.findall(r'\d+', item[1])
621                 # do we want the last one?
622                 #   if yes, then can we reverse?
623         return int(iface_port[0])
624
625     @staticmethod
626     def _replace_quoted_with_value(quoted, value, count=1):
627         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
628         return new_string
629
630     def _insert_additional_file(self, value):
631         file_str = value.split('"')
632         base_name = os.path.basename(file_str[1])
633         file_str[1] = self.additional_files[base_name]
634         return '"'.join(file_str)
635
636     def generate_prox_config_file(self, config_path):
637         sections = []
638         prox_config = ConfigParser(config_path, sections)
639         prox_config.parse()
640
641         # Ensure MAC is set "hardware"
642         ext_intf = self.vnfd_helper.interfaces
643         # we are using enumeration to map logical port numbers to interfaces
644         for port_num, intf in enumerate(ext_intf):
645             port_section_name = "port {}".format(port_num)
646             for section_name, section in sections:
647                 if port_section_name != section_name:
648                     continue
649
650                 for index, section_data in enumerate(section):
651                     if section_data[0] == "mac":
652                         section_data[1] = "hardware"
653
654         # search for dst mac
655         for _, section in sections:
656             # for index, (item_key, item_val) in enumerate(section):
657             for index, section_data in enumerate(section):
658                 item_key, item_val = section_data
659                 if item_val.startswith("@@dst_mac"):
660                     tx_port_iter = re.finditer(r'\d+', item_val)
661                     tx_port_no = int(next(tx_port_iter).group(0))
662                     mac = ext_intf[tx_port_no]["virtual-interface"]["dst_mac"]
663                     section_data[1] = mac.replace(":", " ", 6)
664
665                 if item_key == "dst mac" and item_val.startswith("@@"):
666                     tx_port_iter = re.finditer(r'\d+', item_val)
667                     tx_port_no = int(next(tx_port_iter).group(0))
668                     mac = ext_intf[tx_port_no]["virtual-interface"]["dst_mac"]
669                     section_data[1] = mac
670
671         # if addition file specified in prox config
672         if not self.additional_files:
673             return sections
674
675         for section_name, section in sections:
676             for index, section_data in enumerate(section):
677                 try:
678                     if section_data[0].startswith("dofile"):
679                         section_data[0] = self._insert_additional_file(section_data[0])
680
681                     if section_data[1].startswith("dofile"):
682                         section_data[1] = self._insert_additional_file(section_data[1])
683                 except:
684                     pass
685
686         return sections
687
688     @staticmethod
689     def write_prox_config(prox_config):
690         """
691         Write an .ini-format config file for PROX
692         PROX does not allow a space before/after the =, so we need
693         a custom method
694         """
695         out = []
696         for i, (section_name, section) in enumerate(prox_config):
697             out.append("[{}]".format(section_name))
698             for index, item in enumerate(section):
699                 key, value = item
700                 if key == "__name__":
701                     continue
702                 if value is not None and value != '@':
703                     key = "=".join((key, str(value).replace('\n', '\n\t')))
704                     out.append(key)
705                 else:
706                     key = str(key).replace('\n', '\n\t')
707                     out.append(key)
708         return os.linesep.join(out)
709
710     def put_string_to_file(self, s, remote_path):
711         file_obj = cStringIO(s)
712         self.ssh_helper.put_file_obj(file_obj, remote_path)
713         return remote_path
714
715     def generate_prox_lua_file(self):
716         p = OrderedDict()
717         ext_intf = self.vnfd_helper.interfaces
718         lua_param = self.LUA_PARAMETER_NAME
719         for intf in ext_intf:
720             peer = self.LUA_PARAMETER_PEER[lua_param]
721             port_num = intf["virtual-interface"]["dpdk_port_num"]
722             local_ip = intf["local_ip"]
723             dst_ip = intf["dst_ip"]
724             local_ip_hex = ip_to_hex(local_ip, separator=' ')
725             dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
726             p.update([
727                 ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
728                 ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
729                 ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
730                 ("{}_ip_port_{}".format(peer, port_num), dst_ip),
731             ])
732         lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
733         return lua
734
735     def upload_prox_lua(self, config_dir, prox_config_dict):
736         # we could have multiple lua directives
737         lau_dict = prox_config_dict.get('lua', {})
738         find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
739         lua_file = next((found[0] for found in find_iter if found), None)
740         if not lua_file:
741             return ""
742
743         out = self.generate_prox_lua_file()
744         remote_path = os.path.join(config_dir, lua_file)
745         return self.put_string_to_file(out, remote_path)
746
747     def upload_prox_config(self, config_file, prox_config_dict):
748         # prox can't handle spaces around ' = ' so use custom method
749         out = StringIO(self.write_prox_config(prox_config_dict))
750         out.seek(0)
751         remote_path = os.path.join("/tmp", config_file)
752         self.ssh_helper.put_file_obj(out, remote_path)
753
754         return remote_path
755
756     def build_config_file(self):
757         task_path = self.scenario_helper.task_path
758         options = self.scenario_helper.options
759         config_path = options['prox_config']
760         config_file = os.path.basename(config_path)
761         config_path = find_relative_file(config_path, task_path)
762         self.additional_files = {}
763
764         prox_files = options.get('prox_files', [])
765         if isinstance(prox_files, six.string_types):
766             prox_files = [prox_files]
767         for key_prox_file in prox_files:
768             base_prox_file = os.path.basename(key_prox_file)
769             remote_prox_file = self.copy_to_target(key_prox_file, base_prox_file)
770             self.additional_files[base_prox_file] = remote_prox_file
771
772         self.prox_config_dict = self.generate_prox_config_file(config_path)
773         self.remote_path = self.upload_prox_config(config_file, self.prox_config_dict)
774
775     def build_config(self):
776
777         options = self.scenario_helper.options
778
779         prox_args = options['prox_args']
780         LOG.info("Provision and start the %s", self.APP_NAME)
781         self._build_pipeline_kwargs()
782         self.pipeline_kwargs["args"] = " ".join(
783             " ".join([k, v if v else ""]) for k, v in prox_args.items())
784         self.pipeline_kwargs["cfg_file"] = self.remote_path
785
786         cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
787         prox_cmd = cmd_template.format(**self.pipeline_kwargs)
788         return prox_cmd
789
790
791 class ProxResourceHelper(ClientResourceHelper):
792
793     RESOURCE_WORD = 'prox'
794     PROX_CORE_GEN_MODE = "gen"
795     PROX_CORE_LAT_MODE = "lat"
796     PROX_CORE_MPLS_TEST = "MPLS tag/untag"
797
798     PROX_MODE = ""
799
800     WAIT_TIME = 3
801
802     @staticmethod
803     def line_rate_to_pps(pkt_size, n_ports):
804         # FIXME Don't hardcode 10Gb/s
805         return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20)
806
807     @staticmethod
808     def find_pci(pci, bound_pci):
809         # we have to substring match PCI bus address from the end
810         return any(b.endswith(pci) for b in bound_pci)
811
812     def __init__(self, setup_helper):
813         super(ProxResourceHelper, self).__init__(setup_helper)
814         self.mgmt_interface = self.vnfd_helper.mgmt_interface
815         self._user = self.mgmt_interface["user"]
816         self._ip = self.mgmt_interface["ip"]
817
818         self.done = False
819         self._cpu_topology = None
820         self._vpci_to_if_name_map = None
821         self.additional_file = {}
822         self.remote_prox_file_name = None
823         self.lower = None
824         self.upper = None
825         self._test_cores = None
826         self._latency_cores = None
827         self._tagged_cores = None
828         self._plain_cores = None
829
830     @property
831     def sut(self):
832         if not self.client:
833             self.client = self._connect()
834         return self.client
835
836     @property
837     def cpu_topology(self):
838         if not self._cpu_topology:
839             stdout = io.BytesIO()
840             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
841             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
842         return self._cpu_topology
843
844     @property
845     def test_cores(self):
846         if not self._test_cores:
847             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
848         return self._test_cores
849
850     @property
851     def mpls_cores(self):
852         if not self._tagged_cores:
853             self._tagged_cores, self._plain_cores = self.get_cores_mpls(self.PROX_CORE_GEN_MODE)
854         return self._tagged_cores, self._plain_cores
855
856     @property
857     def tagged_cores(self):
858         return self.mpls_cores[0]
859
860     @property
861     def plain_cores(self):
862         return self.mpls_cores[1]
863
864     @property
865     def latency_cores(self):
866         if not self._latency_cores:
867             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
868         return self._latency_cores
869
870     def run_traffic(self, traffic_profile):
871         self.lower = 0.0
872         self.upper = 100.0
873
874         traffic_profile.init(self._queue)
875         # this frees up the run_traffic loop
876         self.client_started.value = 1
877
878         while not self._terminated.value:
879             # move it all to traffic_profile
880             self._run_traffic_once(traffic_profile)
881
882     def _run_traffic_once(self, traffic_profile):
883         traffic_profile.execute(self)
884         if traffic_profile.done:
885             self._queue.put({'done': True})
886             LOG.debug("tg_prox done")
887             self._terminated.value = 1
888
889     def start_collect(self):
890         pass
891
892     def terminate(self):
893         # should not be called, use VNF terminate
894         raise NotImplementedError()
895
896     def up_post(self):
897         return self.sut  # force connection
898
899     def execute(self, cmd, *args, **kwargs):
900         func = getattr(self.sut, cmd, None)
901         if func:
902             return func(*args, **kwargs)
903
904     @contextmanager
905     def traffic_context(self, pkt_size, value):
906         self.sut.stop_all()
907         self.sut.reset_stats()
908         if self.get_test_type() == self.PROX_CORE_MPLS_TEST:
909             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
910             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
911             self.sut.set_speed(self.tagged_cores, value)
912             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
913             self.sut.set_speed(self.plain_cores, value * ratio)
914         else:
915             self.sut.set_pkt_size(self.test_cores, pkt_size)
916             self.sut.set_speed(self.test_cores, value)
917
918         self.sut.start_all()
919         try:
920             yield
921         finally:
922             self.sut.stop_all()
923
924     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
925         # type: (object, object, object, object) -> object
926         # do this assert in init?  unless we expect interface count to
927         # change from one run to another run...
928         interfaces = self.vnfd_helper.interfaces
929         interface_count = len(interfaces)
930         assert interface_count in {1, 2, 4}, \
931             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
932
933         with self.traffic_context(pkt_size, value):
934             # Getting statistics to calculate PPS at right speed....
935             tsc_hz = float(self.sut.hz())
936             time.sleep(2)
937             with self.sut.measure_tot_stats() as data:
938                 time.sleep(duration)
939
940             # Get stats before stopping the cores. Stopping cores takes some time
941             # and might skew results otherwise.
942             latency = self.get_latency()
943
944         deltas = data['delta']
945         rx_total, tx_total = self.sut.port_stats(range(interface_count))[6:8]
946         pps = value / 100.0 * self.line_rate_to_pps(pkt_size, interface_count)
947
948         samples = {}
949         # we are currently using enumeration to map logical port num to interface
950         for index, iface in enumerate(interfaces):
951             port_rx_total, port_tx_total = self.sut.port_stats([index])[6:8]
952             samples[iface["name"]] = {"in_packets": port_rx_total,
953                                       "out_packets": port_tx_total}
954
955         result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx,
956                                    deltas.tsc, latency, rx_total, tx_total, pps)
957         result.log_data()
958         return result, samples
959
960     def get_test_type(self):
961         test_type = None
962         for section_name, section in self.setup_helper.prox_config_dict:
963             if section_name != "global":
964                 continue
965
966             for key, value in section:
967                 if key == "name" and value == self.PROX_CORE_MPLS_TEST:
968                     test_type = self.PROX_CORE_MPLS_TEST
969
970         return test_type
971
972     def get_cores(self, mode):
973         cores = []
974
975         for section_name, section in self.setup_helper.prox_config_dict:
976             if not section_name.startswith("core"):
977                 continue
978
979             for key, value in section:
980                 if key == "mode" and value == mode:
981                     core_tuple = CoreSocketTuple(section_name)
982                     core = core_tuple.find_in_topology(self.cpu_topology)
983                     cores.append(core)
984
985         return cores
986
987     def get_cores_mpls(self, mode=PROX_CORE_GEN_MODE):
988         cores_tagged = []
989         cores_plain = []
990         for section_name, section in self.setup_helper.prox_config_dict:
991             if not section_name.startswith("core"):
992                 continue
993
994             if all(key != "mode" or value != mode for key, value in section):
995                 continue
996
997             for item_key, item_value in section:
998                 if item_key == "name" and item_value.startswith("tag"):
999                     core_tuple = CoreSocketTuple(section_name)
1000                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1001                     cores_tagged.append(core_tag)
1002
1003                 elif item_key == "name" and item_value.startswith("udp"):
1004                     core_tuple = CoreSocketTuple(section_name)
1005                     core_udp = core_tuple.find_in_topology(self.cpu_topology)
1006                     cores_plain.append(core_udp)
1007
1008         return cores_tagged, cores_plain
1009
1010     def get_latency(self):
1011         """
1012         :return: return lat_min, lat_max, lat_avg
1013         :rtype: list
1014         """
1015         if self._latency_cores:
1016             return self.sut.lat_stats(self._latency_cores)
1017         return []
1018
1019     def _get_logical_if_name(self, vpci):
1020         return self._vpci_to_if_name_map[vpci]
1021
1022     def _connect(self, client=None):
1023         """Run and connect to prox on the remote system """
1024         # De-allocating a large amount of hugepages takes some time. If a new
1025         # PROX instance is started immediately after killing the previous one,
1026         # it might not be able to allocate hugepages, because they are still
1027         # being freed. Hence the -w switch.
1028         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
1029         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
1030         # -f ./handle_none-4.cfg"
1031         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
1032         #  "; " \
1033         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
1034         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
1035         # sudo " \
1036         #    + "./build/Prox " + prox_args
1037         # log.debug("Starting PROX with command [%s]", prox_cmd)
1038         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
1039         # self._ip, prox_cmd))
1040         if client is None:
1041             client = ProxSocketHelper()
1042
1043         # try connecting to Prox for 60s
1044         for _ in range(RETRY_SECONDS):
1045             time.sleep(RETRY_INTERVAL)
1046             try:
1047                 client.connect(self._ip, PROX_PORT)
1048             except (socket.gaierror, socket.error):
1049                 continue
1050             else:
1051                 return client
1052
1053         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
1054         raise Exception(msg.format(self._ip, PROX_PORT))