Merge "docs: Update step driven test documentation"
[vswitchperf.git] / testcases / testcase.py
index 18c3186..68b8aec 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2015-2017 Intel Corporation.
+# Copyright 2015-2018 Intel Corporation, Tieto and others.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@ import subprocess
 
 from datetime import datetime as dt
 from conf import settings as S
-from conf import get_test_param, merge_spec
+from conf import merge_spec
 import core.component_factory as component_factory
 from core.loader import Loader
 from core.results.results_constants import ResultsConstants
@@ -85,6 +85,7 @@ class TestCase(object):
         S.setValue('VSWITCH', cfg.get('vSwitch', S.getValue('VSWITCH')))
         S.setValue('VNF', cfg.get('VNF', S.getValue('VNF')))
         S.setValue('TRAFFICGEN', cfg.get('Trafficgen', S.getValue('TRAFFICGEN')))
+        S.setValue('TUNNEL_TYPE', cfg.get('Tunnel Type', S.getValue('TUNNEL_TYPE')))
         test_params = copy.deepcopy(S.getValue('TEST_PARAMS'))
         tc_test_params = cfg.get('Parameters', S.getValue('TEST_PARAMS'))
         test_params = merge_spec(test_params, tc_test_params)
@@ -125,16 +126,7 @@ class TestCase(object):
         self.deployment = cfg['Deployment']
         self._frame_mod = cfg.get('Frame Modification', None)
 
-        self._tunnel_type = None
-        self._tunnel_operation = None
-
-        if self.deployment == 'op2p':
-            self._tunnel_operation = cfg['Tunnel Operation']
-
-            if 'Tunnel Type' in cfg:
-                self._tunnel_type = cfg['Tunnel Type']
-                self._tunnel_type = get_test_param('TUNNEL_TYPE',
-                                                   self._tunnel_type)
+        self._tunnel_operation = cfg.get('Tunnel Operation', None)
 
         # check if test requires background load and which generator it uses
         self._load_cfg = cfg.get('Load', None)
@@ -145,16 +137,14 @@ class TestCase(object):
 
         # set traffic details, so they can be passed to vswitch and traffic ctls
         self._traffic = copy.deepcopy(S.getValue('TRAFFIC'))
-        self._traffic.update({'bidir': bidirectional,
-                              'tunnel_type': self._tunnel_type,})
-
-        self._traffic = functions.check_traffic(self._traffic)
+        self._traffic.update({'bidir': bidirectional})
 
         # Packet Forwarding mode
         self._vswitch_none = str(S.getValue('VSWITCH')).strip().lower() == 'none'
 
         # trafficgen configuration required for tests of tunneling protocols
-        if self.deployment == "op2p":
+        if self._tunnel_operation:
+            self._traffic.update({'tunnel_type': S.getValue('TUNNEL_TYPE')})
             self._traffic['l2'].update({'srcmac':
                                         S.getValue('TRAFFICGEN_PORT1_MAC'),
                                         'dstmac':
@@ -166,9 +156,9 @@ class TestCase(object):
                                         S.getValue('TRAFFICGEN_PORT2_IP')})
 
             if self._tunnel_operation == "decapsulation":
-                self._traffic['l2'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L2')
-                self._traffic['l3'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L3')
-                self._traffic['l4'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L4')
+                self._traffic['l2'].update(S.getValue(S.getValue('TUNNEL_TYPE').upper() + '_FRAME_L2'))
+                self._traffic['l3'].update(S.getValue(S.getValue('TUNNEL_TYPE').upper() + '_FRAME_L3'))
+                self._traffic['l4'].update(S.getValue(S.getValue('TUNNEL_TYPE').upper() + '_FRAME_L4'))
                 self._traffic['l2']['dstmac'] = S.getValue('NICS')[1]['mac']
         elif len(S.getValue('NICS')) >= 2 and \
              (S.getValue('NICS')[0]['type'] == 'vf' or
@@ -180,6 +170,8 @@ class TestCase(object):
             else:
                 self._logger.debug("MAC addresses can not be read")
 
+        self._traffic = functions.check_traffic(self._traffic)
+
         # count how many VNFs are involved in TestSteps
         if self.test:
             for step in self.test:
@@ -287,7 +279,7 @@ class TestCase(object):
         # cleanup any namespaces created
         if os.path.isdir('/tmp/namespaces'):
             namespace_list = os.listdir('/tmp/namespaces')
-            if len(namespace_list):
+            if namespace_list:
                 self._logger.info('Cleaning up namespaces')
             for name in namespace_list:
                 namespace.delete_namespace(name)
@@ -295,7 +287,7 @@ class TestCase(object):
         # cleanup any veth ports created
         if os.path.isdir('/tmp/veth'):
             veth_list = os.listdir('/tmp/veth')
-            if len(veth_list):
+            if veth_list:
                 self._logger.info('Cleaning up veth ports')
             for eth in veth_list:
                 port1, port2 = eth.split('-')
@@ -322,8 +314,8 @@ class TestCase(object):
             if len(self._tc_results) < len(results):
                 if len(self._tc_results) > 1:
                     raise RuntimeError('Testcase results do not match:'
-                                       'results: {}\n'
-                                       'trafficgen results: {}\n',
+                                       'results: %s\n'
+                                       'trafficgen results: %s\n' %
                                        self._tc_results,
                                        results)
                 else:
@@ -379,7 +371,7 @@ class TestCase(object):
         self._testcase_run_time = time.strftime("%H:%M:%S",
                                                 time.gmtime(self._testcase_stop_time -
                                                             self._testcase_start_time))
-        logging.info("Testcase execution time: " + self._testcase_run_time)
+        logging.info("Testcase execution time: %s", self._testcase_run_time)
         # report test results
         self.run_report()
 
@@ -409,8 +401,8 @@ class TestCase(object):
                 item[ResultsConstants.SCAL_PRE_INSTALLED_FLOWS] = self._traffic['pre_installed_flows']
             if self._vnf_ctl.get_vnfs_number():
                 item[ResultsConstants.GUEST_LOOPBACK] = ' '.join(S.getValue('GUEST_LOOPBACK'))
-            if self._tunnel_type:
-                item[ResultsConstants.TUNNEL_TYPE] = self._tunnel_type
+            if self._tunnel_operation:
+                item[ResultsConstants.TUNNEL_TYPE] = S.getValue('TUNNEL_TYPE')
         return results
 
     def _copy_fwd_tools_for_all_guests(self, vm_count):
@@ -562,7 +554,7 @@ class TestCase(object):
         """
         with open(output, 'a') as csvfile:
 
-            logging.info("Write results to file: " + output)
+            logging.info("Write results to file: %s", output)
             fieldnames = TestCase._get_unique_keys(results)
 
             writer = csv.DictWriter(csvfile, fieldnames)
@@ -720,7 +712,7 @@ class TestCase(object):
                         self._logger.debug("Skipping %s as it isn't a configuration "
                                            "parameter.", '${}'.format(macro[0]))
             return param
-        elif isinstance(param, list) or isinstance(param, tuple):
+        elif isinstance(param, (list, tuple)):
             tmp_list = []
             for item in param:
                 tmp_list.append(self.step_eval_param(item, step_result))
@@ -758,9 +750,6 @@ class TestCase(object):
         # initialize list with results
         self._step_result = [None] * len(self.test)
 
-        # We have to suppress pylint report, because test_object has to be set according
-        # to the test step definition
-        # pylint: disable=redefined-variable-type
         # run test step by step...
         for i, step in enumerate(self.test):
             step_ok = not self._step_check