Merge "Introduced timeout to post method of HttpClient"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from __future__ import absolute_import
15
16 import array
17 import operator
18 import logging
19 import io
20 import os
21 import re
22 import select
23 import socket
24
25 from collections import OrderedDict, namedtuple
26 import time
27 from contextlib import contextmanager
28 from itertools import repeat, chain
29
30 import six
31 from multiprocessing import Queue
32 from six.moves import zip, StringIO
33 from six.moves import cStringIO
34
35 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
36 from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int
37 from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
38 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
39 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
40
41
42 PROX_PORT = 8474
43
44 SECTION_NAME = 0
45 SECTION_CONTENTS = 1
46
47 LOG = logging.getLogger(__name__)
48 LOG.setLevel(logging.DEBUG)
49
50 TEN_GIGABIT = 1e10
51 BITS_PER_BYTE = 8
52 RETRY_SECONDS = 60
53 RETRY_INTERVAL = 1
54
55 CONFIGURATION_OPTIONS = (
56     # dict key           section     key               default value
57     ('pktSizes', 'general', 'pkt_sizes', '64,128,256,512,1024,1280,1518'),
58     ('testDuration', 'general', 'test_duration', 5.0),
59     ('testPrecision', 'general', 'test_precision', 1.0),
60     ('tests', 'general', 'tests', None),
61     ('toleratedLoss', 'general', 'tolerated_loss', 0.0),
62
63     ('logFile', 'logging', 'file', 'dats.log'),
64     ('logDateFormat', 'logging', 'datefmt', None),
65     ('logLevel', 'logging', 'level', 'INFO'),
66     ('logOverwrite', 'logging', 'overwrite', 1),
67
68     ('testerIp', 'tester', 'ip', None),
69     ('testerUser', 'tester', 'user', 'root'),
70     ('testerDpdkDir', 'tester', 'rte_sdk', '/root/dpdk'),
71     ('testerDpdkTgt', 'tester', 'rte_target', 'x86_64-native-linuxapp-gcc'),
72     ('testerProxDir', 'tester', 'prox_dir', '/root/prox'),
73     ('testerSocketId', 'tester', 'socket_id', 0),
74
75     ('sutIp', 'sut', 'ip', None),
76     ('sutUser', 'sut', 'user', 'root'),
77     ('sutDpdkDir', 'sut', 'rte_sdk', '/root/dpdk'),
78     ('sutDpdkTgt', 'sut', 'rte_target', 'x86_64-native-linuxapp-gcc'),
79     ('sutProxDir', 'sut', 'prox_dir', '/root/prox'),
80     ('sutSocketId', 'sut', 'socket_id', 0),
81 )
82
83
84 class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')):
85
86     CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$")
87
88     def __new__(cls, *args):
89         try:
90             matches = cls.CORE_RE.search(str(args[0]))
91             if matches:
92                 args = matches.groups()
93
94             return super(CoreSocketTuple, cls).__new__(cls, int(args[0]), try_int(args[1], 0),
95                                                        'h' if args[2] else '')
96
97         except (AttributeError, TypeError, IndexError, ValueError):
98             raise ValueError('Invalid core spec {}'.format(args))
99
100     def is_hyperthread(self):
101         return self.hyperthread == 'h'
102
103     @property
104     def index(self):
105         return int(self.is_hyperthread())
106
107     def find_in_topology(self, cpu_topology):
108         try:
109             socket_core_match = cpu_topology[self.socket_id][self.core_id]
110             sorted_match = sorted(socket_core_match.values())
111             return sorted_match[self.index][0]
112         except (KeyError, IndexError):
113             template = "Core {}{} on socket {} does not exist"
114             raise ValueError(template.format(self.core_id, self.hyperthread, self.socket_id))
115
116
117 class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')):
118
119     def __new__(cls, *args):
120         try:
121             assert args[0] is not str(args[0])
122             args = tuple(args[0])
123         except (AssertionError, IndexError, TypeError):
124             pass
125
126         return super(TotStatsTuple, cls).__new__(cls, *args)
127
128
129 class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,'
130                                                         'delta_tx,delta_tsc,'
131                                                         'latency,rx_total,tx_total,pps')):
132
133     @property
134     def pkt_loss(self):
135         try:
136             return 1e2 * self.drop_total / float(self.tx_total)
137         except ZeroDivisionError:
138             return 100.0
139
140     @property
141     def mpps(self):
142         # calculate the effective throughput in Mpps
143         return float(self.delta_tx) * self.tsc_hz / self.delta_tsc / 1e6
144
145     @property
146     def can_be_lost(self):
147         return int(self.tx_total * self.tolerated / 1e2)
148
149     @property
150     def drop_total(self):
151         return self.tx_total - self.rx_total
152
153     @property
154     def success(self):
155         return self.drop_total <= self.can_be_lost
156
157     def get_samples(self, pkt_size, pkt_loss=None, port_samples=None):
158         if pkt_loss is None:
159             pkt_loss = self.pkt_loss
160
161         if port_samples is None:
162             port_samples = {}
163
164         latency_keys = [
165             "LatencyMin",
166             "LatencyMax",
167             "LatencyAvg",
168         ]
169
170         samples = {
171             "Throughput": self.mpps,
172             "DropPackets": pkt_loss,
173             "CurrentDropPackets": pkt_loss,
174             "TxThroughput": self.pps / 1e6,
175             "RxThroughput": self.mpps,
176             "PktSize": pkt_size,
177         }
178         if port_samples:
179             samples.update(port_samples)
180
181         samples.update((key, value) for key, value in zip(latency_keys, self.latency))
182         return samples
183
184     def log_data(self, logger=None):
185         if logger is None:
186             logger = LOG
187
188         template = "RX: %d; TX: %d; dropped: %d (tolerated: %d)"
189         logger.debug(template, self.rx_total, self.tx_total, self.drop_total, self.can_be_lost)
190         logger.debug("Mpps configured: %f; Mpps effective %f", self.pps / 1e6, self.mpps)
191
192
193 class PacketDump(object):
194
195     @staticmethod
196     def assert_func(func, value1, value2, template=None):
197         assert func(value1, value2), template.format(value1, value2)
198
199     def __init__(self, port_id, data_len, payload):
200         template = "Packet dump has specified length {}, but payload is {} bytes long"
201         self.assert_func(operator.eq, data_len, len(payload), template)
202         self._port_id = port_id
203         self._data_len = data_len
204         self._payload = payload
205
206     @property
207     def port_id(self):
208         """Get the port id of the packet dump"""
209         return self._port_id
210
211     @property
212     def data_len(self):
213         """Get the length of the data received"""
214         return self._data_len
215
216     def __str__(self):
217         return '<PacketDump port: {} payload: {}>'.format(self._port_id, self._payload)
218
219     def payload(self, start=None, end=None):
220         """Get part of the payload as a list of ordinals.
221
222         Returns a list of byte values, matching the contents of the packet dump.
223         Optional start and end parameters can be specified to retrieve only a
224         part of the packet contents.
225
226         The number of elements in the list is equal to end - start + 1, so end
227         is the offset of the last character.
228
229         Args:
230             start (pos. int): the starting offset in the payload. If it is not
231                 specified or None, offset 0 is assumed.
232             end (pos. int): the ending offset of the payload. If it is not
233                 specified or None, the contents until the end of the packet are
234                 returned.
235
236         Returns:
237             [int, int, ...]. Each int represents the ordinal value of a byte in
238             the packet payload.
239         """
240         if start is None:
241             start = 0
242
243         if end is None:
244             end = self.data_len - 1
245
246         # Bounds checking on offsets
247         template = "Start offset must be non-negative"
248         self.assert_func(operator.ge, start, 0, template)
249
250         template = "End offset must be less than {1}"
251         self.assert_func(operator.lt, end, self.data_len, template)
252
253         # Adjust for splice operation: end offset must be 1 more than the offset
254         # of the last desired character.
255         end += 1
256
257         return self._payload[start:end]
258
259
260 class ProxSocketHelper(object):
261
262     def __init__(self, sock=None):
263         """ creates new prox instance """
264         super(ProxSocketHelper, self).__init__()
265
266         if sock is None:
267             sock = socket.socket()
268
269         self._sock = sock
270         self._pkt_dumps = []
271
272     def connect(self, ip, port):
273         """Connect to the prox instance on the remote system"""
274         self._sock.connect((ip, port))
275
276     def get_socket(self):
277         """ get the socket connected to the remote instance """
278         return self._sock
279
280     def _parse_socket_data(self, decoded_data, pkt_dump_only):
281         def get_newline_index():
282             return decoded_data.find('\n', index)
283
284         ret_str = ''
285         index = 0
286         for newline_index in iter(get_newline_index, -1):
287             ret_str = decoded_data[index:newline_index]
288
289             try:
290                 mode, port_id, data_len = ret_str.split(',', 2)
291             except ValueError:
292                 mode, port_id, data_len = None, None, None
293
294             if mode != 'pktdump':
295                 # Regular 1-line message. Stop reading from the socket.
296                 LOG.debug("Regular response read")
297                 return ret_str
298
299             LOG.debug("Packet dump header read: [%s]", ret_str)
300
301             # The line is a packet dump header. Parse it, read the
302             # packet payload, store the dump for later retrieval.
303             # Skip over the packet dump and continue processing: a
304             # 1-line response may follow the packet dump.
305
306             data_len = int(data_len)
307             data_start = newline_index + 1  # + 1 to skip over \n
308             data_end = data_start + data_len
309             sub_data = decoded_data[data_start:data_end]
310             pkt_payload = array.array('B', (ord(v) for v in sub_data))
311             pkt_dump = PacketDump(int(port_id), data_len, pkt_payload)
312             self._pkt_dumps.append(pkt_dump)
313
314             if pkt_dump_only:
315                 # Return boolean instead of string to signal
316                 # successful reception of the packet dump.
317                 LOG.debug("Packet dump stored, returning")
318                 return True
319
320             index = data_end + 1
321
322         return ret_str
323
324     def get_data(self, pkt_dump_only=False, timeout=1):
325         """ read data from the socket """
326         # This method behaves slightly differently depending on whether it is
327         # called to read the response to a command (pkt_dump_only = 0) or if
328         # it is called specifically to read a packet dump (pkt_dump_only = 1).
329         #
330         # Packet dumps look like:
331         #   pktdump,<port_id>,<data_len>\n
332         #   <packet contents as byte array>\n
333         # This means the total packet dump message consists of 2 lines instead
334         # of 1 line.
335         #
336         # - Response for a command (pkt_dump_only = 0):
337         #   1) Read response from the socket until \n (end of message)
338         #   2a) If the response is a packet dump header (starts with "pktdump,"):
339         #     - Read the packet payload and store the packet dump for later
340         #       retrieval.
341         #     - Reset the state and restart from 1). Eventually state 2b) will
342         #       be reached and the function will return.
343         #   2b) If the response is not a packet dump:
344         #     - Return the received message as a string
345         #
346         # - Explicit request to read a packet dump (pkt_dump_only = 1):
347         #   - Read the dump header and payload
348         #   - Store the packet dump for later retrieval
349         #   - Return True to signify a packet dump was successfully read
350
351         def is_ready():
352             # recv() is blocking, so avoid calling it when no data is waiting.
353             ready = select.select([self._sock], [], [], timeout)
354             return bool(ready[0])
355
356         status = False
357         ret_str = ""
358         for status in iter(is_ready, False):
359             decoded_data = self._sock.recv(256).decode('utf-8')
360             ret_str = self._parse_socket_data(decoded_data, pkt_dump_only)
361
362         LOG.debug("Received data from socket: [%s]", ret_str)
363         return ret_str if status else ''
364
365     def put_command(self, to_send):
366         """ send data to the remote instance """
367         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
368         try:
369             self._sock.sendall(to_send.encode('utf-8'))
370         except:
371             pass
372
373     def get_packet_dump(self):
374         """ get the next packet dump """
375         if self._pkt_dumps:
376             return self._pkt_dumps.pop(0)
377         return None
378
379     def stop_all_reset(self):
380         """ stop the remote instance and reset stats """
381         LOG.debug("Stop all and reset stats")
382         self.stop_all()
383         self.reset_stats()
384
385     def stop_all(self):
386         """ stop all cores on the remote instance """
387         LOG.debug("Stop all")
388         self.put_command("stop all\n")
389         time.sleep(3)
390
391     def stop(self, cores, task=''):
392         """ stop specific cores on the remote instance """
393         LOG.debug("Stopping cores %s", cores)
394         self.put_command("stop {} {}\n".format(join_non_strings(',', cores), task))
395         time.sleep(3)
396
397     def start_all(self):
398         """ start all cores on the remote instance """
399         LOG.debug("Start all")
400         self.put_command("start all\n")
401
402     def start(self, cores):
403         """ start specific cores on the remote instance """
404         LOG.debug("Starting cores %s", cores)
405         self.put_command("start {}\n".format(join_non_strings(',', cores)))
406         time.sleep(3)
407
408     def reset_stats(self):
409         """ reset the statistics on the remote instance """
410         LOG.debug("Reset stats")
411         self.put_command("reset stats\n")
412         time.sleep(1)
413
414     def _run_template_over_cores(self, template, cores, *args):
415         for core in cores:
416             self.put_command(template.format(core, *args))
417
418     def set_pkt_size(self, cores, pkt_size):
419         """ set the packet size to generate on the remote instance """
420         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
421         pkt_size -= 4
422         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
423         time.sleep(1)
424
425     def set_value(self, cores, offset, value, length):
426         """ set value on the remote instance """
427         msg = "Set value for core(s) %s to '%s' (length %d), offset %d"
428         LOG.debug(msg, cores, value, length, offset)
429         template = "set value {} 0 {} {} {}\n"
430         self._run_template_over_cores(template, cores, offset, value, length)
431
432     def reset_values(self, cores):
433         """ reset values on the remote instance """
434         LOG.debug("Set value for core(s) %s", cores)
435         self._run_template_over_cores("reset values {} 0\n", cores)
436
437     def set_speed(self, cores, speed):
438         """ set speed on the remote instance """
439         LOG.debug("Set speed for core(s) %s to %g", cores, speed)
440         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
441
442     def slope_speed(self, cores_speed, duration, n_steps=0):
443         """will start to increase speed from 0 to N where N is taken from
444         a['speed'] for each a in cores_speed"""
445         # by default, each step will take 0.5 sec
446         if n_steps == 0:
447             n_steps = duration * 2
448
449         private_core_data = []
450         step_duration = float(duration) / n_steps
451         for core_data in cores_speed:
452             target = float(core_data['speed'])
453             private_core_data.append({
454                 'cores': core_data['cores'],
455                 'zero': 0,
456                 'delta': target / n_steps,
457                 'current': 0,
458                 'speed': target,
459             })
460
461         deltas_keys_iter = repeat(('current', 'delta'), n_steps - 1)
462         for key1, key2 in chain(deltas_keys_iter, [('zero', 'speed')]):
463             time.sleep(step_duration)
464             for core_data in private_core_data:
465                 core_data['current'] = core_data[key1] + core_data[key2]
466                 self.set_speed(core_data['cores'], core_data['current'])
467
468     def set_pps(self, cores, pps, pkt_size):
469         """ set packets per second for specific cores on the remote instance """
470         msg = "Set packets per sec for core(s) %s to %g%% of line rate (packet size: %d)"
471         LOG.debug(msg, cores, pps, pkt_size)
472
473         # speed in percent of line-rate
474         speed = float(pps) * (pkt_size + 20) / TEN_GIGABIT / BITS_PER_BYTE
475         self._run_template_over_cores("speed {} 0 {}\n", cores, speed)
476
477     def lat_stats(self, cores, task=0):
478         """Get the latency statistics from the remote system"""
479         # 1-based index, if max core is 4, then 0, 1, 2, 3, 4  len = 5
480         lat_min = {}
481         lat_max = {}
482         lat_avg = {}
483         for core in cores:
484             self.put_command("lat stats {} {} \n".format(core, task))
485             ret = self.get_data()
486
487             try:
488                 lat_min[core], lat_max[core], lat_avg[core] = \
489                     tuple(int(n) for n in ret.split(",")[:3])
490
491             except (AttributeError, ValueError, TypeError):
492                 pass
493
494         return lat_min, lat_max, lat_avg
495
496     def get_all_tot_stats(self):
497         self.put_command("tot stats\n")
498         all_stats_str = self.get_data().split(",")
499         if len(all_stats_str) != 4:
500             all_stats = [0] * 4
501             return all_stats
502         all_stats = TotStatsTuple(int(v) for v in all_stats_str)
503         self.master_stats = all_stats
504         return all_stats
505
506     def hz(self):
507         return self.get_all_tot_stats()[3]
508
509     # Deprecated
510     # TODO: remove
511     def rx_stats(self, cores, task=0):
512         return self.core_stats(cores, task)
513
514     def core_stats(self, cores, task=0):
515         """Get the receive statistics from the remote system"""
516         rx = tx = drop = tsc = 0
517         for core in cores:
518             self.put_command("core stats {} {}\n".format(core, task))
519             ret = self.get_data().split(",")
520             rx += int(ret[0])
521             tx += int(ret[1])
522             drop += int(ret[2])
523             tsc = int(ret[3])
524         return rx, tx, drop, tsc
525
526     def port_stats(self, ports):
527         """get counter values from a specific port"""
528         tot_result = [0] * 12
529         for port in ports:
530             self.put_command("port_stats {}\n".format(port))
531             ret = [try_int(s, 0) for s in self.get_data().split(",")]
532             tot_result = [sum(x) for x in zip(tot_result, ret)]
533         return tot_result
534
535     @contextmanager
536     def measure_tot_stats(self):
537         start = self.get_all_tot_stats()
538         container = {'start_tot': start}
539         try:
540             yield container
541         finally:
542             container['end_tot'] = end = self.get_all_tot_stats()
543
544         container['delta'] = TotStatsTuple(end - start for start, end in zip(start, end))
545
546     def tot_stats(self):
547         """Get the total statistics from the remote system"""
548         stats = self.get_all_tot_stats()
549         return stats[:3]
550
551     def tot_ierrors(self):
552         """Get the total ierrors from the remote system"""
553         self.put_command("tot ierrors tot\n")
554         recv = self.get_data().split(',')
555         tot_ierrors = int(recv[0])
556         tsc = int(recv[0])
557         return tot_ierrors, tsc
558
559     def set_count(self, count, cores):
560         """Set the number of packets to send on the specified core"""
561         self._run_template_over_cores("count {} 0 {}\n", cores, count)
562
563     def dump_rx(self, core_id, task_id=0, count=1):
564         """Activate dump on rx on the specified core"""
565         LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count)
566         self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count))
567         time.sleep(1.5)     # Give PROX time to set up packet dumping
568
569     def quit(self):
570         self.stop_all()
571         self._quit()
572         self.force_quit()
573
574     def _quit(self):
575         """ stop all cores on the remote instance """
576         LOG.debug("Quit prox")
577         self.put_command("quit\n")
578         time.sleep(3)
579
580     def force_quit(self):
581         """ stop all cores on the remote instance """
582         LOG.debug("Force Quit prox")
583         self.put_command("quit_force\n")
584         time.sleep(3)
585
586
587 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
588     # the actual app is lowercase
589     APP_NAME = 'prox'
590
591     LUA_PARAMETER_NAME = ""
592     LUA_PARAMETER_PEER = {
593         "gen": "sut",
594         "sut": "gen",
595     }
596
597     def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
598         self.remote_path = None
599         super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
600         self.remote_prox_file_name = None
601         self._prox_config_data = None
602         self.additional_files = {}
603         self.config_queue = Queue()
604
605     def _build_pipeline_kwargs(self):
606         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
607         self.pipeline_kwargs = {
608             'tool_path': tool_path,
609             'tool_dir': os.path.dirname(tool_path),
610         }
611
612     def copy_to_target(self, config_file_path, prox_file):
613         remote_path = os.path.join("/tmp", prox_file)
614         self.ssh_helper.put(config_file_path, remote_path)
615         return remote_path
616
617     @staticmethod
618     def _get_tx_port(section, sections):
619         iface_port = [-1]
620         for item in sections[section]:
621             if item[0] == "tx port":
622                 iface_port = re.findall(r'\d+', item[1])
623                 # do we want the last one?
624                 #   if yes, then can we reverse?
625         return int(iface_port[0])
626
627     @staticmethod
628     def _replace_quoted_with_value(quoted, value, count=1):
629         new_string = re.sub('"[^"]*"', '"{}"'.format(value), quoted, count)
630         return new_string
631
632     def _insert_additional_file(self, value):
633         file_str = value.split('"')
634         base_name = os.path.basename(file_str[1])
635         file_str[1] = self.additional_files[base_name]
636         return '"'.join(file_str)
637
638     def generate_prox_config_file(self, config_path):
639         sections = []
640         prox_config = ConfigParser(config_path, sections)
641         prox_config.parse()
642
643         # Ensure MAC is set "hardware"
644         all_ports = self.vnfd_helper.port_pairs.all_ports
645         # use dpdk port number
646         for port_name in all_ports:
647             port_num = self.vnfd_helper.port_num(port_name)
648             port_section_name = "port {}".format(port_num)
649             for section_name, section in sections:
650                 if port_section_name != section_name:
651                     continue
652
653                 for index, section_data in enumerate(section):
654                     if section_data[0] == "mac":
655                         section_data[1] = "hardware"
656
657         # search for dst mac
658         for _, section in sections:
659             # for index, (item_key, item_val) in enumerate(section):
660             for index, section_data in enumerate(section):
661                 item_key, item_val = section_data
662                 if item_val.startswith("@@dst_mac"):
663                     tx_port_iter = re.finditer(r'\d+', item_val)
664                     tx_port_no = int(next(tx_port_iter).group(0))
665                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
666                     mac = intf["virtual-interface"]["dst_mac"]
667                     section_data[1] = mac.replace(":", " ", 6)
668
669                 if item_key == "dst mac" and item_val.startswith("@@"):
670                     tx_port_iter = re.finditer(r'\d+', item_val)
671                     tx_port_no = int(next(tx_port_iter).group(0))
672                     intf = self.vnfd_helper.find_interface_by_port(tx_port_no)
673                     mac = intf["virtual-interface"]["dst_mac"]
674                     section_data[1] = mac
675
676         # if addition file specified in prox config
677         if not self.additional_files:
678             return sections
679
680         for section_name, section in sections:
681             for index, section_data in enumerate(section):
682                 try:
683                     if section_data[0].startswith("dofile"):
684                         section_data[0] = self._insert_additional_file(section_data[0])
685
686                     if section_data[1].startswith("dofile"):
687                         section_data[1] = self._insert_additional_file(section_data[1])
688                 except:
689                     pass
690
691         return sections
692
693     @staticmethod
694     def write_prox_config(prox_config):
695         """
696         Write an .ini-format config file for PROX
697         PROX does not allow a space before/after the =, so we need
698         a custom method
699         """
700         out = []
701         for i, (section_name, section) in enumerate(prox_config):
702             out.append("[{}]".format(section_name))
703             for index, item in enumerate(section):
704                 key, value = item
705                 if key == "__name__":
706                     continue
707                 if value is not None and value != '@':
708                     key = "=".join((key, str(value).replace('\n', '\n\t')))
709                     out.append(key)
710                 else:
711                     key = str(key).replace('\n', '\n\t')
712                     out.append(key)
713         return os.linesep.join(out)
714
715     def put_string_to_file(self, s, remote_path):
716         file_obj = cStringIO(s)
717         self.ssh_helper.put_file_obj(file_obj, remote_path)
718         return remote_path
719
720     def generate_prox_lua_file(self):
721         p = OrderedDict()
722         all_ports = self.vnfd_helper.port_pairs.all_ports
723         lua_param = self.LUA_PARAMETER_NAME
724         for port_name in all_ports:
725             peer = self.LUA_PARAMETER_PEER[lua_param]
726             port_num = self.vnfd_helper.port_num(port_name)
727             intf = self.vnfd_helper.find_interface(name=port_name)
728             vintf = intf['virtual-interface']
729             local_ip = vintf["local_ip"]
730             dst_ip = vintf["dst_ip"]
731             local_ip_hex = ip_to_hex(local_ip, separator=' ')
732             dst_ip_hex = ip_to_hex(dst_ip, separator=' ')
733             p.update([
734                 ("{}_hex_ip_port_{}".format(lua_param, port_num), local_ip_hex),
735                 ("{}_ip_port_{}".format(lua_param, port_num), local_ip),
736                 ("{}_hex_ip_port_{}".format(peer, port_num), dst_ip_hex),
737                 ("{}_ip_port_{}".format(peer, port_num), dst_ip),
738             ])
739         lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items()))
740         return lua
741
742     def upload_prox_lua(self, config_dir, prox_config_dict):
743         # we could have multiple lua directives
744         lau_dict = prox_config_dict.get('lua', {})
745         find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict)
746         lua_file = next((found[0] for found in find_iter if found), None)
747         if not lua_file:
748             return ""
749
750         out = self.generate_prox_lua_file()
751         remote_path = os.path.join(config_dir, lua_file)
752         return self.put_string_to_file(out, remote_path)
753
754     def upload_prox_config(self, config_file, prox_config_dict):
755         # prox can't handle spaces around ' = ' so use custom method
756         out = StringIO(self.write_prox_config(prox_config_dict))
757         out.seek(0)
758         remote_path = os.path.join("/tmp", config_file)
759         self.ssh_helper.put_file_obj(out, remote_path)
760
761         return remote_path
762
763     CONFIG_QUEUE_TIMEOUT = 120
764
765     @property
766     def prox_config_data(self):
767         if self._prox_config_data is None:
768             # this will block, but it needs too
769             self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT)
770         return self._prox_config_data
771
772     def build_config_file(self):
773         task_path = self.scenario_helper.task_path
774         options = self.scenario_helper.options
775         config_path = options['prox_config']
776         config_file = os.path.basename(config_path)
777         config_path = find_relative_file(config_path, task_path)
778         self.additional_files = {}
779
780         prox_files = options.get('prox_files', [])
781         if isinstance(prox_files, six.string_types):
782             prox_files = [prox_files]
783         for key_prox_file in prox_files:
784             base_prox_file = os.path.basename(key_prox_file)
785             key_prox_path = find_relative_file(key_prox_file, task_path)
786             remote_prox_file = self.copy_to_target(key_prox_path, base_prox_file)
787             self.additional_files[base_prox_file] = remote_prox_file
788
789         self._prox_config_data = self.generate_prox_config_file(config_path)
790         # copy config to queue so we can read it from traffic_runner process
791         self.config_queue.put(self._prox_config_data)
792         self.remote_path = self.upload_prox_config(config_file, self._prox_config_data)
793
794     def build_config(self):
795         self.build_config_file()
796
797         options = self.scenario_helper.options
798
799         prox_args = options['prox_args']
800         LOG.info("Provision and start the %s", self.APP_NAME)
801         self._build_pipeline_kwargs()
802         self.pipeline_kwargs["args"] = " ".join(
803             " ".join([k, v if v else ""]) for k, v in prox_args.items())
804         self.pipeline_kwargs["cfg_file"] = self.remote_path
805
806         cmd_template = "sudo bash -c 'cd {tool_dir}; {tool_path} -o cli {args} -f {cfg_file} '"
807         prox_cmd = cmd_template.format(**self.pipeline_kwargs)
808         return prox_cmd
809
810
811 # this might be bad, sometimes we want regular ResourceHelper methods, like collect_kpi
812 class ProxResourceHelper(ClientResourceHelper):
813
814     RESOURCE_WORD = 'prox'
815     PROX_CORE_GEN_MODE = "gen"
816     PROX_CORE_LAT_MODE = "lat"
817     PROX_CORE_MPLS_TEST = "MPLS tag/untag"
818
819     PROX_MODE = ""
820
821     WAIT_TIME = 3
822
823     @staticmethod
824     def line_rate_to_pps(pkt_size, n_ports):
825         # FIXME Don't hardcode 10Gb/s
826         return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20)
827
828     @staticmethod
829     def find_pci(pci, bound_pci):
830         # we have to substring match PCI bus address from the end
831         return any(b.endswith(pci) for b in bound_pci)
832
833     def __init__(self, setup_helper):
834         super(ProxResourceHelper, self).__init__(setup_helper)
835         self.mgmt_interface = self.vnfd_helper.mgmt_interface
836         self._user = self.mgmt_interface["user"]
837         self._ip = self.mgmt_interface["ip"]
838
839         self.done = False
840         self._cpu_topology = None
841         self._vpci_to_if_name_map = None
842         self.additional_file = {}
843         self.remote_prox_file_name = None
844         self.lower = None
845         self.upper = None
846         self._test_cores = None
847         self._latency_cores = None
848         self._tagged_cores = None
849         self._plain_cores = None
850
851     @property
852     def sut(self):
853         if not self.client:
854             self.client = self._connect()
855         return self.client
856
857     @property
858     def cpu_topology(self):
859         if not self._cpu_topology:
860             stdout = io.BytesIO()
861             self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout)
862             self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8'))
863         return self._cpu_topology
864
865     @property
866     def test_cores(self):
867         if not self._test_cores:
868             self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE)
869         return self._test_cores
870
871     @property
872     def mpls_cores(self):
873         if not self._tagged_cores:
874             self._tagged_cores, self._plain_cores = self.get_cores_mpls(self.PROX_CORE_GEN_MODE)
875         return self._tagged_cores, self._plain_cores
876
877     @property
878     def tagged_cores(self):
879         return self.mpls_cores[0]
880
881     @property
882     def plain_cores(self):
883         return self.mpls_cores[1]
884
885     @property
886     def latency_cores(self):
887         if not self._latency_cores:
888             self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE)
889         return self._latency_cores
890
891     def run_traffic(self, traffic_profile):
892         self.lower = 0.0
893         self.upper = 100.0
894
895         traffic_profile.init(self._queue)
896         # this frees up the run_traffic loop
897         self.client_started.value = 1
898
899         while not self._terminated.value:
900             # move it all to traffic_profile
901             self._run_traffic_once(traffic_profile)
902
903     def _run_traffic_once(self, traffic_profile):
904         traffic_profile.execute_traffic(self)
905         if traffic_profile.done:
906             self._queue.put({'done': True})
907             LOG.debug("tg_prox done")
908             self._terminated.value = 1
909
910     # For VNF use ResourceHelper method to collect KPIs directly.
911     # for TG leave the superclass ClientResourceHelper collect_kpi_method intact
912     def collect_collectd_kpi(self):
913         return self._collect_resource_kpi()
914
915     def collect_kpi(self):
916         result = super(ProxResourceHelper, self).collect_kpi()
917         # add in collectd kpis manually
918         if result:
919             result['collect_stats'] = self._collect_resource_kpi()
920         return result
921
922     def terminate(self):
923         # should not be called, use VNF terminate
924         raise NotImplementedError()
925
926     def up_post(self):
927         return self.sut  # force connection
928
929     def execute(self, cmd, *args, **kwargs):
930         func = getattr(self.sut, cmd, None)
931         if func:
932             return func(*args, **kwargs)
933
934     @contextmanager
935     def traffic_context(self, pkt_size, value):
936         self.sut.stop_all()
937         self.sut.reset_stats()
938         if self.get_test_type() == self.PROX_CORE_MPLS_TEST:
939             self.sut.set_pkt_size(self.tagged_cores, pkt_size)
940             self.sut.set_pkt_size(self.plain_cores, pkt_size - 4)
941             self.sut.set_speed(self.tagged_cores, value)
942             ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20)
943             self.sut.set_speed(self.plain_cores, value * ratio)
944         else:
945             self.sut.set_pkt_size(self.test_cores, pkt_size)
946             self.sut.set_speed(self.test_cores, value)
947
948         self.sut.start_all()
949         try:
950             yield
951         finally:
952             self.sut.stop_all()
953
954     def run_test(self, pkt_size, duration, value, tolerated_loss=0.0):
955         # do this assert in init?  unless we expect interface count to
956         # change from one run to another run...
957         ports = self.vnfd_helper.port_pairs.all_ports
958         port_count = len(ports)
959         assert port_count in {1, 2, 4}, \
960             "Invalid number of ports: 1, 2 or 4 ports only supported at this time"
961
962         with self.traffic_context(pkt_size, value):
963             # Getting statistics to calculate PPS at right speed....
964             tsc_hz = float(self.sut.hz())
965             time.sleep(2)
966             with self.sut.measure_tot_stats() as data:
967                 time.sleep(duration)
968
969             # Get stats before stopping the cores. Stopping cores takes some time
970             # and might skew results otherwise.
971             latency = self.get_latency()
972
973         deltas = data['delta']
974         rx_total, tx_total = self.sut.port_stats(range(port_count))[6:8]
975         pps = value / 100.0 * self.line_rate_to_pps(pkt_size, port_count)
976
977         samples = {}
978         # we are currently using enumeration to map logical port num to interface
979         for port_name in ports:
980             port = self.vnfd_helper.port_num(port_name)
981             port_rx_total, port_tx_total = self.sut.port_stats([port])[6:8]
982             samples[port_name] = {
983                 "in_packets": port_rx_total,
984                 "out_packets": port_tx_total,
985             }
986
987         result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx,
988                                    deltas.tsc, latency, rx_total, tx_total, pps)
989         result.log_data()
990         return result, samples
991
992     def get_test_type(self):
993         test_type = None
994         for section_name, section in self.setup_helper.prox_config_data:
995             if section_name != "global":
996                 continue
997
998             for key, value in section:
999                 if key == "name" and value == self.PROX_CORE_MPLS_TEST:
1000                     test_type = self.PROX_CORE_MPLS_TEST
1001
1002         return test_type
1003
1004     def get_cores(self, mode):
1005         cores = []
1006
1007         for section_name, section in self.setup_helper.prox_config_data:
1008             if not section_name.startswith("core"):
1009                 continue
1010
1011             for key, value in section:
1012                 if key == "mode" and value == mode:
1013                     core_tuple = CoreSocketTuple(section_name)
1014                     core = core_tuple.find_in_topology(self.cpu_topology)
1015                     cores.append(core)
1016
1017         return cores
1018
1019     def get_cores_mpls(self, mode=PROX_CORE_GEN_MODE):
1020         cores_tagged = []
1021         cores_plain = []
1022         for section_name, section in self.setup_helper.prox_config_data:
1023             if not section_name.startswith("core"):
1024                 continue
1025
1026             if all(key != "mode" or value != mode for key, value in section):
1027                 continue
1028
1029             for item_key, item_value in section:
1030                 if item_key == "name" and item_value.startswith("tag"):
1031                     core_tuple = CoreSocketTuple(section_name)
1032                     core_tag = core_tuple.find_in_topology(self.cpu_topology)
1033                     cores_tagged.append(core_tag)
1034
1035                 elif item_key == "name" and item_value.startswith("udp"):
1036                     core_tuple = CoreSocketTuple(section_name)
1037                     core_udp = core_tuple.find_in_topology(self.cpu_topology)
1038                     cores_plain.append(core_udp)
1039
1040         return cores_tagged, cores_plain
1041
1042     def get_latency(self):
1043         """
1044         :return: return lat_min, lat_max, lat_avg
1045         :rtype: list
1046         """
1047         if self._latency_cores:
1048             return self.sut.lat_stats(self._latency_cores)
1049         return []
1050
1051     def _connect(self, client=None):
1052         """Run and connect to prox on the remote system """
1053         # De-allocating a large amount of hugepages takes some time. If a new
1054         # PROX instance is started immediately after killing the previous one,
1055         # it might not be able to allocate hugepages, because they are still
1056         # being freed. Hence the -w switch.
1057         # self.connection.execute("sudo killall -w Prox 2>/dev/null")
1058         # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t
1059         # -f ./handle_none-4.cfg"
1060         # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir +
1061         #  "; " \
1062         #    + "export RTE_TARGET=" + self._dpdk_target + ";" \
1063         #    + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50;
1064         # sudo " \
1065         #    + "./build/Prox " + prox_args
1066         # log.debug("Starting PROX with command [%s]", prox_cmd)
1067         # thread.start_new_thread(self.ssh_check_quit, (self, self._user,
1068         # self._ip, prox_cmd))
1069         if client is None:
1070             client = ProxSocketHelper()
1071
1072         # try connecting to Prox for 60s
1073         for _ in range(RETRY_SECONDS):
1074             time.sleep(RETRY_INTERVAL)
1075             try:
1076                 client.connect(self._ip, PROX_PORT)
1077             except (socket.gaierror, socket.error):
1078                 continue
1079             else:
1080                 return client
1081
1082         msg = "Failed to connect to prox, please check if system {} accepts connections on port {}"
1083         raise Exception(msg.format(self._ip, PROX_PORT))