X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=yardstick%2Fnetwork_services%2Fvnf_generic%2Fvnf%2Fsample_vnf.py;h=3ef7c33c521f6c475e3d6006a660b704cefbbccf;hb=caee6b2a603388c02e961582d78244a2e9e98372;hp=34b0260b44414617e8ff1e3021b7b02ed6247915;hpb=ac190e58b127f1aa276e462292d776b5126e4944;p=yardstick.git diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 34b0260b4..3ef7c33c5 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -11,15 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" Base class implementation for generic vnf implementation """ -from collections import Mapping import logging from multiprocessing import Queue, Value, Process - import os import posixpath import re +import uuid import subprocess import time @@ -32,6 +30,7 @@ from yardstick.benchmark.contexts.base import Context from yardstick.common import exceptions as y_exceptions from yardstick.common.process import check_if_process_failed from yardstick.common import utils +from yardstick.common import yaml_loader from yardstick.network_services import constants from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper, DpdkNode from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig @@ -41,7 +40,7 @@ from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen from yardstick.network_services.vnf_generic.vnf.base import GenericVNF from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper - +from yardstick.benchmark.contexts.node import NodeContext LOG = logging.getLogger(__name__) @@ -144,6 +143,13 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): 'vnf_type': self.VNF_TYPE, } + # read actions/rules from file + acl_options = None + acl_file_name = self.scenario_helper.options.get('rules') + if acl_file_name: + with utils.open_relative_file(acl_file_name, task_path) as infile: + acl_options = yaml_loader.yaml_load(infile) + config_tpl_cfg = utils.find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path) config_basename = posixpath.basename(self.CFG_CONFIG) @@ -176,12 +182,17 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): new_config = self._update_packet_type(new_config, traffic_options) self.ssh_helper.upload_config_file(config_basename, new_config) self.ssh_helper.upload_config_file(script_basename, - multiport.generate_script(self.vnfd_helper)) + multiport.generate_script(self.vnfd_helper, + self.get_flows_config(acl_options))) LOG.info("Provision and start the %s", self.APP_NAME) self._build_pipeline_kwargs() return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs) + def get_flows_config(self, options=None): # pylint: disable=unused-argument + """No actions/rules (flows) by default""" + return None + def _build_pipeline_kwargs(self): tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME) # count the number of actual ports in the list of pairs @@ -193,11 +204,20 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): port_nums = self.vnfd_helper.port_nums(ports) # create mask from all the dpdk port numbers ports_mask_hex = hex(sum(2 ** num for num in port_nums)) + + vnf_cfg = self.scenario_helper.vnf_cfg + lb_config = vnf_cfg.get('lb_config', 'SW') + worker_threads = vnf_cfg.get('worker_threads', 3) + hwlb = '' + if lb_config == 'HW': + hwlb = ' --hwlb %s' % worker_threads + self.pipeline_kwargs = { 'cfg_file': self.CFG_CONFIG, 'script': self.CFG_SCRIPT, 'port_mask_hex': ports_mask_hex, 'tool_path': tool_path, + 'hwlb': hwlb, } def setup_vnf_environment(self): @@ -300,6 +320,7 @@ class ResourceHelper(object): self.resource = None self.setup_helper = setup_helper self.ssh_helper = setup_helper.ssh_helper + self._enable = True def setup(self): self.resource = self.setup_helper.setup_vnf_environment() @@ -307,22 +328,33 @@ class ResourceHelper(object): def generate_cfg(self): pass + def update_from_context(self, context, attr_name): + """Disable resource helper in case of baremetal context. + + And update appropriate node collectd options in context + """ + if isinstance(context, NodeContext): + self._enable = False + context.update_collectd_options_for_node(self.setup_helper.collectd_options, + attr_name) + def _collect_resource_kpi(self): result = {} status = self.resource.check_if_system_agent_running("collectd")[0] - if status == 0: + if status == 0 and self._enable: result = self.resource.amqp_collect_nfvi_kpi() result = {"core": result} return result def start_collect(self): - self.resource.initiate_systemagent(self.ssh_helper.bin_path) - self.resource.start() - self.resource.amqp_process_for_nfvi_kpi() + if self._enable: + self.resource.initiate_systemagent(self.ssh_helper.bin_path) + self.resource.start() + self.resource.amqp_process_for_nfvi_kpi() def stop_collect(self): - if self.resource: + if self.resource and self._enable: self.resource.stop() def collect_kpi(self): @@ -366,48 +398,24 @@ class ClientResourceHelper(ResourceHelper): LOG.error('TRex client not connected') return {} - def generate_samples(self, ports, key=None, default=None): - # needs to be used ports - last_result = self.get_stats(ports) - key_value = last_result.get(key, default) - - if not isinstance(last_result, Mapping): # added for mock unit test - self._terminated.value = 1 - return {} - - samples = {} - # recalculate port for interface and see if it matches ports provided - for intf in self.vnfd_helper.interfaces: - name = intf["name"] - port = self.vnfd_helper.port_num(name) - if port in ports: - xe_value = last_result.get(port, {}) - samples[name] = { - "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)), - "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)), - "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)), - "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)), - "in_packets": int(xe_value.get("ipackets", 0)), - "out_packets": int(xe_value.get("opackets", 0)), - } - if key: - samples[name][key] = key_value - return samples + def _get_samples(self, ports, port_pg_id=False): + raise NotImplementedError() def _run_traffic_once(self, traffic_profile): traffic_profile.execute_traffic(self) self.client_started.value = 1 time.sleep(self.RUN_DURATION) - samples = self.generate_samples(traffic_profile.ports) + samples = self._get_samples(traffic_profile.ports) time.sleep(self.QUEUE_WAIT_TIME) self._queue.put(samples) - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, mq_producer): # if we don't do this we can hang waiting for the queue to drain # have to do this in the subprocess self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path + mq_producer.tg_method_started() try: self._build_ports() self.client = self._connect() @@ -415,8 +423,11 @@ class ClientResourceHelper(ResourceHelper): self.client.remove_all_streams(self.all_ports) # remove all streams traffic_profile.register_generator(self) + iteration_index = 0 while self._terminated.value == 0: + iteration_index += 1 self._run_traffic_once(traffic_profile) + mq_producer.tg_method_iteration(iteration_index) self.client.stop(self.all_ports) self.client.disconnect() @@ -427,6 +438,8 @@ class ClientResourceHelper(ResourceHelper): return # return if trex/tg server is stopped. raise + mq_producer.tg_method_finished() + def terminate(self): self._terminated.value = 1 # stop client @@ -606,6 +619,7 @@ class ScenarioHelper(object): test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT) return test_duration if test_duration > test_timeout else test_timeout + class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ @@ -615,8 +629,9 @@ class SampleVNF(GenericVNF): APP_NAME = "SampleVNF" # we run the VNF interactively, so the ssh command will timeout after this long - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): - super(SampleVNF, self).__init__(name, vnfd) + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): + super(SampleVNF, self).__init__(name, vnfd, task_id) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) @@ -637,7 +652,6 @@ class SampleVNF(GenericVNF): self.resource_helper = resource_helper_type(self.setup_helper) self.context_cfg = None - self.nfvi_context = None self.pipeline_kwargs = {} self.uplink_ports = None self.downlink_ports = None @@ -664,8 +678,10 @@ class SampleVNF(GenericVNF): self._update_collectd_options(scenario_cfg, context_cfg) self.scenario_helper.scenario_cfg = scenario_cfg self.context_cfg = context_cfg - self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name]) - # self.nfvi_context = None + self.resource_helper.update_from_context( + Context.get_context_from_server(self.scenario_helper.nodes[self.name]), + self.scenario_helper.nodes[self.name] + ) # vnf deploy is unsupported, use ansible playbooks if self.scenario_helper.options.get("vnf_deploy", False): @@ -816,18 +832,21 @@ class SampleVNF(GenericVNF): def collect_kpi(self): # we can't get KPIs if the VNF is down - check_if_process_failed(self._vnf_process) + check_if_process_failed(self._vnf_process, 0.01) stats = self.get_stats() m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} if m: - result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()} + result.update({k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}) result["collect_stats"] = self.resource_helper.collect_kpi() else: - result = { - "packets_in": 0, - "packets_fwd": 0, - "packets_dropped": 0, - } + result.update({"packets_in": 0, + "packets_fwd": 0, + "packets_dropped": 0}) + LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result @@ -843,8 +862,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): APP_NAME = 'Sample' RUN_WAIT = 1 - def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): - super(SampleVNFTrafficGen, self).__init__(name, vnfd) + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): + super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) @@ -873,6 +893,11 @@ class SampleVNFTrafficGen(GenericTrafficGen): def instantiate(self, scenario_cfg, context_cfg): self.scenario_helper.scenario_cfg = scenario_cfg + self.resource_helper.update_from_context( + Context.get_context_from_server(self.scenario_helper.nodes[self.name]), + self.scenario_helper.nodes[self.name] + ) + self.resource_helper.setup() # must generate_cfg after DPDK bind because we need port number self.resource_helper.generate_cfg() @@ -896,12 +921,13 @@ class SampleVNFTrafficGen(GenericTrafficGen): LOG.info("%s TG Server is up and running.", self.APP_NAME) return self._tg_process.exitcode - def _traffic_runner(self, traffic_profile): + def _traffic_runner(self, traffic_profile, mq_id): # always drop connections first thing in new processes # so we don't get paramiko errors self.ssh_helper.drop_connection() LOG.info("Starting %s client...", self.APP_NAME) - self.resource_helper.run_traffic(traffic_profile) + self._mq_producer = self._setup_mq_producer(mq_id) + self.resource_helper.run_traffic(traffic_profile, self._mq_producer) def run_traffic(self, traffic_profile): """ Generate traffic on the wire according to the given params. @@ -911,10 +937,12 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, + name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, + traffic_profile.__class__.__name__, os.getpid()) - self._traffic_process = Process(name=name, target=self._traffic_runner, - args=(traffic_profile,)) + self._traffic_process = Process( + name=name, target=self._traffic_runner, + args=(traffic_profile, uuid.uuid1().int)) self._traffic_process.start() # Wait for traffic process to start while self.resource_helper.client_started.value == 0: @@ -923,13 +951,16 @@ class SampleVNFTrafficGen(GenericTrafficGen): if not self._traffic_process.is_alive(): break - return self._traffic_process.is_alive() - def collect_kpi(self): # check if the tg processes have exited + physical_node = Context.get_physical_node_from_server( + self.scenario_helper.nodes[self.name]) + + result = {"physical_node": physical_node} for proc in (self._tg_process, self._traffic_process): check_if_process_failed(proc) - result = self.resource_helper.collect_kpi() + + result["collect_stats"] = self.resource_helper.collect_kpi() LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result