X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fnetwork_services%2Fvnf_generic%2Fvnf%2Fsample_vnf.py;h=b2c5a98bcf1f7ae745699bbbd92e7de17940bff3;hb=771215553f56993dd68b06dba561dcee17d7fc4f;hp=1ee71aa255901c2b5a7dde22e9d52ae426ee9463;hpb=f6c837fbaa0000c2749666e9b8fb092a26a7e0f3;p=yardstick.git diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 1ee71aa25..b2c5a98bc 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -13,15 +13,16 @@ # limitations under the License. import logging +import decimal from multiprocessing import Queue, Value, Process - import os import posixpath import re -import six +import uuid import subprocess import time + from trex_stl_lib.trex_stl_client import LoggerApi from trex_stl_lib.trex_stl_client import STLClient from trex_stl_lib.trex_stl_exceptions import STLError @@ -112,19 +113,6 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): self.used_drivers = None self.dpdk_bind_helper = DpdkBindHelper(ssh_helper) - def _setup_hugepages(self): - meminfo = utils.read_meminfo(self.ssh_helper) - hp_size_kb = int(meminfo['Hugepagesize']) - hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16) - nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb)) - self.ssh_helper.execute('echo %s | sudo tee %s' % - (nr_hugepages, self.NR_HUGEPAGES_PATH)) - hp = six.BytesIO() - self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp) - nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0]) - LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s', - hp_size_kb, nr_hugepages, nr_hugepages_set) - def build_config(self): vnf_cfg = self.scenario_helper.vnf_cfg task_path = self.scenario_helper.task_path @@ -237,7 +225,8 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): def _setup_dpdk(self): """Setup DPDK environment needed for VNF to run""" - self._setup_hugepages() + hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16) + utils.setup_hugepages(self.ssh_helper, hugepages_gb * 1024 * 1024) self.dpdk_bind_helper.load_dpdk_driver() exit_status = self.dpdk_bind_helper.check_dpdk_driver() @@ -408,12 +397,13 @@ class ClientResourceHelper(ResourceHelper): time.sleep(self.QUEUE_WAIT_TIME) self._queue.put(samples) - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, mq_producer): # if we don't do this we can hang waiting for the queue to drain # have to do this in the subprocess self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path + mq_producer.tg_method_started() try: self._build_ports() self.client = self._connect() @@ -421,8 +411,12 @@ class ClientResourceHelper(ResourceHelper): self.client.remove_all_streams(self.all_ports) # remove all streams traffic_profile.register_generator(self) + iteration_index = 0 while self._terminated.value == 0: - self._run_traffic_once(traffic_profile) + iteration_index += 1 + if self._run_traffic_once(traffic_profile): + self._terminated.value = 1 + mq_producer.tg_method_iteration(iteration_index) self.client.stop(self.all_ports) self.client.disconnect() @@ -433,6 +427,8 @@ class ClientResourceHelper(ResourceHelper): return # return if trex/tg server is stopped. raise + mq_producer.tg_method_finished() + def terminate(self): self._terminated.value = 1 # stop client @@ -491,6 +487,7 @@ class Rfc2544ResourceHelper(object): self._rfc2544 = None self._tolerance_low = None self._tolerance_high = None + self._tolerance_precision = None @property def rfc2544(self): @@ -510,6 +507,12 @@ class Rfc2544ResourceHelper(object): self.get_rfc_tolerance() return self._tolerance_high + @property + def tolerance_precision(self): + if self._tolerance_precision is None: + self.get_rfc_tolerance() + return self._tolerance_precision + @property def correlated_traffic(self): if self._correlated_traffic is None: @@ -529,9 +532,13 @@ class Rfc2544ResourceHelper(object): def get_rfc_tolerance(self): tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE) - tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-'))) - self._tolerance_low = next(tolerance_iter) - self._tolerance_high = next(tolerance_iter, self.tolerance_low) + tolerance_iter = iter(sorted( + decimal.Decimal(t.strip()) for t in tolerance_str.split('-'))) + tolerance_low = next(tolerance_iter) + tolerance_high = next(tolerance_iter, tolerance_low) + self._tolerance_precision = abs(tolerance_high.as_tuple().exponent) + self._tolerance_high = float(tolerance_high) + self._tolerance_low = float(tolerance_low) class SampleVNFDeployHelper(object): @@ -612,6 +619,7 @@ class ScenarioHelper(object): test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT) return test_duration if test_duration > test_timeout else test_timeout + class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ @@ -621,8 +629,9 @@ class SampleVNF(GenericVNF): APP_NAME = "SampleVNF" # we run the VNF interactively, so the ssh command will timeout after this long - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): - super(SampleVNF, self).__init__(name, vnfd) + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): + super(SampleVNF, self).__init__(name, vnfd, task_id) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) @@ -853,8 +862,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): APP_NAME = 'Sample' RUN_WAIT = 1 - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): - super(SampleVNFTrafficGen, self).__init__(name, vnfd) + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): + super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) @@ -911,12 +921,13 @@ class SampleVNFTrafficGen(GenericTrafficGen): LOG.info("%s TG Server is up and running.", self.APP_NAME) return self._tg_process.exitcode - def _traffic_runner(self, traffic_profile): + def _traffic_runner(self, traffic_profile, mq_id): # always drop connections first thing in new processes # so we don't get paramiko errors self.ssh_helper.drop_connection() LOG.info("Starting %s client...", self.APP_NAME) - self.resource_helper.run_traffic(traffic_profile) + self._mq_producer = self._setup_mq_producer(mq_id) + self.resource_helper.run_traffic(traffic_profile, self._mq_producer) def run_traffic(self, traffic_profile): """ Generate traffic on the wire according to the given params. @@ -926,10 +937,12 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, + name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, + traffic_profile.__class__.__name__, os.getpid()) - self._traffic_process = Process(name=name, target=self._traffic_runner, - args=(traffic_profile,)) + self._traffic_process = Process( + name=name, target=self._traffic_runner, + args=(traffic_profile, uuid.uuid1().int)) self._traffic_process.start() # Wait for traffic process to start while self.resource_helper.client_started.value == 0: @@ -938,8 +951,6 @@ class SampleVNFTrafficGen(GenericTrafficGen): if not self._traffic_process.is_alive(): break - return self._traffic_process.is_alive() - def collect_kpi(self): # check if the tg processes have exited physical_node = Context.get_physical_node_from_server(