1 # Copyright 2015-2017 Intel Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """TestCase base class
17 from collections import OrderedDict
27 from conf import settings as S
28 from conf import get_test_param, merge_spec
29 import core.component_factory as component_factory
30 from core.loader import Loader
31 from core.results.results_constants import ResultsConstants
32 from tools import tasks
33 from tools import hugepages
34 from tools import functions
35 from tools import namespace
36 from tools import veth
37 from tools.teststepstools import TestStepsTools
39 CHECK_PREFIX = 'validate_'
41 # pylint: disable=too-many-instance-attributes
42 class TestCase(object):
43 """TestCase base class
45 In this basic form runs RFC2544 throughput test
47 # pylint: disable=too-many-statements
48 def __init__(self, cfg):
49 """Pull out fields from test config
51 :param cfg: A dictionary of string-value pairs describing the test
52 configuration. Both the key and values strings use well-known
54 :param results_dir: Where the csv formatted results are written.
56 self._testcase_start_time = time.time()
57 self._hugepages_mounted = False
58 self._traffic_ctl = None
60 self._vswitch_ctl = None
61 self._collector = None
63 self._output_file = None
64 self._tc_results = None
65 self._settings_original = {}
66 self._settings_paths_modified = False
67 self._testcast_run_time = None
68 # initialization of step driven specific members
69 self._step_check = False # by default don't check result for step driven testcases
70 self._step_vnf_list = {}
71 self._step_result = []
72 self._step_status = None
73 self._testcase_run_time = None
75 # store all GUEST_ specific settings to keep original values before their expansion
76 for key in S.__dict__:
77 if key.startswith('GUEST_'):
78 self._settings_original[key] = S.getValue(key)
80 self._update_settings('VSWITCH', cfg.get('vSwitch', S.getValue('VSWITCH')))
81 self._update_settings('VNF', cfg.get('VNF', S.getValue('VNF')))
82 self._update_settings('TRAFFICGEN', cfg.get('Trafficgen', S.getValue('TRAFFICGEN')))
83 test_params = copy.deepcopy(S.getValue('TEST_PARAMS'))
84 tc_test_params = cfg.get('Parameters', S.getValue('TEST_PARAMS'))
85 test_params = merge_spec(test_params, tc_test_params)
86 self._update_settings('TEST_PARAMS', test_params)
89 # override all redefined GUEST_ values to have them expanded correctly
90 tmp_test_params = copy.deepcopy(S.getValue('TEST_PARAMS'))
91 for key in tmp_test_params:
92 if key.startswith('GUEST_'):
93 S.setValue(key, S.getValue(key))
94 S.getValue('TEST_PARAMS').pop(key)
96 # update global settings
97 functions.settings_update_paths()
99 # set test parameters; CLI options take precedence to testcase settings
100 self._logger = logging.getLogger(__name__)
101 self.name = cfg['Name']
102 self.desc = cfg.get('Description', 'No description given.')
103 self.test = cfg.get('TestSteps', None)
105 bidirectional = S.getValue('TRAFFIC')['bidir']
106 if not isinstance(S.getValue('TRAFFIC')['bidir'], str):
108 'Bi-dir value must be of type string')
109 bidirectional = bidirectional.title() # Keep things consistent
111 self.deployment = cfg['Deployment']
112 self._frame_mod = cfg.get('Frame Modification', None)
114 self._tunnel_type = None
115 self._tunnel_operation = None
117 if self.deployment == 'op2p':
118 self._tunnel_operation = cfg['Tunnel Operation']
120 if 'Tunnel Type' in cfg:
121 self._tunnel_type = cfg['Tunnel Type']
122 self._tunnel_type = get_test_param('TUNNEL_TYPE',
125 # check if test requires background load and which generator it uses
126 self._load_cfg = cfg.get('Load', None)
127 if self._load_cfg and 'tool' in self._load_cfg:
128 self._loadgen = self._load_cfg['tool']
130 # background load is not requested, so use dummy implementation
131 self._loadgen = "Dummy"
134 self._frame_mod = self._frame_mod.lower()
135 self._results_dir = S.getValue('RESULTS_PATH')
137 # set traffic details, so they can be passed to vswitch and traffic ctls
138 self._traffic = copy.deepcopy(S.getValue('TRAFFIC'))
139 self._traffic.update({'bidir': bidirectional,
140 'tunnel_type': self._tunnel_type,})
142 # Packet Forwarding mode
143 self._vswitch_none = S.getValue('VSWITCH').strip().lower() == 'none'
145 # trafficgen configuration required for tests of tunneling protocols
146 if self.deployment == "op2p":
147 self._traffic['l2'].update({'srcmac':
148 S.getValue('TRAFFICGEN_PORT1_MAC'),
150 S.getValue('TRAFFICGEN_PORT2_MAC')})
152 self._traffic['l3'].update({'srcip':
153 S.getValue('TRAFFICGEN_PORT1_IP'),
155 S.getValue('TRAFFICGEN_PORT2_IP')})
157 if self._tunnel_operation == "decapsulation":
158 self._traffic['l2'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L2')
159 self._traffic['l3'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L3')
160 self._traffic['l4'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L4')
161 elif len(S.getValue('NICS')) and \
162 (S.getValue('NICS')[0]['type'] == 'vf' or
163 S.getValue('NICS')[1]['type'] == 'vf'):
164 mac1 = S.getValue('NICS')[0]['mac']
165 mac2 = S.getValue('NICS')[1]['mac']
167 self._traffic['l2'].update({'srcmac': mac2, 'dstmac': mac1})
169 self._logger.debug("MAC addresses can not be read")
171 # count how many VNFs are involved in TestSteps
173 for step in self.test:
174 if step[0].startswith('vnf'):
175 self._step_vnf_list[step[0]] = None
177 def run_initialize(self):
178 """ Prepare test execution environment
180 self._logger.debug(self.name)
182 # mount hugepages if needed
183 self._mount_hugepages()
185 self._logger.debug("Controllers:")
187 self._traffic_ctl = component_factory.create_traffic(
188 self._traffic['traffic_type'],
189 loader.get_trafficgen_class())
191 self._vnf_ctl = component_factory.create_vnf(
193 loader.get_vnf_class(),
194 len(self._step_vnf_list))
196 # verify enough hugepages are free to run the testcase
197 if not self._check_for_enough_hugepages():
198 raise RuntimeError('Not enough hugepages free to run test.')
200 # perform guest related handling
201 tmp_vm_count = self._vnf_ctl.get_vnfs_number() + len(self._step_vnf_list)
203 # copy sources of l2 forwarding tools into VM shared dir if needed
204 self._copy_fwd_tools_for_all_guests(tmp_vm_count)
206 # in case of multi VM in parallel, set the number of streams to the number of VMs
207 if self.deployment.startswith('pvpv'):
208 # for each VM NIC pair we need an unique stream
210 for vm_nic in S.getValue('GUEST_NICS_NR')[:tmp_vm_count]:
211 streams += int(vm_nic / 2) if vm_nic > 1 else 1
212 self._logger.debug("VMs with parallel connection were detected. "
213 "Thus Number of streams was set to %s", streams)
214 # update streams if needed; In case of additional VNFs deployed by TestSteps
215 # user can define a proper stream count manually
216 if 'multistream' not in self._traffic or self._traffic['multistream'] < streams:
217 self._traffic.update({'multistream': streams})
219 # OVS Vanilla requires guest VM MAC address and IPs to work
220 if 'linux_bridge' in S.getValue('GUEST_LOOPBACK'):
221 self._traffic['l2'].update({'srcmac': S.getValue('VANILLA_TGEN_PORT1_MAC'),
222 'dstmac': S.getValue('VANILLA_TGEN_PORT2_MAC')})
223 self._traffic['l3'].update({'srcip': S.getValue('VANILLA_TGEN_PORT1_IP'),
224 'dstip': S.getValue('VANILLA_TGEN_PORT2_IP')})
226 if self._vswitch_none:
227 self._vswitch_ctl = component_factory.create_pktfwd(
229 loader.get_pktfwd_class())
231 self._vswitch_ctl = component_factory.create_vswitch(
233 loader.get_vswitch_class(),
235 self._tunnel_operation)
237 self._collector = component_factory.create_collector(
238 loader.get_collector_class(),
239 self._results_dir, self.name)
240 self._loadgen = component_factory.create_loadgen(
244 self._output_file = os.path.join(self._results_dir, "result_" + self.name +
245 "_" + self.deployment + ".csv")
247 self._step_status = {'status' : True, 'details' : ''}
249 self._logger.debug("Setup:")
251 def run_finalize(self):
252 """ Tear down test execution environment and record test results
254 # Stop all VNFs started by TestSteps in case that something went wrong
255 self.step_stop_vnfs()
257 # umount hugepages if mounted
258 self._umount_hugepages()
260 # restore original settings
261 S.load_from_dict(self._settings_original)
263 # cleanup any namespaces created
264 if os.path.isdir('/tmp/namespaces'):
265 namespace_list = os.listdir('/tmp/namespaces')
266 if len(namespace_list):
267 self._logger.info('Cleaning up namespaces')
268 for name in namespace_list:
269 namespace.delete_namespace(name)
270 os.rmdir('/tmp/namespaces')
271 # cleanup any veth ports created
272 if os.path.isdir('/tmp/veth'):
273 veth_list = os.listdir('/tmp/veth')
275 self._logger.info('Cleaning up veth ports')
276 for eth in veth_list:
277 port1, port2 = eth.split('-')
278 veth.del_veth_port(port1, port2)
279 os.rmdir('/tmp/veth')
281 def run_report(self):
282 """ Report test results
284 self._logger.debug("self._collector Results:")
285 self._collector.print_results()
287 results = self._traffic_ctl.get_results()
289 self._logger.debug("Traffic Results:")
290 self._traffic_ctl.print_results()
292 self._tc_results = self._append_results(results)
293 TestCase.write_result_to_file(self._tc_results, self._output_file)
298 All setup and teardown through controllers is included.
300 # prepare test execution environment
301 self.run_initialize()
304 with self._vswitch_ctl, self._loadgen:
305 with self._vnf_ctl, self._collector:
306 if not self._vswitch_none:
309 with self._traffic_ctl:
310 # execute test based on TestSteps definition if needed...
312 # ...and continue with traffic generation, but keep
313 # in mind, that clean deployment does not configure
314 # OVS nor executes the traffic
315 if self.deployment != 'clean':
316 self._traffic_ctl.send_traffic(self._traffic)
318 # dump vswitch flows before they are affected by VNF termination
319 if not self._vswitch_none:
320 self._vswitch_ctl.dump_vswitch_flows()
322 # garbage collection for case that TestSteps modify existing deployment
323 self.step_stop_vnfs()
326 # tear down test execution environment and log results
329 self._testcase_run_time = time.strftime("%H:%M:%S",
330 time.gmtime(time.time() -
331 self._testcase_start_time))
332 logging.info("Testcase execution time: " + self._testcase_run_time)
333 # report test results
336 def _update_settings(self, param, value):
337 """ Check value of given configuration parameter
338 In case that new value is different, then testcase
339 specific settings is updated and original value stored
341 :param param: Name of parameter inside settings
342 :param value: Disired parameter value
344 orig_value = S.getValue(param)
345 if orig_value != value:
346 self._settings_original[param] = copy.deepcopy(orig_value)
347 S.setValue(param, value)
349 def _append_results(self, results):
351 Method appends mandatory Test Case results to list of dictionaries.
353 :param results: list of dictionaries which contains results from
356 :returns: modified list of dictionaries.
359 item[ResultsConstants.ID] = self.name
360 item[ResultsConstants.DEPLOYMENT] = self.deployment
361 item[ResultsConstants.TRAFFIC_TYPE] = self._traffic['l3']['proto']
362 item[ResultsConstants.TEST_RUN_TIME] = self._testcase_run_time
363 if self._traffic['multistream']:
364 item[ResultsConstants.SCAL_STREAM_COUNT] = self._traffic['multistream']
365 item[ResultsConstants.SCAL_STREAM_TYPE] = self._traffic['stream_type']
366 item[ResultsConstants.SCAL_PRE_INSTALLED_FLOWS] = self._traffic['pre_installed_flows']
367 if self._vnf_ctl.get_vnfs_number():
368 item[ResultsConstants.GUEST_LOOPBACK] = ' '.join(S.getValue('GUEST_LOOPBACK'))
369 if self._tunnel_type:
370 item[ResultsConstants.TUNNEL_TYPE] = self._tunnel_type
373 def _copy_fwd_tools_for_all_guests(self, vm_count):
374 """Copy dpdk and l2fwd code to GUEST_SHARE_DIR[s] based on selected deployment.
376 # consider only VNFs involved in the test
377 for guest_dir in set(S.getValue('GUEST_SHARE_DIR')[:vm_count]):
378 self._copy_fwd_tools_for_guest(guest_dir)
380 def _copy_fwd_tools_for_guest(self, guest_dir):
381 """Copy dpdk and l2fwd code to GUEST_SHARE_DIR of VM
383 :param index: Index of VM starting from 1 (i.e. 1st VM has index 1)
385 # remove shared dir if it exists to avoid issues with file consistency
386 if os.path.exists(guest_dir):
387 tasks.run_task(['rm', '-f', '-r', guest_dir], self._logger,
388 'Removing content of shared directory...', True)
390 # directory to share files between host and guest
391 os.makedirs(guest_dir)
393 # copy sources into shared dir only if neccessary
394 guest_loopback = set(S.getValue('GUEST_LOOPBACK'))
395 if 'testpmd' in guest_loopback:
397 # exclude whole .git/ subdirectory and all o-files;
398 # It is assumed, that the same RTE_TARGET is used in both host
399 # and VMs; This simplification significantly speeds up testpmd
400 # build. If we will need a different RTE_TARGET in VM,
401 # then we have to build whole DPDK from the scratch in VM.
402 # In that case we can copy just DPDK sources (e.g. by excluding
403 # all items obtained by git status -unormal --porcelain).
404 # NOTE: Excluding RTE_TARGET directory won't help on systems,
405 # where DPDK is built for multiple targets (e.g. for gcc & icc)
407 exclude.append(r'--exclude=.git/')
408 exclude.append(r'--exclude=*.o')
409 tasks.run_task(['rsync', '-a', '-r', '-l'] + exclude +
410 [os.path.join(S.getValue('TOOLS')['dpdk_src'], ''),
411 os.path.join(guest_dir, 'DPDK')],
413 'Copying DPDK to shared directory...',
415 except subprocess.CalledProcessError:
416 self._logger.error('Unable to copy DPDK to shared directory')
418 if 'l2fwd' in guest_loopback:
420 tasks.run_task(['rsync', '-a', '-r', '-l',
421 os.path.join(S.getValue('ROOT_DIR'), 'src/l2fwd/'),
422 os.path.join(guest_dir, 'l2fwd')],
424 'Copying l2fwd to shared directory...',
426 except subprocess.CalledProcessError:
427 self._logger.error('Unable to copy l2fwd to shared directory')
430 def _mount_hugepages(self):
431 """Mount hugepages if usage of DPDK or Qemu is detected
433 # pylint: disable=too-many-boolean-expressions
434 # hugepages are needed by DPDK and Qemu
435 if not self._hugepages_mounted and \
436 (self.deployment.count('v') or \
437 S.getValue('VSWITCH').lower().count('dpdk') or \
438 self._vswitch_none or \
439 self.test and 'vnf' in [step[0][0:3] for step in self.test]):
440 hugepages.mount_hugepages()
441 self._hugepages_mounted = True
443 def _umount_hugepages(self):
444 """Umount hugepages if they were mounted before
446 if self._hugepages_mounted:
447 hugepages.umount_hugepages()
448 self._hugepages_mounted = False
450 def _check_for_enough_hugepages(self):
451 """Check to make sure enough hugepages are free to satisfy the
455 hugepage_size = hugepages.get_hugepage_size()
456 # get hugepage amounts per guest involved in the test
457 for guest in range(self._vnf_ctl.get_vnfs_number()):
458 hugepages_needed += math.ceil((int(S.getValue(
459 'GUEST_MEMORY')[guest]) * 1000) / hugepage_size)
461 # get hugepage amounts for each socket on dpdk
462 sock0_mem, sock1_mem = 0, 0
463 if S.getValue('VSWITCH').lower().count('dpdk'):
464 # the import below needs to remain here and not put into the module
465 # imports because of an exception due to settings not yet loaded
466 from vswitches import ovs_dpdk_vhost
467 if ovs_dpdk_vhost.OvsDpdkVhost.old_dpdk_config():
469 r'-socket-mem\s+(\d+),(\d+)',
470 ''.join(S.getValue('VSWITCHD_DPDK_ARGS')))
472 sock0_mem, sock1_mem = (int(match.group(1)) * 1024 / hugepage_size,
473 int(match.group(2)) * 1024 / hugepage_size)
476 'Could not parse socket memory config in dpdk params.')
478 sock0_mem, sock1_mem = (
480 'VSWITCHD_DPDK_CONFIG')['dpdk-socket-mem'].split(','))
481 sock0_mem, sock1_mem = (int(sock0_mem) * 1024 / hugepage_size,
482 int(sock1_mem) * 1024 / hugepage_size)
484 # If hugepages needed, verify the amounts are free
485 if any([hugepages_needed, sock0_mem, sock1_mem]):
486 free_hugepages = hugepages.get_free_hugepages()
488 logging.info('Need %s hugepages free for guests',
490 result1 = free_hugepages >= hugepages_needed
491 free_hugepages -= hugepages_needed
496 logging.info('Need %s hugepages free for dpdk socket 0',
498 result2 = hugepages.get_free_hugepages('0') >= sock0_mem
499 free_hugepages -= sock0_mem
504 logging.info('Need %s hugepages free for dpdk socket 1',
506 result3 = hugepages.get_free_hugepages('1') >= sock1_mem
507 free_hugepages -= sock1_mem
511 logging.info('Need a total of %s total hugepages',
512 hugepages_needed + sock1_mem + sock0_mem)
514 # The only drawback here is sometimes dpdk doesn't release
515 # its hugepages on a test failure. This could cause a test
516 # to fail when dpdk would be OK to start because it will just
517 # use the previously allocated hugepages.
518 result4 = True if free_hugepages >= 0 else False
520 return all([result1, result2, result3, result4])
525 def write_result_to_file(results, output):
526 """Write list of dictionaries to a CSV file.
528 Each element on list will create separate row in output file.
529 If output file already exists, data will be appended at the end,
530 otherwise it will be created.
532 :param results: list of dictionaries.
533 :param output: path to output file.
535 with open(output, 'a') as csvfile:
537 logging.info("Write results to file: " + output)
538 fieldnames = TestCase._get_unique_keys(results)
540 writer = csv.DictWriter(csvfile, fieldnames)
542 if not csvfile.tell(): # file is now empty
545 for result in results:
546 writer.writerow(result)
549 def _get_unique_keys(list_of_dicts):
550 """Gets unique key values as ordered list of strings in given dicts
552 :param list_of_dicts: list of dictionaries.
554 :returns: list of unique keys(strings).
556 result = OrderedDict()
557 for item in list_of_dicts:
558 for key in item.keys():
561 return list(result.keys())
563 def _add_flows(self):
564 """Add flows to the vswitch
566 vswitch = self._vswitch_ctl.get_vswitch()
567 # NOTE BOM 15-08-07 the frame mod code assumes that the
568 # physical ports are ports 1 & 2. The actual numbers
569 # need to be retrived from the vSwitch and the metadata value
570 # updated accordingly.
571 bridge = S.getValue('VSWITCH_BRIDGE_NAME')
572 if self._frame_mod == "vlan":
573 # 0x8100 => VLAN ethertype
574 self._logger.debug(" **** VLAN ***** ")
575 flow = {'table':'2', 'priority':'1000', 'metadata':'2',
576 'actions': ['push_vlan:0x8100', 'goto_table:3']}
577 vswitch.add_flow(bridge, flow)
578 flow = {'table':'2', 'priority':'1000', 'metadata':'1',
579 'actions': ['push_vlan:0x8100', 'goto_table:3']}
580 vswitch.add_flow(bridge, flow)
581 elif self._frame_mod == "mpls":
582 # 0x8847 => MPLS unicast ethertype
583 self._logger.debug(" **** MPLS ***** ")
584 flow = {'table':'2', 'priority':'1000', 'metadata':'2',
585 'actions': ['push_mpls:0x8847', 'goto_table:3']}
586 vswitch.add_flow(bridge, flow)
587 flow = {'table':'2', 'priority':'1000', 'metadata':'1',
588 'actions': ['push_mpls:0x8847', 'goto_table:3']}
589 vswitch.add_flow(bridge, flow)
590 elif self._frame_mod == "mac":
591 flow = {'table':'2', 'priority':'1000', 'metadata':'2',
592 'actions': ['mod_dl_src:22:22:22:22:22:22',
594 vswitch.add_flow(bridge, flow)
595 flow = {'table':'2', 'priority':'1000', 'metadata':'1',
596 'actions': ['mod_dl_src:11:11:11:11:11:11',
598 vswitch.add_flow(bridge, flow)
599 elif self._frame_mod == "dscp":
600 # DSCP 184d == 0x4E<<2 => 'Expedited Forwarding'
601 flow = {'table':'2', 'priority':'1000', 'metadata':'2',
603 'actions': ['mod_nw_tos:184', 'goto_table:3']}
604 vswitch.add_flow(bridge, flow)
605 flow = {'table':'2', 'priority':'1000', 'metadata':'1',
607 'actions': ['mod_nw_tos:184', 'goto_table:3']}
608 vswitch.add_flow(bridge, flow)
609 elif self._frame_mod == "ttl":
610 # 251 and 241 are the highest prime numbers < 255
611 flow = {'table':'2', 'priority':'1000', 'metadata':'2',
613 'actions': ['mod_nw_ttl:251', 'goto_table:3']}
614 vswitch.add_flow(bridge, flow)
615 flow = {'table':'2', 'priority':'1000', 'metadata':'1',
617 'actions': ['mod_nw_ttl:241', 'goto_table:3']}
618 vswitch.add_flow(bridge, flow)
619 elif self._frame_mod == "ip_addr":
620 flow = {'table':'2', 'priority':'1000', 'metadata':'2',
622 'actions': ['mod_nw_src:10.10.10.10',
623 'mod_nw_dst:20.20.20.20',
625 vswitch.add_flow(bridge, flow)
626 flow = {'table':'2', 'priority':'1000', 'metadata':'1',
628 'actions': ['mod_nw_src:20.20.20.20',
629 'mod_nw_dst:10.10.10.10',
631 vswitch.add_flow(bridge, flow)
632 elif self._frame_mod == "ip_port":
633 # NOTE BOM 15-08-27 The traffic generated is assumed
634 # to be UDP (nw_proto 17d) which is the default case but
635 # we will need to pick up the actual traffic params in use.
636 flow = {'table':'2', 'priority':'1000', 'metadata':'2',
637 'dl_type':'0x0800', 'nw_proto':'17',
638 'actions': ['mod_tp_src:44444',
639 'mod_tp_dst:44444', 'goto_table:3']}
640 vswitch.add_flow(bridge, flow)
641 flow = {'table':'2', 'priority':'1000', 'metadata':'1',
642 'dl_type':'0x0800', 'nw_proto':'17',
643 'actions': ['mod_tp_src:44444',
644 'mod_tp_dst:44444', 'goto_table:3']}
645 vswitch.add_flow(bridge, flow)
651 # TestSteps realted methods
653 def step_report_status(self, label, status):
654 """ Log status of test step
656 self._logger.info("%s ... %s", label, 'OK' if status else 'FAILED')
658 def step_stop_vnfs(self):
659 """ Stop all VNFs started by TestSteps
661 for vnf in self._step_vnf_list:
662 if self._step_vnf_list[vnf]:
663 self._step_vnf_list[vnf].stop()
666 def step_eval_param(param, STEP):
667 # pylint: disable=invalid-name
668 """ Helper function for #STEP macro evaluation
670 if isinstance(param, str):
671 # evaluate every #STEP reference inside parameter itself
672 macros = re.findall(r'#STEP\[[\w\[\]\-\'\"]+\]', param)
675 # pylint: disable=eval-used
676 tmp_val = str(eval(macro[1:]))
677 param = param.replace(macro, tmp_val)
679 elif isinstance(param, list) or isinstance(param, tuple):
682 tmp_list.append(TestCase.step_eval_param(item, STEP))
684 elif isinstance(param, dict):
686 for (key, value) in param.items():
687 tmp_dict[key] = TestCase.step_eval_param(value, STEP)
693 def step_eval_params(params, step_result):
694 """ Evaluates referrences to results from previous steps
697 # evaluate all parameters if needed
699 eval_params.append(TestCase.step_eval_param(param, step_result))
703 """ Execute actions specified by TestSteps list
705 :return: False if any error was detected
712 # required for VNFs initialization
714 # initialize list with results
715 self._step_result = [None] * len(self.test)
717 # We have to suppress pylint report, because test_object has to be set according
718 # to the test step definition
719 # pylint: disable=redefined-variable-type
720 # run test step by step...
721 for i, step in enumerate(self.test):
722 step_ok = not self._step_check
723 if step[0] == 'vswitch':
724 test_object = self._vswitch_ctl.get_vswitch()
725 elif step[0] == 'namespace':
726 test_object = namespace
727 elif step[0] == 'veth':
729 elif step[0] == 'settings':
731 elif step[0] == 'tools':
732 test_object = TestStepsTools()
733 step[1] = step[1].title()
734 elif step[0] == 'trafficgen':
735 test_object = self._traffic_ctl
736 # in case of send_traffic or send_traffic_async methods, ensure
737 # that specified traffic values are merged with existing self._traffic
738 if step[1].startswith('send_traffic'):
739 tmp_traffic = copy.deepcopy(self._traffic)
740 tmp_traffic.update(step[2])
741 step[2] = tmp_traffic
742 elif step[0].startswith('vnf'):
743 if not self._step_vnf_list[step[0]]:
745 self._step_vnf_list[step[0]] = loader.get_vnf_class()()
746 test_object = self._step_vnf_list[step[0]]
747 elif step[0] == 'wait':
748 input(os.linesep + "Step {}: Press Enter to continue with "
749 "the next step...".format(i) + os.linesep + os.linesep)
752 self._logger.error("Unsupported test object %s", step[0])
753 self._step_status = {'status' : False, 'details' : ' '.join(step)}
754 self.step_report_status("Step '{}'".format(' '.join(step)),
755 self._step_status['status'])
758 test_method = getattr(test_object, step[1])
760 test_method_check = getattr(test_object, CHECK_PREFIX + step[1])
762 test_method_check = None
766 # eval parameters, but use only valid step_results
767 # to support negative indexes
768 step_params = TestCase.step_eval_params(step[2:], self._step_result[:i])
769 step_log = '{} {}'.format(' '.join(step[:2]), step_params)
770 self._logger.debug("Step %s '%s' start", i, step_log)
771 self._step_result[i] = test_method(*step_params)
772 self._logger.debug("Step %s '%s' results '%s'", i,
773 step_log, self._step_result[i])
776 step_ok = test_method_check(self._step_result[i], *step_params)
777 except (AssertionError, AttributeError, IndexError) as ex:
779 self._logger.error("Step %s raised %s", i, type(ex).__name__)
782 self.step_report_status("Step {} - '{}'".format(i, step_log), step_ok)
785 self._step_status = {'status' : False, 'details' : step_log}
786 # Stop all VNFs started by TestSteps
787 self.step_stop_vnfs()
790 # all steps processed without any issue