+
+
+ #
+ # TestSteps realted methods
+ #
+ def step_report_status(self, label, status):
+ """ Log status of test step
+ """
+ self._logger.info("%s ... %s", label, 'OK' if status else 'FAILED')
+
+ def step_stop_vnfs(self):
+ """ Stop all VNFs started by TestSteps
+ """
+ for vnf in self._step_vnf_list:
+ if self._step_vnf_list[vnf]:
+ self._step_vnf_list[vnf].stop()
+
+ def step_eval_param(self, param, step_result):
+ """ Helper function for #STEP macro evaluation
+ """
+ if isinstance(param, str):
+ # evaluate every #STEP reference inside parameter itself
+ macros = re.findall(r'(#STEP\[([\w\-:]+)\]((\[[\w\-\'\"]+\])*))', param)
+
+ if macros:
+ for macro in macros:
+ if macro[1] in self._step_result_mapping:
+ key = self._step_result_mapping[macro[1]]
+ else:
+ key = macro[1]
+ # pylint: disable=eval-used
+ tmp_val = str(eval('step_result[{}]{}'.format(key, macro[2])))
+ param = param.replace(macro[0], tmp_val)
+
+ # evaluate references to vsperf configuration options
+ macros = re.findall(r'\$(([\w\-]+)(\[[\w\[\]\-\'\"]+\])*)', param)
+ if macros:
+ for macro in macros:
+ # pylint: disable=eval-used
+ try:
+ tmp_val = str(eval("S.getValue('{}'){}".format(macro[1], macro[2])))
+ param = param.replace('${}'.format(macro[0]), tmp_val)
+ # ignore that required option can't be evaluated
+ except (IndexError, KeyError, AttributeError):
+ self._logger.debug("Skipping %s as it isn't a configuration "
+ "parameter.", '${}'.format(macro[0]))
+ return param
+ elif isinstance(param, (list, tuple)):
+ tmp_list = []
+ for item in param:
+ tmp_list.append(self.step_eval_param(item, step_result))
+ return tmp_list
+ elif isinstance(param, dict):
+ tmp_dict = {}
+ for (key, value) in param.items():
+ tmp_dict[key] = self.step_eval_param(value, step_result)
+ return tmp_dict
+ else:
+ return param
+
+ def step_eval_params(self, params, step_result):
+ """ Evaluates referrences to results from previous steps
+ """
+ eval_params = []
+ # evaluate all parameters if needed
+ for param in params:
+ eval_params.append(self.step_eval_param(param, step_result))
+ return eval_params
+
+ # pylint: disable=too-many-locals, too-many-branches, too-many-statements
+ def step_run(self):
+ """ Execute actions specified by TestSteps list
+
+ :return: False if any error was detected
+ True otherwise
+ """
+ # anything to do?
+ if not self.test:
+ return True
+
+ # required for VNFs initialization
+ loader = Loader()
+ # initialize list with results
+ self._step_result = [None] * len(self.test)
+
+ # run test step by step...
+ for i, step in enumerate(self.test):
+ step_ok = not self._step_check
+ step_check = self._step_check
+ regex = None
+ # configure step result mapping if step alias/label is detected
+ if step[0].startswith('#'):
+ key = step[0][1:]
+ if key.isdigit():
+ raise RuntimeError('Step alias can\'t be an integer value {}'.format(key))
+ if key in self._step_result_mapping:
+ raise RuntimeError('Step alias {} has been used already for step '
+ '{}'.format(key, self._step_result_mapping[key]))
+ self._step_result_mapping[step[0][1:]] = i
+ step = step[1:]
+
+ # store regex filter if it is specified
+ if isinstance(step[-1], str) and step[-1].startswith('|'):
+ # evalute macros and variables used in regex
+ regex = self.step_eval_params([step[-1][1:]], self._step_result[:i])[0]
+ step = step[:-1]
+
+ # check if step verification should be suppressed
+ if step[0].startswith('!'):
+ step_check = False
+ step_ok = True
+ step[0] = step[0][1:]
+ if step[0] == 'vswitch':
+ test_object = self._vswitch_ctl.get_vswitch()
+ elif step[0] == 'namespace':
+ test_object = namespace
+ elif step[0] == 'veth':
+ test_object = veth
+ elif step[0] == 'settings':
+ test_object = S
+ elif step[0] == 'tools':
+ test_object = TestStepsTools()
+ step[1] = step[1].title()
+ elif step[0] == 'trafficgen':
+ test_object = self._traffic_ctl
+ # in case of send_traffic or send_traffic_async methods, ensure
+ # that specified traffic values are merged with existing self._traffic
+ if step[1].startswith('send_traffic'):
+ tmp_traffic = copy.deepcopy(self._traffic)
+ tmp_traffic.update(step[2])
+ step[2] = tmp_traffic
+ # store indication that traffic has been sent
+ # so it is not sent again after the execution of teststeps
+ self._step_send_traffic = True
+ elif step[0].startswith('vnf'):
+ # use vnf started within TestSteps
+ if not self._step_vnf_list[step[0]]:
+ # initialize new VM
+ self._step_vnf_list[step[0]] = loader.get_vnf_class()()
+ test_object = self._step_vnf_list[step[0]]
+ elif step[0].startswith('VNF'):
+ if step[1] in ('start', 'stop'):
+ raise RuntimeError("Cannot execute start() or stop() method of "
+ "VNF deployed automatically by scenario.")
+ # use vnf started by scenario deployment (e.g. pvp)
+ vnf_index = int(step[0][3:])
+ try:
+ test_object = self._vnf_list[vnf_index]
+ except IndexError:
+ raise RuntimeError("VNF with index {} is not running.".format(vnf_index))
+ elif step[0] == 'wait':
+ input(os.linesep + "Step {}: Press Enter to continue with "
+ "the next step...".format(i) + os.linesep + os.linesep)
+ continue
+ elif step[0] == 'sleep':
+ self._logger.debug("Sleep %s seconds", step[1])
+ time.sleep(int(step[1]))
+ continue
+ elif step[0] == 'log':
+ test_object = self._logger
+ # there isn't a need for validation of log entry
+ step_check = False
+ step_ok = True
+ elif step[0] == 'pdb':
+ import pdb
+ pdb.set_trace()
+ continue
+ else:
+ self._logger.error("Unsupported test object %s", step[0])
+ self._step_status = {'status' : False, 'details' : ' '.join(step)}
+ self.step_report_status("Step '{}'".format(' '.join(step)),
+ self._step_status['status'])
+ return False
+
+ test_method = getattr(test_object, step[1])
+ if step_check:
+ test_method_check = getattr(test_object, CHECK_PREFIX + step[1])
+ else:
+ test_method_check = None
+
+ step_params = []
+ try:
+ # eval parameters, but use only valid step_results
+ # to support negative indexes
+ step_params = self.step_eval_params(step[2:], self._step_result[:i])
+ step_log = '{} {}'.format(' '.join(step[:2]), step_params)
+ step_log += ' filter "{}"'.format(regex) if regex else ''
+ self._logger.debug("Step %s '%s' start", i, step_log)
+ self._step_result[i] = test_method(*step_params)
+ if regex:
+ # apply regex to step output
+ self._step_result[i] = functions.filter_output(
+ self._step_result[i], regex)
+
+ self._logger.debug("Step %s '%s' results '%s'", i,
+ step_log, self._step_result[i])
+ time.sleep(S.getValue('TEST_STEP_DELAY'))
+ if step_check:
+ step_ok = test_method_check(self._step_result[i], *step_params)
+ except (AssertionError, AttributeError, IndexError) as ex:
+ step_ok = False
+ self._logger.error("Step %s raised %s", i, type(ex).__name__)
+
+ if step_check:
+ self.step_report_status("Step {} - '{}'".format(i, step_log), step_ok)
+
+ if not step_ok:
+ self._step_status = {'status' : False, 'details' : step_log}
+ # Stop all VNFs started by TestSteps
+ self.step_stop_vnfs()
+ return False
+
+ # all steps processed without any issue
+ return True
+
+ #
+ # get methods for TestCase members, which needs to be publicly available
+ #
+ def get_output_file(self):
+ """Return content of self._output_file member
+ """
+ return self._output_file
+
+ def get_desc(self):
+ """Return content of self.desc member
+ """
+ return self.desc
+
+ def get_versions(self):
+ """Return content of self.versions member
+ """
+ return self._versions
+
+ def get_traffic(self):
+ """Return content of self._traffic member
+ """
+ return self._traffic
+
+ def get_tc_results(self):
+ """Return content of self._tc_results member
+ """
+ return self._tc_results
+
+ def get_collector(self):
+ """Return content of self._collector member
+ """
+ return self._collector