+def check_and_set_locale():
+ """ Function will check locale settings. In case, that it isn't configured
+ properly, then default values specified by DEFAULT_LOCALE will be used.
+ """
+
+ system_locale = locale.getdefaultlocale()
+ if None in system_locale:
+ os.environ['LC_ALL'] = settings.getValue('DEFAULT_LOCALE')
+ _LOGGER.warning("Locale was not properly configured. Default values were set. Old locale: %s, New locale: %s",
+ system_locale, locale.getdefaultlocale())
+
+
+def generate_final_report():
+ """ Function will check if partial test results are available
+ and generates final report in rst format.
+ """
+
+ path = settings.getValue('RESULTS_PATH')
+ # check if there are any results in rst format
+ rst_results = glob.glob(os.path.join(path, 'result*rst'))
+ if len(rst_results):
+ try:
+ test_report = os.path.join(path, '{}_{}'.format(settings.getValue('VSWITCH'), _TEMPLATE_RST['final']))
+ # create report caption directly - it is not worth to execute jinja machinery
+ if settings.getValue('VSWITCH').lower() != 'none':
+ pkt_processor = Loader().get_vswitches()[settings.getValue('VSWITCH')].__doc__.strip().split('\n')[0]
+ else:
+ pkt_processor = Loader().get_pktfwds()[settings.getValue('PKTFWD')].__doc__.strip().split('\n')[0]
+ report_caption = '{}\n{} {}\n{}\n\n'.format(
+ '============================================================',
+ 'Performance report for',
+ pkt_processor,
+ '============================================================')
+
+ with open(_TEMPLATE_RST['tmp'], 'w') as file_:
+ file_.write(report_caption)
+
+ retval = subprocess.call('cat {} {} {} {} > {}'.format(_TEMPLATE_RST['tmp'], _TEMPLATE_RST['head'],
+ ' '.join(rst_results), _TEMPLATE_RST['foot'],
+ test_report), shell=True)
+ if retval == 0 and os.path.isfile(test_report):
+ _LOGGER.info('Overall test report written to "%s"', test_report)
+ else:
+ _LOGGER.error('Generatrion of overall test report has failed.')
+
+ # remove temporary file
+ os.remove(_TEMPLATE_RST['tmp'])
+
+ except subprocess.CalledProcessError:
+ _LOGGER.error('Generatrion of overall test report has failed.')
+
+
+def enable_sriov(nic_list):
+ """ Enable SRIOV for given enhanced PCI IDs
+
+ :param nic_list: A list of enhanced PCI IDs
+ """
+ # detect if sriov is required
+ sriov_nic = {}
+ for nic in nic_list:
+ if networkcard.is_sriov_nic(nic):
+ tmp_nic = nic.split('|')
+ if tmp_nic[0] in sriov_nic:
+ if int(tmp_nic[1][2:]) > sriov_nic[tmp_nic[0]]:
+ sriov_nic[tmp_nic[0]] = int(tmp_nic[1][2:])
+ else:
+ sriov_nic.update({tmp_nic[0] : int(tmp_nic[1][2:])})
+
+ # sriov is required for some NICs
+ if len(sriov_nic):
+ for nic in sriov_nic:
+ # check if SRIOV is supported and enough virt interfaces are available
+ if not networkcard.is_sriov_supported(nic) \
+ or networkcard.get_sriov_numvfs(nic) <= sriov_nic[nic]:
+ # if not, enable and set appropriate number of VFs
+ if not networkcard.set_sriov_numvfs(nic, sriov_nic[nic] + 1):
+ _LOGGER.error("SRIOV cannot be enabled for NIC %s", nic)
+ raise
+ else:
+ _LOGGER.debug("SRIOV enabled for NIC %s", nic)
+
+ # WORKAROUND: it has been observed with IXGBE(VF) driver,
+ # that NIC doesn't correclty dispatch traffic to VFs based
+ # on their MAC address. Unbind and bind to the same driver
+ # solves this issue.
+ networkcard.reinit_vfs(nic)
+
+ # After SRIOV is enabled it takes some time until network drivers
+ # properly initialize all cards.
+ # Wait also in case, that SRIOV was already configured as it can be
+ # configured automatically just before vsperf execution.
+ time.sleep(2)
+
+ return True
+
+ return False
+
+
+def disable_sriov(nic_list):
+ """ Disable SRIOV for given PCI IDs
+
+ :param nic_list: A list of enhanced PCI IDs
+ """
+ for nic in nic_list:
+ if networkcard.is_sriov_nic(nic):
+ if not networkcard.set_sriov_numvfs(nic.split('|')[0], 0):
+ _LOGGER.error("SRIOV cannot be disabled for NIC %s", nic)
+ raise
+ else:
+ _LOGGER.debug("SRIOV disabled for NIC %s", nic.split('|')[0])
+
+
+def handle_list_options(args):
+ """ Process --list cli arguments if needed
+
+ :param args: A dictionary with all CLI arguments
+ """
+ if args['list_trafficgens']:
+ print(Loader().get_trafficgens_printable())
+ sys.exit(0)
+
+ if args['list_collectors']:
+ print(Loader().get_collectors_printable())
+ sys.exit(0)
+
+ if args['list_vswitches']:
+ print(Loader().get_vswitches_printable())
+ sys.exit(0)
+
+ if args['list_vnfs']:
+ print(Loader().get_vnfs_printable())
+ sys.exit(0)
+
+ if args['list_fwdapps']:
+ print(Loader().get_pktfwds_printable())
+ sys.exit(0)
+
+ if args['list_settings']:
+ print(str(settings))
+ sys.exit(0)
+
+ if args['list']:
+ # configure tests
+ if args['integration']:
+ testcases = settings.getValue('INTEGRATION_TESTS')
+ else:
+ testcases = settings.getValue('PERFORMANCE_TESTS')
+
+ print("Available Tests:")
+ print("================")
+
+ for test in testcases:
+ print('* %-30s %s' % ('%s:' % test['Name'], test['Description']))
+ sys.exit(0)
+
+
+def vsperf_finalize():
+ """ Clean up before exit
+ """
+ # remove directory if no result files were created
+ try:
+ results_path = settings.getValue('RESULTS_PATH')
+ if os.path.exists(results_path):
+ files_list = os.listdir(results_path)
+ if files_list == []:
+ _LOGGER.info("Removing empty result directory: " + results_path)
+ shutil.rmtree(results_path)
+ except AttributeError:
+ # skip it if parameter doesn't exist
+ pass
+
+ # disable SRIOV if needed
+ try:
+ if settings.getValue('SRIOV_ENABLED'):
+ disable_sriov(settings.getValue('WHITELIST_NICS_ORIG'))
+ except AttributeError:
+ # skip it if parameter doesn't exist
+ pass
+
+
+class MockTestCase(unittest.TestCase):
+ """Allow use of xmlrunner to generate Jenkins compatible output without
+ using xmlrunner to actually run tests.
+
+ Usage:
+ suite = unittest.TestSuite()
+ suite.addTest(MockTestCase('Test1 passed ', True, 'Test1'))
+ suite.addTest(MockTestCase('Test2 failed because...', False, 'Test2'))
+ xmlrunner.XMLTestRunner(...).run(suite)
+ """
+
+ def __init__(self, msg, is_pass, test_name):
+ #remember the things
+ self.msg = msg
+ self.is_pass = is_pass
+
+ #dynamically create a test method with the right name
+ #but point the method at our generic test method
+ setattr(MockTestCase, test_name, self.generic_test)
+
+ super(MockTestCase, self).__init__(test_name)
+
+ def generic_test(self):
+ """Provide a generic function that raises or not based
+ on how self.is_pass was set in the constructor"""
+ self.assertTrue(self.is_pass, self.msg)
+
+