Merge "Add "resources" parameter in Kubernetes context"
author Rodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
Tue, 10 Jul 2018 09:23:29 +0000 (09:23 +0000)
committer Gerrit Code Review <gerrit@opnfv.org>
Tue, 10 Jul 2018 09:23:29 +0000 (09:23 +0000)
31 files changed:
samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml [new file with mode: 0644]
yardstick/benchmark/runners/base.py
yardstick/benchmark/runners/iteration_ipc.py [new file with mode: 0644]
yardstick/benchmark/scenarios/base.py
yardstick/benchmark/scenarios/networking/vnf_generic.py
yardstick/common/exceptions.py
yardstick/common/messaging/__init__.py
yardstick/common/messaging/consumer.py
yardstick/common/messaging/payloads.py
yardstick/common/messaging/producer.py
yardstick/common/utils.py
yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py
yardstick/network_services/vnf_generic/vnf/base.py
yardstick/network_services/vnf_generic/vnf/prox_helpers.py
yardstick/network_services/vnf_generic/vnf/sample_vnf.py
yardstick/network_services/vnf_generic/vnf/tg_ping.py
yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py
yardstick/tests/functional/common/messaging/test_messaging.py
yardstick/tests/functional/common/test_utils.py
yardstick/tests/unit/benchmark/runner/test_base.py
yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py [new file with mode: 0644]
yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py
yardstick/tests/unit/common/messaging/test_payloads.py
yardstick/tests/unit/common/messaging/test_producer.py
yardstick/tests/unit/common/test_utils.py
yardstick/tests/unit/network_services/libs/ixia_libs/test_ixnet_api.py
yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py
yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py
yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py
yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py
yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py

diff --git a/samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml b/samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml
new file mode 100644 (file)
index 0000000..184ed68
--- /dev/null
@@ -0,0 +1,96 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+{% set provider = provider or none %}
+{% set physical_networks = physical_networks or ['physnet1', 'physnet2'] %}
+{% set segmentation_id = segmentation_id or none %}
+
+schema: yardstick:task:0.1
+scenarios:
+- type: NSPerf
+  traffic_profile: ../../traffic_profiles/ipv4_throughput.yaml
+  topology: vfw-tg-topology.yaml
+  nodes:
+    tg__0: trafficgen_1.yardstick
+    vnf__0: vnf.yardstick
+  options:
+    hugepages_gb: 8
+    framesize:
+      uplink: {64B: 100}
+      downlink: {64B: 100}
+    flow:
+      src_ip: [{'tg__0': 'xe0'}]
+      dst_ip: [{'tg__0': 'xe1'}]
+      count: 1
+    traffic_type: 4
+    rfc2544:
+      allowed_drop_rate: 0.0001 - 0.0001
+    vnf__0:
+      rules: acl_1rule.yaml
+      vnf_config: {lb_config: 'SW', lb_count: 1, worker_config: '1C/1T', worker_threads: 1}
+  runner:
+    type: IterationIPC
+    iterations: 10
+    timeout: 60
+context:
+  # put node context first, so we don't HEAT deploy if node has errors
+  name: yardstick
+  image: yardstick-samplevnfs
+  flavor:
+    vcpus: 10
+    ram: 12288
+    disk: 6
+    extra_specs:
+      hw:cpu_sockets: 1
+      hw:cpu_cores: 10
+      hw:cpu_threads: 1
+  user: ubuntu
+  placement_groups:
+    pgrp1:
+      policy: "availability"
+  servers:
+    vnf:
+      floating_ip: true
+      placement: "pgrp1"
+    trafficgen_1:
+      floating_ip: true
+      placement: "pgrp1"
+  networks:
+    mgmt:
+      cidr: '10.0.1.0/24'
+    xe0:
+      cidr: '10.0.2.0/24'
+      gateway_ip: 'null'
+      {% if provider %}
+      provider: {{ provider }}
+      physical_network: {{ physical_networks[0] }}
+        {% if segmentation_id %}
+      segmentation_id: {{ segmentation_id }}
+        {% endif %}
+      {% endif %}
+      port_security_enabled: False
+      enable_dhcp: 'false'
+    xe1:
+      cidr: '10.0.3.0/24'
+      gateway_ip: 'null'
+      {% if provider %}
+      provider: {{ provider }}
+      physical_network: {{ physical_networks[1] }}
+        {% if segmentation_id %}
+      segmentation_id: {{ segmentation_id }}
+        {% endif %}
+      {% endif %}
+      port_security_enabled: False
+      enable_dhcp: 'false'
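
Note: the "runner" section above is what selects the new IterationIPC runner added in this change. A minimal sketch of how the worker below consumes these keys (the defaults mirror iteration_ipc.py further down):

    runner_cfg = {'type': 'IterationIPC', 'iterations': 10, 'timeout': 60}
    timeout = runner_cfg.get('timeout', 180)        # per-iteration timeout, in seconds
    iterations = runner_cfg.get('iterations', 1)    # number of loop iterations
    run_step = runner_cfg.get('run_step', 'setup,run,teardown')  # 'setup' is mandatory for this runner
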
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index fbdf6c2..af25574 100755 (executable)
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+#
+# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
 
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-
+import importlib
 import logging
 import multiprocessing
 import subprocess
 import time
 import traceback
-from subprocess import CalledProcessError
-
-import importlib
 
-from six.moves.queue import Empty
+from six import moves
 
-import yardstick.common.utils as utils
 from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
+from yardstick.common import utils
 from yardstick.dispatcher.base import Base as DispatcherBase
 
+
 log = logging.getLogger(__name__)
 
 
@@ -41,7 +40,7 @@ def _execute_shell_command(command):
     exitcode = 0
     try:
         output = subprocess.check_output(command, shell=True)
-    except CalledProcessError:
+    except subprocess.CalledProcessError:
         exitcode = -1
         output = traceback.format_exc()
         log.error("exec command '%s' error:\n ", command)
@@ -245,7 +244,7 @@ class Runner(object):
             log.debug("output_queue size %s", self.output_queue.qsize())
             try:
                 result.update(self.output_queue.get(True, 1))
-            except Empty:
+            except moves.queue.Empty:
                 pass
         return result
 
@@ -259,7 +258,7 @@ class Runner(object):
             log.debug("result_queue size %s", self.result_queue.qsize())
             try:
                 one_record = self.result_queue.get(True, 1)
-            except Empty:
+            except moves.queue.Empty:
                 pass
             else:
                 if output_in_influxdb:
@@ -272,3 +271,22 @@ class Runner(object):
         dispatchers = DispatcherBase.get(self.config['output_config'])
         dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
         dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
+
+
+class RunnerProducer(producer.MessagingProducer):
+    """Class implementing the message producer for runners"""
+
+    def __init__(self, _id):
+        super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)
+
+    def start_iteration(self, version=1, data=None):
+        data = {} if not data else data
+        self.send_message(
+            messaging.RUNNER_METHOD_START_ITERATION,
+            payloads.RunnerPayload(version=version, data=data))
+
+    def stop_iteration(self, version=1, data=None):
+        data = {} if not data else data
+        self.send_message(
+            messaging.RUNNER_METHOD_STOP_ITERATION,
+            payloads.RunnerPayload(version=version, data=data))
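
A minimal usage sketch for the new RunnerProducer (this mirrors how the IterationIPC worker below drives it; actual delivery requires the RabbitMQ broker configured in yardstick.common.messaging):

    import uuid

    from yardstick.benchmark.runners import base as base_runner

    mq_producer = base_runner.RunnerProducer(uuid.uuid1().int)  # illustrative id
    mq_producer.start_iteration()  # casts RUNNER_METHOD_START_ITERATION on TOPIC_RUNNER
    mq_producer.stop_iteration()   # casts RUNNER_METHOD_STOP_ITERATION on TOPIC_RUNNER
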
diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py
new file mode 100644 (file)
index 0000000..a0335fd
--- /dev/null
@@ -0,0 +1,205 @@
+# Copyright 2018: Intel Corporation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""A runner that runs a configurable number of times before it returns. Each
+   iteration has a configurable timeout. The loop control depends on the
+   feedback received from the running VNFs. The context PIDs of the VNFs to
+   listen to for messages are provided in the scenario "setup" method.
+"""
+
+import logging
+import multiprocessing
+import time
+import traceback
+
+import os
+
+from yardstick.benchmark.runners import base as base_runner
+from yardstick.common import exceptions
+from yardstick.common import messaging
+from yardstick.common import utils
+from yardstick.common.messaging import consumer
+from yardstick.common.messaging import payloads
+
+
+LOG = logging.getLogger(__name__)
+
+QUEUE_PUT_TIMEOUT = 10
+ITERATION_TIMEOUT = 180
+
+
+class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
+    """Endpoint class for ``RunnerIterationIPCConsumer``"""
+
+    def tg_method_started(self, ctxt, **kwargs):
+        if ctxt['id'] in self._ctx_ids:
+            self._queue.put(
+                {'id': ctxt['id'],
+                 'action': messaging.TG_METHOD_STARTED,
+                 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+                     kwargs)},
+                QUEUE_PUT_TIMEOUT)
+
+    def tg_method_finished(self, ctxt, **kwargs):
+        if ctxt['id'] in self._ctx_ids:
+            self._queue.put(
+                {'id': ctxt['id'],
+                 'action': messaging.TG_METHOD_FINISHED,
+                 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+                     kwargs)})
+
+    def tg_method_iteration(self, ctxt, **kwargs):
+        if ctxt['id'] in self._ctx_ids:
+            self._queue.put(
+                {'id': ctxt['id'],
+                 'action': messaging.TG_METHOD_ITERATION,
+                 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+                     kwargs)})
+
+
+class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
+    """MQ consumer for "IterationIPC" runner"""
+
+    def __init__(self, _id, ctx_ids):
+        self._id = _id
+        self._queue = multiprocessing.Queue()
+        endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
+        super(RunnerIterationIPCConsumer, self).__init__(
+            messaging.TOPIC_TG, ctx_ids, endpoints)
+        self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
+        self.iteration_index = None
+
+    def is_all_kpis_received_in_iteration(self):
+        """Check if all producers registered have sent the ITERATION msg
+
+        During the present iteration, all producers (traffic generators) must
+        start and finish the traffic injection, and at the end of the traffic
+        injection a TG_METHOD_ITERATION must be sent. This function will check
+        all KPIs in the present iteration are received. E.g.:
+          self.iteration_index = 2
+
+          self._kpi_per_id = {
+            'ctx1': [kpi0, kpi1, kpi2],
+            'ctx2': [kpi0, kpi1]}          --> return False
+
+          self._kpi_per_id = {
+            'ctx1': [kpi0, kpi1, kpi2],
+            'ctx2': [kpi0, kpi1, kpi2]}    --> return True
+        """
+        while not self._queue.empty():
+            msg = self._queue.get(True, 1)
+            if msg['action'] == messaging.TG_METHOD_ITERATION:
+                id_iter_list = self._kpi_per_id[msg['id']]
+                id_iter_list.append(msg['payload'].kpi)
+
+        return all(len(id_iter_list) == self.iteration_index
+                   for id_iter_list in self._kpi_per_id.values())
+
+
+def _worker_process(queue, cls, method_name, scenario_cfg,
+                    context_cfg, aborted, output_queue):  # pragma: no cover
+    runner_cfg = scenario_cfg['runner']
+
+    timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
+    iterations = runner_cfg.get('iterations', 1)
+    run_step = runner_cfg.get('run_step', 'setup,run,teardown')
+    LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
+
+    runner_cfg['runner_id'] = os.getpid()
+
+    benchmark = cls(scenario_cfg, context_cfg)
+    method = getattr(benchmark, method_name)
+
+    if 'setup' not in run_step:
+        raise exceptions.RunnerIterationIPCSetupActionNeeded()
+    benchmark.setup()
+    producer_ctxs = benchmark.get_mq_ids()
+    if not producer_ctxs:
+        raise exceptions.RunnerIterationIPCNoCtxs()
+
+    mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
+    mq_consumer.start_rpc_server()
+    mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id'])
+
+    iteration_index = 1
+    while 'run' in run_step:
+        LOG.debug('runner=%(runner)s seq=%(sequence)s START',
+                  {'runner': runner_cfg['runner_id'],
+                   'sequence': iteration_index})
+        data = {}
+        result = None
+        errors = ''
+        mq_consumer.iteration_index = iteration_index
+        mq_producer.start_iteration()
+
+        try:
+            utils.wait_until_true(
+                mq_consumer.is_all_kpis_received_in_iteration,
+                timeout=timeout, sleep=2)
+            result = method(data)
+        except Exception:  # pylint: disable=broad-except
+            errors = traceback.format_exc()
+            LOG.exception(errors)
+
+        mq_producer.stop_iteration()
+
+        if result:
+            output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
+        benchmark_output = {'timestamp': time.time(),
+                            'sequence': iteration_index,
+                            'data': data,
+                            'errors': errors}
+        queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
+
+        LOG.debug('runner=%(runner)s seq=%(sequence)s END',
+                  {'runner': runner_cfg['runner_id'],
+                   'sequence': iteration_index})
+
+        iteration_index += 1
+        if iteration_index > iterations or aborted.is_set():
+            LOG.info('"IterationIPC" worker END')
+            break
+
+    if 'teardown' in run_step:
+        try:
+            benchmark.teardown()
+        except Exception:
+            LOG.exception('Exception during teardown process')
+            mq_consumer.stop_rpc_server()
+            raise SystemExit(1)
+
+    LOG.debug('Data queue size = %s', queue.qsize())
+    LOG.debug('Output queue size = %s', output_queue.qsize())
+    mq_consumer.stop_rpc_server()
+
+
+class IterationIPCRunner(base_runner.Runner):
+    """Run a scenario for a configurable number of times.
+
+    Each iteration has a configurable timeout. The loop control depends on the
+    feedback received from the running VNFs. The context PIDs of the VNFs to
+    listen to for messages are provided in the scenario "setup" method.
+    """
+    __execution_type__ = 'IterationIPC'
+
+    def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = '{}-{}-{}'.format(
+            self.__execution_type__, scenario_cfg.get('type'), os.getpid())
+        self.process = multiprocessing.Process(
+            name=name,
+            target=_worker_process,
+            args=(self.result_queue, cls, method, scenario_cfg,
+                  context_cfg, self.aborted, self.output_queue))
+        self.process.start()
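
Putting both sides together, the per-iteration message flow implemented above is roughly the following (sketch; consuming the runner-side start/stop messages inside the VNF/TG processes is not part of this change):

    # IterationIPC worker process             traffic generator process
    # ---------------------------             -------------------------
    # mq_producer.start_iteration()    --->   (TOPIC_RUNNER)
    #                                         mq_producer.tg_method_iteration(n, kpi=...)
    # RunnerIterationIPCConsumer       <---   (TOPIC_TG)
    # is_all_kpis_received_in_iteration() turns True once every registered
    # producer id has reported iteration n; then result = method(data)
    # mq_producer.stop_iteration()     --->   (TOPIC_RUNNER)
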
diff --git a/yardstick/benchmark/scenarios/base.py b/yardstick/benchmark/scenarios/base.py
index 30ac1be..90a87ac 100644 (file)
@@ -119,3 +119,7 @@ class Scenario(object):
             except TypeError:
                 dic[k] = v
         return dic
+
+    def get_mq_ids(self):  # pragma: no cover
+        """Return stored MQ producer IDs, if defined"""
+        pass
diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py
index eb62d62..3bb168c 100644 (file)
@@ -50,7 +50,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
 
     __scenario_type__ = "NSPerf"
 
-    def __init__(self, scenario_cfg, context_cfg):  # Yardstick API
+    def __init__(self, scenario_cfg, context_cfg):  # pragma: no cover
         super(NetworkServiceTestCase, self).__init__()
         self.scenario_cfg = scenario_cfg
         self.context_cfg = context_cfg
@@ -61,6 +61,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
         self.traffic_profile = None
         self.node_netdevs = {}
         self.bin_path = get_nsb_option('bin_path', '')
+        self._mq_ids = []
 
     def _get_ip_flow_range(self, ip_start_range):
 
@@ -168,18 +169,18 @@ class NetworkServiceTestCase(scenario_base.Scenario):
         topology_yaml = vnfdgen.generate_vnfd(topology, topolgy_data)
         self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0]
 
-    def _find_vnf_name_from_id(self, vnf_id):
+    def _find_vnf_name_from_id(self, vnf_id):  # pragma: no cover
         return next((vnfd["vnfd-id-ref"]
                      for vnfd in self.topology["constituent-vnfd"]
                      if vnf_id == vnfd["member-vnf-index"]), None)
 
-    def _find_vnfd_from_vnf_idx(self, vnf_id):
+    def _find_vnfd_from_vnf_idx(self, vnf_id):  # pragma: no cover
         return next((vnfd
                      for vnfd in self.topology["constituent-vnfd"]
                      if vnf_id == vnfd["member-vnf-index"]), None)
 
     @staticmethod
-    def find_node_if(nodes, name, if_name, vld_id):
+    def find_node_if(nodes, name, if_name, vld_id):  # pragma: no cover
         try:
             # check for xe0, xe1
             intf = nodes[name]["interfaces"][if_name]
@@ -272,14 +273,14 @@ class NetworkServiceTestCase(scenario_base.Scenario):
             node0_if["peer_intf"] = node1_copy
             node1_if["peer_intf"] = node0_copy
 
-    def _update_context_with_topology(self):
+    def _update_context_with_topology(self):  # pragma: no cover
         for vnfd in self.topology["constituent-vnfd"]:
             vnf_idx = vnfd["member-vnf-index"]
             vnf_name = self._find_vnf_name_from_id(vnf_idx)
             vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
             self.context_cfg["nodes"][vnf_name].update(vnfd)
 
-    def _generate_pod_yaml(self):
+    def _generate_pod_yaml(self):  # pragma: no cover
         context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
         # convert OrderedDict to a list
         # pod.yaml nodes is a list
@@ -293,7 +294,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
                            explicit_start=True)
 
     @staticmethod
-    def _serialize_node(node):
+    def _serialize_node(node):  # pragma: no cover
         new_node = copy.deepcopy(node)
         # name field is required
         # remove context suffix
@@ -315,7 +316,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
         self._update_context_with_topology()
 
     @classmethod
-    def get_vnf_impl(cls, vnf_model_id):
+    def get_vnf_impl(cls, vnf_model_id):  # pragma: no cover
         """ Find the implementing class from vnf_model["vnf"]["name"] field
 
         :param vnf_model_id: parsed vnfd model ID field
@@ -343,7 +344,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
         raise exceptions.IncorrectConfig(error_msg=message)
 
     @staticmethod
-    def create_interfaces_from_node(vnfd, node):
+    def create_interfaces_from_node(vnfd, node):  # pragma: no cover
         ext_intfs = vnfd["vdu"][0]["external-interface"] = []
         # have to sort so xe0 goes first
         for intf_name, intf in sorted(node['interfaces'].items()):
@@ -412,10 +413,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
         return vnfs
 
     def setup(self):
-        """ Setup infrastructure, provission VNFs & start traffic
-
-        :return:
-        """
+        """Setup infrastructure, provision VNFs & start traffic"""
         # 1. Verify if infrastructure mapping can meet topology
         self.map_topology_to_infrastructure()
         # 1a. Load VNF models
@@ -457,6 +455,11 @@ class NetworkServiceTestCase(scenario_base.Scenario):
         for traffic_gen in traffic_runners:
             LOG.info("Starting traffic on %s", traffic_gen.name)
             traffic_gen.run_traffic(self.traffic_profile)
+            self._mq_ids.append(traffic_gen.get_mq_producer_id())
+
+    def get_mq_ids(self):  # pragma: no cover
+        """Return stored MQ producer IDs"""
+        return self._mq_ids
 
     def run(self, result):  # yardstick API
         """ Yardstick calls run() at intervals defined in the yaml and
@@ -495,10 +498,10 @@ class NetworkServiceTestCase(scenario_base.Scenario):
             LOG.exception("")
             raise RuntimeError("Error in teardown")
 
-    def pre_run_wait_time(self, time_seconds):
+    def pre_run_wait_time(self, time_seconds):  # pragma: no cover
         """Time waited before executing the run method"""
         time.sleep(time_seconds)
 
-    def post_run_wait_time(self, time_seconds):
+    def post_run_wait_time(self, time_seconds):  # pragma: no cover
         """Time waited after executing the run method"""
         pass
diff --git a/yardstick/common/exceptions.py b/yardstick/common/exceptions.py
index 2d160be..50def06 100644 (file)
@@ -193,6 +193,15 @@ class TaskRenderError(YardstickException):
     message = 'Failed to render template:\n%(input_task)s'
 
 
+class RunnerIterationIPCSetupActionNeeded(YardstickException):
+    message = ('IterationIPC needs the "setup" action to retrieve the VNF '
+               'handling processes PIDs to receive the messages sent')
+
+
+class RunnerIterationIPCNoCtxs(YardstickException):
+    message = 'Benchmark "setup" action did not return any VNF process PID'
+
+
 class TimerTimeout(YardstickException):
     message = 'Timer timeout expired, %(timeout)s seconds'
 
diff --git a/yardstick/common/messaging/__init__.py b/yardstick/common/messaging/__init__.py
index f0f012e..bd700d9 100644 (file)
@@ -1,14 +1,3 @@
-# Copyright (c) 2018 Intel Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
@@ -28,9 +17,17 @@ TRANSPORT_URL = (MQ_SERVICE + '://' + MQ_USER + ':' + MQ_PASS + '@' + SERVER +
 RPC_SERVER_EXECUTOR = 'threading'
 
 # Topics.
-RUNNER = 'runner'
+TOPIC_TG = 'topic_traffic_generator'
+TOPIC_RUNNER = 'topic_runner'
 
 # Methods.
-# RUNNER methods:
-RUNNER_INFO = 'runner_info'
-RUNNER_LOOP = 'runner_loop'
+# Traffic generator consumer methods. Names must match the methods implemented
+# in the consumer endpoint class.
+TG_METHOD_STARTED = 'tg_method_started'
+TG_METHOD_FINISHED = 'tg_method_finished'
+TG_METHOD_ITERATION = 'tg_method_iteration'
+
+# Runner consumer methods. Names must match the methods implemented in the
+# consumer endpoint class.
+RUNNER_METHOD_START_ITERATION = "runner_method_start_iteration"
+RUNNER_METHOD_STOP_ITERATION = "runner_method_stop_iteration"
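
As the comments above note, these constants double as RPC method names: a consumer endpoint must define identically named methods to receive the casts. A minimal sketch:

    from yardstick.common import messaging
    from yardstick.common.messaging import consumer

    class _Endpoint(consumer.NotificationHandler):
        # Must be named exactly like messaging.TG_METHOD_ITERATION
        # ('tg_method_iteration') for that cast to be dispatched here.
        def tg_method_iteration(self, ctxt, **kwargs):
            self._queue.put({'id': ctxt['id'], 'kpi': kwargs.get('kpi')})
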
diff --git a/yardstick/common/messaging/consumer.py b/yardstick/common/messaging/consumer.py
index 24ec6f1..c99d7ed 100644 (file)
@@ -29,9 +29,9 @@ LOG = logging.getLogger(__name__)
 class NotificationHandler(object):
     """Abstract class to define a endpoint object for a MessagingConsumer"""
 
-    def __init__(self, _id, ctx_pids, queue):
+    def __init__(self, _id, ctx_ids, queue):
         self._id = _id
-        self._ctx_pids = ctx_pids
+        self._ctx_ids = ctx_ids
         self._queue = queue
 
 
@@ -43,11 +43,11 @@ class MessagingConsumer(object):
     the messages published by a `MessagingNotifier`.
     """
 
-    def __init__(self, topic, pids, endpoints, fanout=True):
+    def __init__(self, topic, ids, endpoints, fanout=True):
         """Init function.
 
         :param topic: (string) MQ exchange topic
-        :param pids: (list of int) list of PIDs of the processes implementing
+        :param ids: (list of int) list of IDs of the processes implementing
                      the MQ Notifier which will be in the message context
         :param endpoints: (list of class) list of classes implementing the
                           methods (see `MessagingNotifier.send_message) used by
@@ -58,7 +58,7 @@ class MessagingConsumer(object):
         :returns: `MessagingConsumer` class object
         """
 
-        self._pids = pids
+        self._ids = ids
         self._endpoints = endpoints
         self._transport = oslo_messaging.get_rpc_transport(
             cfg.CONF, url=messaging.TRANSPORT_URL)
diff --git a/yardstick/common/messaging/payloads.py b/yardstick/common/messaging/payloads.py
index d29d798..8ede1e5 100644 (file)
@@ -51,3 +51,23 @@ class Payload(object):
     def dict_to_obj(cls, _dict):
         """Returns a Payload object built from the dictionary elements"""
         return cls(**_dict)
+
+
+class TrafficGeneratorPayload(Payload):
+    """Base traffic generator payload class"""
+    REQUIRED_FIELDS = {
+        'version',  # (str) version of the payload transmitted.
+        'iteration',  # (int) iteration index during the traffic injection,
+                      # starting from 1.
+        'kpi'  # (dict) collection of KPIs collected from the traffic
+               # injection. The content will depend on the generator and the
+               # traffic type.
+    }
+
+
+class RunnerPayload(Payload):
+    """Base runner payload class"""
+    REQUIRED_FIELDS = {
+        'version',  # (str) version of the payload transmitted.
+        'data'  # (dict) generic container of data to be used if needed.
+    }
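
A short sketch of how these payloads travel over the queue (values are illustrative; omitting a required field raises PayloadMissingAttributes):

    from yardstick.common.messaging import payloads

    pload = payloads.TrafficGeneratorPayload(
        version=1, iteration=3, kpi={'rx_throughput_fps': 12345})
    as_dict = pload.obj_to_dict()   # sent as the keyword arguments of the RPC cast
    restored = payloads.TrafficGeneratorPayload.dict_to_obj(as_dict)  # rebuilt by the endpoint
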
diff --git a/yardstick/common/messaging/producer.py b/yardstick/common/messaging/producer.py
index b6adc0c..aadab64 100644 (file)
@@ -34,18 +34,18 @@ class MessagingProducer(object):
     messages in a message queue.
     """
 
-    def __init__(self, topic, pid=os.getpid(), fanout=True):
+    def __init__(self, topic, _id=os.getpid(), fanout=True):
         """Init function.
 
         :param topic: (string) MQ exchange topic
-        :param pid: (int) PID of the process implementing this MQ Notifier
+        :param _id: (int) ID of the process implementing this MQ Notifier
         :param fanout: (bool) MQ clients may request that a copy of the message
                        be delivered to all servers listening on a topic by
                        setting fanout to ``True``, rather than just one of them
         :returns: `MessagingNotifier` class object
         """
         self._topic = topic
-        self._pid = pid
+        self._id = _id
         self._fanout = fanout
         self._transport = oslo_messaging.get_rpc_transport(
             cfg.CONF, url=messaging.TRANSPORT_URL)
@@ -65,6 +65,11 @@ class MessagingProducer(object):
                        consumer endpoints
         :param payload: (subclass `Payload`) payload content
         """
-        self._notifier.cast({'pid': self._pid},
+        self._notifier.cast({'id': self._id},
                             method,
                             **payload.obj_to_dict())
+
+    @property
+    def id(self):
+        """Return MQ producer ID"""
+        return self._id
diff --git a/yardstick/common/utils.py b/yardstick/common/utils.py
index f9fe0e3..85cecc7 100644 (file)
@@ -527,3 +527,25 @@ def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
         if exception and issubclass(exception, Exception):
             raise exception  # pylint: disable=raising-bad-type
         raise exceptions.WaitTimeout
+
+
+def send_socket_command(host, port, command):
+    """Send a string command to a specific port in a host
+
+    :param host: (str) ip or hostname of the host
+    :param port: (int) port number
+    :param command: (str) command to send
+    :return: 0 if success, error number if error
+    """
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    ret = 0
+    try:
+        err_number = sock.connect_ex((host, int(port)))
+        if err_number != 0:
+            return err_number
+        sock.sendall(six.b(command))
+    except Exception:  # pylint: disable=broad-except
+        ret = 1
+    finally:
+        sock.close()
+    return ret
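
A usage example for the new helper, following the functional test added below (host, port and command are illustrative; the helper only reports delivery, it does not read a reply):

    from yardstick.common import utils

    ret = utils.send_socket_command('localhost', 47001, 'test_command')
    if ret != 0:
        print('command was not delivered, error %d' % ret)
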
diff --git a/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py b/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py
index 393f60f..74deeec 100644 (file)
@@ -166,9 +166,10 @@ class IxNextgen(object):  # pragma: no cover
         :return: list of paired frame sizes and weights
         """
         weighted_range_pairs = []
-        for size, weight in framesize.items():
-            weighted_range_pairs.append(int(size.upper().replace('B', '')))
-            weighted_range_pairs.append(int(weight))
+        for size, weight in ((s, w) for (s, w) in framesize.items()
+                             if int(w) != 0):
+            size = int(size.upper().replace('B', ''))
+            weighted_range_pairs.append([size, size, int(weight)])
         return weighted_range_pairs
 
     def iter_over_get_lists(self, x1, x2, y2, offset=0):
@@ -339,7 +340,7 @@ class IxNextgen(object):  # pragma: no cover
                         "percentLineRate" no used)
         - Frame size: custom IMIX [1] definition; a list of packet size in
                       bytes and the weight. E.g.:
-                      [64, 10, 128, 15, 512, 5]
+                      [[64, 64, 10], [128, 128, 15], [512, 512, 5]]
 
         [1] https://en.wikipedia.org/wiki/Internet_Mix
 
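
Concretely, _parse_framesize now returns [min_size, max_size, weight] triplets and drops zero-weight sizes, e.g.:

    framesize = {'64B': '75', '512b': '25', '1518B': '0'}
    # new output (order follows dict iteration): [[64, 64, 75], [512, 512, 25]]
    # old output:                                [64, 75, 512, 25, 1518, 0]
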
diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py
index 9ceac31..fb41a4e 100644 (file)
@@ -18,6 +18,9 @@ import abc
 import logging
 import six
 
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
 
 
@@ -138,6 +141,39 @@ class VnfdHelper(dict):
             yield port_name, port_num
 
 
+class TrafficGeneratorProducer(producer.MessagingProducer):
+    """Class implementing the message producer for traffic generators
+
+    This message producer must be instantiated inside the process created by
+    the "run_traffic" method.
+    """
+    def __init__(self, _id):
+        super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG,
+                                                       _id=_id)
+
+    def tg_method_started(self, version=1):
+        """Send a message to inform the traffic generation has started"""
+        self.send_message(
+            messaging.TG_METHOD_STARTED,
+            payloads.TrafficGeneratorPayload(version=version, iteration=0,
+                                             kpi={}))
+
+    def tg_method_finished(self, version=1):
+        """Send a message to inform the traffic generation has finished"""
+        self.send_message(
+            messaging.TG_METHOD_FINISHED,
+            payloads.TrafficGeneratorPayload(version=version, iteration=0,
+                                             kpi={}))
+
+    def tg_method_iteration(self, iteration, version=1, kpi=None):
+        """Send a message, with KPI, once an iteration has finished"""
+        kpi = {} if kpi is None else kpi
+        self.send_message(
+            messaging.TG_METHOD_ITERATION,
+            payloads.TrafficGeneratorPayload(version=version,
+                                             iteration=iteration, kpi=kpi))
+
+
 @six.add_metaclass(abc.ABCMeta)
 class GenericVNF(object):
     """Class providing file-like API for generic VNF implementation
@@ -216,6 +252,7 @@ class GenericTrafficGen(GenericVNF):
         super(GenericTrafficGen, self).__init__(name, vnfd)
         self.runs_traffic = True
         self.traffic_finished = False
+        self._mq_producer = None
 
     @abc.abstractmethod
     def run_traffic(self, traffic_profile):
@@ -286,3 +323,16 @@ class GenericTrafficGen(GenericVNF):
         :return: True/False
         """
         pass
+
+    @staticmethod
+    def _setup_mq_producer(id):
+        """Setup the TG MQ producer to send messages between processes
+
+        :return: (derived class from ``MessagingProducer``) MQ producer object
+        """
+        return TrafficGeneratorProducer(id)
+
+    def get_mq_producer_id(self):
+        """Return the MQ producer ID if initialized"""
+        if self._mq_producer:
+            return self._mq_producer.get_id()
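
A minimal sketch of driving the new TrafficGeneratorProducer directly (the id value is illustrative; messages are only delivered when the RabbitMQ broker from yardstick.common.messaging is reachable):

    from yardstick.network_services.vnf_generic.vnf import base

    producer = base.TrafficGeneratorProducer(_id=12345)
    producer.tg_method_started()                        # traffic injection started
    producer.tg_method_iteration(1, kpi={'rx': 100})    # iteration 1 finished, KPIs attached
    producer.tg_method_finished()                       # traffic injection finished
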
diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
index 6d28f47..3241719 100644 (file)
@@ -969,7 +969,7 @@ class ProxResourceHelper(ClientResourceHelper):
             self._test_type = self.setup_helper.find_in_section('global', 'name', None)
         return self._test_type
 
-    def run_traffic(self, traffic_profile):
+    def run_traffic(self, traffic_profile, *args):
         self._queue.cancel_join_thread()
         self.lower = 0.0
         self.upper = 100.0
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
index 1ee71aa..bc65380 100644 (file)
 
 import logging
 from multiprocessing import Queue, Value, Process
-
 import os
 import posixpath
 import re
-import six
+import uuid
 import subprocess
 import time
 
+import six
+
 from trex_stl_lib.trex_stl_client import LoggerApi
 from trex_stl_lib.trex_stl_client import STLClient
 from trex_stl_lib.trex_stl_exceptions import STLError
@@ -408,12 +409,13 @@ class ClientResourceHelper(ResourceHelper):
         time.sleep(self.QUEUE_WAIT_TIME)
         self._queue.put(samples)
 
-    def run_traffic(self, traffic_profile):
+    def run_traffic(self, traffic_profile, mq_producer):
         # if we don't do this we can hang waiting for the queue to drain
         # have to do this in the subprocess
         self._queue.cancel_join_thread()
         # fixme: fix passing correct trex config file,
         # instead of searching the default path
+        mq_producer.tg_method_started()
         try:
             self._build_ports()
             self.client = self._connect()
@@ -421,8 +423,11 @@ class ClientResourceHelper(ResourceHelper):
             self.client.remove_all_streams(self.all_ports)  # remove all streams
             traffic_profile.register_generator(self)
 
+            iteration_index = 0
             while self._terminated.value == 0:
+                iteration_index += 1
                 self._run_traffic_once(traffic_profile)
+                mq_producer.tg_method_iteration(iteration_index)
 
             self.client.stop(self.all_ports)
             self.client.disconnect()
@@ -433,6 +438,8 @@ class ClientResourceHelper(ResourceHelper):
                 return  # return if trex/tg server is stopped.
             raise
 
+        mq_producer.tg_method_finished()
+
     def terminate(self):
         self._terminated.value = 1  # stop client
 
@@ -911,12 +918,13 @@ class SampleVNFTrafficGen(GenericTrafficGen):
                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
                 return self._tg_process.exitcode
 
-    def _traffic_runner(self, traffic_profile):
+    def _traffic_runner(self, traffic_profile, mq_id):
         # always drop connections first thing in new processes
         # so we don't get paramiko errors
         self.ssh_helper.drop_connection()
         LOG.info("Starting %s client...", self.APP_NAME)
-        self.resource_helper.run_traffic(traffic_profile)
+        self._mq_producer = self._setup_mq_producer(mq_id)
+        self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
 
     def run_traffic(self, traffic_profile):
         """ Generate traffic on the wire according to the given params.
@@ -926,10 +934,12 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         :param traffic_profile:
         :return: True/False
         """
-        name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
+        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
+                                    traffic_profile.__class__.__name__,
                                     os.getpid())
-        self._traffic_process = Process(name=name, target=self._traffic_runner,
-                                        args=(traffic_profile,))
+        self._traffic_process = Process(
+            name=name, target=self._traffic_runner,
+            args=(traffic_profile, uuid.uuid1().int))
         self._traffic_process.start()
         # Wait for traffic process to start
         while self.resource_helper.client_started.value == 0:
@@ -938,8 +948,6 @@ class SampleVNFTrafficGen(GenericTrafficGen):
             if not self._traffic_process.is_alive():
                 break
 
-        return self._traffic_process.is_alive()
-
     def collect_kpi(self):
         # check if the tg processes have exited
         physical_node = Context.get_physical_node_from_server(
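
Summarising the sample_vnf changes above: the parent process now only generates an MQ producer id in run_traffic() and hands it to the traffic subprocess, which builds the producer and threads it into the resource helper (sketch):

    import uuid

    mq_id = uuid.uuid1().int   # created in run_traffic(), in the parent process
    # passed via multiprocessing.Process(..., args=(traffic_profile, mq_id)), then
    # inside _traffic_runner() in the child process:
    #     self._mq_producer = self._setup_mq_producer(mq_id)
    #     self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
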
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
index a989543..4050dc6 100644 (file)
@@ -71,7 +71,7 @@ class PingResourceHelper(ClientResourceHelper):
         self._queue = Queue()
         self._parser = PingParser(self._queue)
 
-    def run_traffic(self, traffic_profile):
+    def run_traffic(self, traffic_profile, *args):
         # drop the connection in order to force a new one
         self.ssh_helper.drop_connection()
 
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py
index a1f9fbe..875ae93 100644 (file)
@@ -102,7 +102,7 @@ class IxiaResourceHelper(ClientResourceHelper):
         self.client.assign_ports()
         self.client.create_traffic_model()
 
-    def run_traffic(self, traffic_profile):
+    def run_traffic(self, traffic_profile, *args):
         if self._terminated.value:
             return
 
diff --git a/yardstick/tests/functional/common/messaging/test_messaging.py b/yardstick/tests/functional/common/messaging/test_messaging.py
index 9987434..f3e31e7 100644 (file)
@@ -32,25 +32,25 @@ class DummyPayload(payloads.Payload):
 class DummyEndpoint(consumer.NotificationHandler):
 
     def info(self, ctxt, **kwargs):
-        if ctxt['pid'] in self._ctx_pids:
-            self._queue.put('ID {}, data: {}, pid: {}'.format(
-                self._id, kwargs['data'], ctxt['pid']))
+        if ctxt['id'] in self._ctx_ids:
+            self._queue.put('Nr {}, data: {}, id: {}'.format(
+                self._id, kwargs['data'], ctxt['id']))
 
 
 class DummyConsumer(consumer.MessagingConsumer):
 
-    def __init__(self, _id, ctx_pids, queue):
+    def __init__(self, _id, ctx_ids, queue):
         self._id = _id
-        endpoints = [DummyEndpoint(_id, ctx_pids, queue)]
-        super(DummyConsumer, self).__init__(TOPIC, ctx_pids, endpoints)
+        endpoints = [DummyEndpoint(_id, ctx_ids, queue)]
+        super(DummyConsumer, self).__init__(TOPIC, ctx_ids, endpoints)
 
 
 class DummyProducer(producer.MessagingProducer):
     pass
 
 
-def _run_consumer(_id, ctx_pids, queue):
-    _consumer = DummyConsumer(_id, ctx_pids, queue)
+def _run_consumer(_id, ctx_ids, queue):
+    _consumer = DummyConsumer(_id, ctx_ids, queue)
     _consumer.start_rpc_server()
     _consumer.wait()
 
@@ -67,8 +67,8 @@ class MessagingTestCase(base.BaseFunctionalTestCase):
         num_consumers = 10
         ctx_1 = 100001
         ctx_2 = 100002
-        producers = [DummyProducer(TOPIC, pid=ctx_1),
-                     DummyProducer(TOPIC, pid=ctx_2)]
+        producers = [DummyProducer(TOPIC, _id=ctx_1),
+                     DummyProducer(TOPIC, _id=ctx_2)]
 
         processes = []
         for i in range(num_consumers):
@@ -91,7 +91,7 @@ class MessagingTestCase(base.BaseFunctionalTestCase):
             output.append(output_queue.get(True, 1))
 
         self.assertEqual(num_consumers * 4, len(output))
-        msg_template = 'ID {}, data: {}, pid: {}'
+        msg_template = 'Nr {}, data: {}, id: {}'
         for i in range(num_consumers):
             for ctx in [ctx_1, ctx_2]:
                 for message in ['message 0', 'message 1']:
diff --git a/yardstick/tests/functional/common/test_utils.py b/yardstick/tests/functional/common/test_utils.py
index b5333bb..b9f1f77 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import multiprocessing
 import unittest
+import socket
 import sys
+import time
 
 from yardstick.common import utils
 
@@ -32,3 +35,38 @@ class ImportModulesFromPackageTestCase(unittest.TestCase):
         library_obj = getattr(module_obj, library_name)
         class_obj = getattr(library_obj, class_name)
         self.assertEqual(class_name, class_obj().__class__.__name__)
+
+
+class SendSocketCommandTestCase(unittest.TestCase):
+
+    @staticmethod
+    def _run_socket_server(port):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.bind(('localhost', port))
+        sock.listen(1)
+        conn = None
+        while not conn:
+            conn, _ = sock.accept()
+        sock.close()
+
+    @staticmethod
+    def _terminate_server(socket_server):
+        # Wait until the socket server closes the open port.
+        time.sleep(1)
+        if socket_server and socket_server.is_alive():
+            socket_server.terminate()
+
+    def test_send_command(self):
+        port = 47001
+
+        socket_server = multiprocessing.Process(
+            name='run_socket_server',
+            target=SendSocketCommandTestCase._run_socket_server,
+            args=(port, )).start()
+
+        self.addCleanup(self._terminate_server, socket_server)
+
+        # Wait until the socket is open.
+        time.sleep(0.5)
+        self.assertEqual(
+            0, utils.send_socket_command('localhost', port, 'test_command'))
diff --git a/yardstick/tests/unit/benchmark/runner/test_base.py b/yardstick/tests/unit/benchmark/runner/test_base.py
index 559c991..49ba1ef 100644 (file)
@@ -8,12 +8,17 @@
 ##############################################################################
 
 import time
+import uuid
 
 import mock
+from oslo_config import cfg
+import oslo_messaging
 import subprocess
 
 from yardstick.benchmark.runners import base as runner_base
 from yardstick.benchmark.runners import iteration
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
 from yardstick.tests.unit import base as ut_base
 
 
@@ -94,3 +99,54 @@ class RunnerTestCase(ut_base.BaseUnitTestCase):
 
         with self.assertRaises(NotImplementedError):
             runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock())
+
+
+class RunnerProducerTestCase(ut_base.BaseUnitTestCase):
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(cfg, 'CONF')
+    def test__init(self, mock_config, mock_transport, mock_rpcclient,
+                   mock_target):
+        _id = uuid.uuid1().int
+        runner_producer = runner_base.RunnerProducer(_id)
+        mock_transport.assert_called_once_with(
+            mock_config, url='rabbit://yardstick:yardstick@localhost:5672/')
+        mock_target.assert_called_once_with(topic=messaging.TOPIC_RUNNER,
+                                            fanout=True,
+                                            server=messaging.SERVER)
+        mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target')
+        self.assertEqual(_id, runner_producer._id)
+        self.assertEqual(messaging.TOPIC_RUNNER, runner_producer._topic)
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload')
+    def test_start_iteration(self, mock_runner_payload, *args):
+        runner_producer = runner_base.RunnerProducer(uuid.uuid1().int)
+        with mock.patch.object(runner_producer,
+                               'send_message') as mock_message:
+            runner_producer.start_iteration(version=10)
+
+        mock_message.assert_called_once_with(
+            messaging.RUNNER_METHOD_START_ITERATION, 'runner_pload')
+        mock_runner_payload.assert_called_once_with(version=10, data={})
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload')
+    def test_stop_iteration(self, mock_runner_payload, *args):
+        runner_producer = runner_base.RunnerProducer(uuid.uuid1().int)
+        with mock.patch.object(runner_producer,
+                               'send_message') as mock_message:
+            runner_producer.stop_iteration(version=15)
+
+        mock_message.assert_called_once_with(
+            messaging.RUNNER_METHOD_STOP_ITERATION, 'runner_pload')
+        mock_runner_payload.assert_called_once_with(version=15, data={})
diff --git a/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py b/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py
new file mode 100644 (file)
index 0000000..10d14a8
--- /dev/null
@@ -0,0 +1,136 @@
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multiprocessing
+import time
+import os
+import uuid
+
+import mock
+
+from yardstick.benchmark.runners import iteration_ipc
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.tests.unit import base as ut_base
+
+
+class RunnerIterationIPCEndpointTestCase(ut_base.BaseUnitTestCase):
+
+    def setUp(self):
+        self._id = uuid.uuid1().int
+        self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int]
+        self._queue = multiprocessing.Queue()
+        self.runner = iteration_ipc.RunnerIterationIPCEndpoint(
+            self._id, self._ctx_ids, self._queue)
+        self._kwargs = {'version': 1, 'iteration': 10, 'kpi': {}}
+        self._pload_dict = payloads.TrafficGeneratorPayload.dict_to_obj(
+            self._kwargs).obj_to_dict()
+
+    def test_tg_method_started(self):
+        self._queue.empty()
+        ctxt = {'id': self._ctx_ids[0]}
+        self.runner.tg_method_started(ctxt, **self._kwargs)
+        time.sleep(0.2)
+
+        output = []
+        while not self._queue.empty():
+            output.append(self._queue.get(True, 1))
+
+        self.assertEqual(1, len(output))
+        self.assertEqual(self._ctx_ids[0], output[0]['id'])
+        self.assertEqual(messaging.TG_METHOD_STARTED, output[0]['action'])
+        self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
+
+    def test_tg_method_finished(self):
+        self._queue.empty()
+        ctxt = {'id': self._ctx_ids[0]}
+        self.runner.tg_method_finished(ctxt, **self._kwargs)
+        time.sleep(0.2)
+
+        output = []
+        while not self._queue.empty():
+            output.append(self._queue.get(True, 1))
+
+        self.assertEqual(1, len(output))
+        self.assertEqual(self._ctx_ids[0], output[0]['id'])
+        self.assertEqual(messaging.TG_METHOD_FINISHED, output[0]['action'])
+        self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
+
+    def test_tg_method_iteration(self):
+        self._queue.empty()
+        ctxt = {'id': self._ctx_ids[0]}
+        self.runner.tg_method_iteration(ctxt, **self._kwargs)
+        time.sleep(0.2)
+
+        output = []
+        while not self._queue.empty():
+            output.append(self._queue.get(True, 1))
+
+        self.assertEqual(1, len(output))
+        self.assertEqual(self._ctx_ids[0], output[0]['id'])
+        self.assertEqual(messaging.TG_METHOD_ITERATION, output[0]['action'])
+        self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
+
+
+class RunnerIterationIPCConsumerTestCase(ut_base.BaseUnitTestCase):
+
+    def setUp(self):
+        self._id = uuid.uuid1().int
+        self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int]
+        self.consumer = iteration_ipc.RunnerIterationIPCConsumer(
+            self._id, self._ctx_ids)
+        self.consumer._queue = mock.Mock()
+
+    def test__init(self):
+        self.assertEqual({self._ctx_ids[0]: [], self._ctx_ids[1]: []},
+                         self.consumer._kpi_per_id)
+
+    def test_is_all_kpis_received_in_iteration(self):
+        payload = payloads.TrafficGeneratorPayload(
+            version=1, iteration=1, kpi={})
+        msg1 = {'action': messaging.TG_METHOD_ITERATION,
+                'id': self._ctx_ids[0], 'payload': payload}
+        msg2 = {'action': messaging.TG_METHOD_ITERATION,
+                'id': self._ctx_ids[1], 'payload': payload}
+        self.consumer.iteration_index = 1
+
+        self.consumer._queue.empty.side_effect = [False, True]
+        self.consumer._queue.get.return_value = msg1
+        self.assertFalse(self.consumer.is_all_kpis_received_in_iteration())
+
+        self.consumer._queue.empty.side_effect = [False, True]
+        self.consumer._queue.get.return_value = msg2
+        self.assertTrue(self.consumer.is_all_kpis_received_in_iteration())
+
+
+class IterationIPCRunnerTestCase(ut_base.BaseUnitTestCase):
+
+    @mock.patch.object(iteration_ipc, '_worker_process')
+    @mock.patch.object(os, 'getpid', return_value=12345678)
+    @mock.patch.object(multiprocessing, 'Process', return_value=mock.Mock())
+    def test__run_benchmark(self, mock_process, mock_getpid, mock_worker):
+        method = 'method'
+        scenario_cfg = {'type': 'scenario_type'}
+        context_cfg = 'context_cfg'
+        name = '%s-%s-%s' % ('IterationIPC', 'scenario_type', 12345678)
+        runner = iteration_ipc.IterationIPCRunner(mock.ANY)
+        mock_getpid.reset_mock()
+
+        runner._run_benchmark('class', method, scenario_cfg, context_cfg)
+        mock_process.assert_called_once_with(
+            name=name,
+            target=mock_worker,
+            args=(runner.result_queue, 'class', method, scenario_cfg,
+                  context_cfg, runner.aborted, runner.output_queue))
+        mock_getpid.assert_called_once()
diff --git a/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py b/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py
index bb1a7aa..77a54c0 100644 (file)
@@ -553,6 +553,7 @@ class TestNetworkServiceTestCase(unittest.TestCase):
             tgen.verify_traffic = lambda x: verified_dict
             tgen.terminate = mock.Mock(return_value=True)
             tgen.name = "tgen__1"
+            tgen.run_traffic.return_value = 'tg_id'
             vnf = mock.Mock(autospec=GenericVNF)
             vnf.runs_traffic = False
             vnf.terminate = mock.Mock(return_value=True)
@@ -565,7 +566,6 @@ class TestNetworkServiceTestCase(unittest.TestCase):
             self.s.load_vnf_models = mock.Mock(return_value=self.s.vnfs)
             self.s._fill_traffic_profile = \
                 mock.Mock(return_value=TRAFFIC_PROFILE)
-            self.assertIsNone(self.s.setup())
 
     def test_setup_exception(self):
         with mock.patch("yardstick.ssh.SSH") as ssh:
@@ -656,6 +656,9 @@ class TestNetworkServiceTestCase(unittest.TestCase):
         )
         self.assertEqual(self.s.topology, 'fake_nsd')
 
+    def test_get_mq_ids(self):
+        self.assertEqual(self.s._mq_ids, self.s.get_mq_ids())
+
     def test_teardown(self):
         vnf = mock.Mock(autospec=GenericVNF)
         vnf.terminate = mock.Mock(return_value=True)
diff --git a/yardstick/tests/unit/common/messaging/test_payloads.py b/yardstick/tests/unit/common/messaging/test_payloads.py
index 00ec220..37b1f19 100644 (file)
@@ -44,3 +44,39 @@ class PayloadTestCase(ut_base.BaseUnitTestCase):
         _dict = {'version': 2, 'key1': 'value100', 'key2': 'value200'}
         payload = _DummyPayload.dict_to_obj(_dict)
         self.assertEqual(set(_dict.keys()), payload._fields)
+
+
+class TrafficGeneratorPayloadTestCase(ut_base.BaseUnitTestCase):
+
+    def test_init(self):
+        tg_payload = payloads.TrafficGeneratorPayload(
+            version=1, iteration=10, kpi={'key1': 'value1'})
+        self.assertEqual(1, tg_payload.version)
+        self.assertEqual(10, tg_payload.iteration)
+        self.assertEqual({'key1': 'value1'}, tg_payload.kpi)
+        self.assertEqual(3, len(tg_payload._fields))
+
+    def test__init_missing_required_fields(self):
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.TrafficGeneratorPayload(version=1, iteration=10)
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.TrafficGeneratorPayload(iteration=10, kpi={})
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.TrafficGeneratorPayload(iteration=10)
+
+
+class RunnerPayloadTestCase(ut_base.BaseUnitTestCase):
+
+    def test_init(self):
+        runner_payload = payloads.RunnerPayload(version=5,
+                                                data={'key1': 'value1'})
+        self.assertEqual(5, runner_payload.version)
+        self.assertEqual({'key1': 'value1'}, runner_payload.data)
+
+    def test__init_missing_required_fields(self):
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.RunnerPayload(version=1)
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.RunnerPayload(data=None)
+        with self.assertRaises(exceptions.PayloadMissingAttributes):
+            payloads.RunnerPayload()
diff --git a/yardstick/tests/unit/common/messaging/test_producer.py b/yardstick/tests/unit/common/messaging/test_producer.py
index 0289689..22286e5 100644 (file)
@@ -44,3 +44,10 @@ class MessagingProducerTestCase(ut_base.BaseUnitTestCase):
                 topic='test_topic', fanout=True, server=messaging.SERVER)
             mock_RPCClient.assert_called_once_with('test_rpc_transport',
                                                    'test_Target')
+
+    def test_id(self):
+        with mock.patch.object(oslo_messaging, 'RPCClient'), \
+                mock.patch.object(oslo_messaging, 'get_rpc_transport'), \
+                mock.patch.object(oslo_messaging, 'Target'):
+            msg_producer = _MessagingProducer('topic', 'id_to_check')
+        self.assertEqual('id_to_check', msg_producer.id)
diff --git a/yardstick/tests/unit/common/test_utils.py b/yardstick/tests/unit/common/test_utils.py
index 6247afd..446afdd 100644 (file)
@@ -16,6 +16,7 @@ import mock
 import os
 import six
 from six.moves import configparser
+import socket
 import time
 import unittest
 
@@ -1282,3 +1283,29 @@ class WaitUntilTrueTestCase(ut_base.BaseUnitTestCase):
             self.assertIsNone(
                 utils.wait_until_true(lambda: False, timeout=1, sleep=1,
                                       exception=MyTimeoutException))
+
+
+class SendSocketCommandTestCase(unittest.TestCase):
+
+    @mock.patch.object(socket, 'socket')
+    def test_execute_correct(self, mock_socket):
+        mock_socket_obj = mock.Mock()
+        mock_socket_obj.connect_ex.return_value = 0
+        mock_socket.return_value = mock_socket_obj
+        self.assertEqual(0, utils.send_socket_command('host', 22, 'command'))
+        mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
+        mock_socket_obj.connect_ex.assert_called_once_with(('host', 22))
+        mock_socket_obj.sendall.assert_called_once_with(six.b('command'))
+        mock_socket_obj.close.assert_called_once()
+
+    @mock.patch.object(socket, 'socket')
+    def test_execute_exception(self, mock_socket):
+        mock_socket_obj = mock.Mock()
+        mock_socket_obj.connect_ex.return_value = 0
+        mock_socket.return_value = mock_socket_obj
+        mock_socket_obj.sendall.side_effect = socket.error
+        self.assertEqual(1, utils.send_socket_command('host', 22, 'command'))
+        mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
+        mock_socket_obj.connect_ex.assert_called_once_with(('host', 22))
+        mock_socket_obj.sendall.assert_called_once_with(six.b('command'))
+        mock_socket_obj.close.assert_called_once()
diff --git a/yardstick/tests/unit/network_services/libs/ixia_libs/test_ixnet_api.py b/yardstick/tests/unit/network_services/libs/ixia_libs/test_ixnet_api.py
index 34afa3d..541855a 100644 (file)
@@ -203,13 +203,9 @@ class TestIxNextgen(unittest.TestCase):
         ixnet_gen._ixnet = self.ixnet
         framesize = {'64B': '75', '512b': '25'}
         output = ixnet_gen._parse_framesize(framesize)
-        for idx in range(len(framesize)):
-            if output[idx * 2] == 64:
-                self.assertEqual(75, output[idx * 2 + 1])
-            elif output[idx * 2] == 512:
-                self.assertEqual(25, output[idx * 2 + 1])
-            else:
-                raise self.failureException('Framesize (64, 512) not present')
+        self.assertEqual(2, len(output))
+        self.assertIn([64, 64, 75], output)
+        self.assertIn([512, 512, 25], output)
 
     @mock.patch.object(IxNetwork, 'IxNet')
     def test_connect(self, mock_ixnet):
index ebedcb4..43e5ac8 100644
 
 import multiprocessing
 import os
+import uuid
 
 import mock
+from oslo_config import cfg
+import oslo_messaging
 import unittest
 
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
 from yardstick.network_services.vnf_generic.vnf import base
 from yardstick.ssh import SSH
 
@@ -140,6 +145,24 @@ VNFD = {
 }
 
 
+class _DummyGenericTrafficGen(base.GenericTrafficGen):  # pragma: no cover
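+    """No-op subclass used to instantiate GenericTrafficGen in the tests."""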
+
+    def run_traffic(self, *args):
+        pass
+
+    def terminate(self):
+        pass
+
+    def collect_kpi(self):
+        pass
+
+    def instantiate(self, *args):
+        pass
+
+    def scale(self, flavor=''):
+        pass
+
+
 class FileAbsPath(object):
     def __init__(self, module_file):
         super(FileAbsPath, self).__init__()
@@ -221,7 +244,7 @@ class TestGenericVNF(unittest.TestCase):
         self.assertEqual(msg, str(exc.exception))
 
 
-class TestGenericTrafficGen(unittest.TestCase):
+class GenericTrafficGenTestCase(unittest.TestCase):
 
     def test_definition(self):
         """Make sure that the abstract class cannot be instantiated"""
@@ -234,3 +257,81 @@ class TestGenericTrafficGen(unittest.TestCase):
                "abstract methods collect_kpi, instantiate, run_traffic, "
                "scale, terminate")
         self.assertEqual(msg, str(exc.exception))
+
+    def test_get_mq_producer_id(self):
+        vnfd = {'benchmark': {'kpi': mock.ANY},
+                'vdu': [{'external-interface': 'ext_int'}]
+                }
+        tg = _DummyGenericTrafficGen('name', vnfd)
+        tg._mq_producer = mock.Mock()
+        tg._mq_producer.get_id.return_value = 'fake_id'
+        self.assertEqual('fake_id', tg.get_mq_producer_id())
+
+
+class TrafficGeneratorProducerTestCase(unittest.TestCase):
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(cfg, 'CONF')
+    def test__init(self, mock_config, mock_transport, mock_rpcclient,
+                   mock_target):
+        _id = uuid.uuid1().int
+        tg_producer = base.TrafficGeneratorProducer(_id)
+        mock_transport.assert_called_once_with(
+            mock_config, url='rabbit://yardstick:yardstick@localhost:5672/')
+        mock_target.assert_called_once_with(topic=messaging.TOPIC_TG,
+                                            fanout=True,
+                                            server=messaging.SERVER)
+        mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target')
+        self.assertEqual(_id, tg_producer._id)
+        self.assertEqual(messaging.TOPIC_TG, tg_producer._topic)
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+                       return_value='tg_pload')
+    def test_tg_method_started(self, mock_tg_payload, *args):
+        tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+        with mock.patch.object(tg_producer, 'send_message') as mock_message:
+            tg_producer.tg_method_started(version=10)
+
+        mock_message.assert_called_once_with(messaging.TG_METHOD_STARTED,
+                                             'tg_pload')
+        mock_tg_payload.assert_called_once_with(version=10, iteration=0,
+                                                kpi={})
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+                       return_value='tg_pload')
+    def test_tg_method_finished(self, mock_tg_payload, *args):
+        tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+        with mock.patch.object(tg_producer, 'send_message') as mock_message:
+            tg_producer.tg_method_finished(version=20)
+
+        mock_message.assert_called_once_with(messaging.TG_METHOD_FINISHED,
+                                             'tg_pload')
+        mock_tg_payload.assert_called_once_with(version=20, iteration=0,
+                                                kpi={})
+
+    @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+    @mock.patch.object(oslo_messaging, 'RPCClient')
+    @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+                       return_value='rpc_transport')
+    @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+                       return_value='tg_pload')
+    def test_tg_method_iteration(self, mock_tg_payload, *args):
+        tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+        with mock.patch.object(tg_producer, 'send_message') as mock_message:
+            tg_producer.tg_method_iteration(100, version=30, kpi={'k': 'v'})
+
+        mock_message.assert_called_once_with(messaging.TG_METHOD_ITERATION,
+                                             'tg_pload')
+        mock_tg_payload.assert_called_once_with(version=30, iteration=100,
+                                                kpi={'k': 'v'})
index 331e80d..48ae3b5 100644
@@ -1090,6 +1090,57 @@ class TestClientResourceHelper(unittest.TestCase):
 
         self.assertIs(client_resource_helper._connect(client), client)
 
+    @mock.patch.object(ClientResourceHelper, '_build_ports')
+    @mock.patch.object(ClientResourceHelper, '_run_traffic_once')
+    def test_run_traffic(self, mock_run_traffic_once, mock_build_ports):
+        client_resource_helper = ClientResourceHelper(mock.Mock())
+        client = mock.Mock()
+        traffic_profile = mock.Mock()
+        mq_producer = mock.Mock()
+        with mock.patch.object(client_resource_helper, '_connect') \
+                as mock_connect, \
+                mock.patch.object(client_resource_helper, '_terminated') \
+                as mock_terminated:
+            mock_connect.return_value = client
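+            # _terminated reads 0 on the first check and 1 on the second, so
+            # run_traffic() performs exactly one iteration before exiting.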
+            type(mock_terminated).value = mock.PropertyMock(
+                side_effect=[0, 1, lambda x: x])
+            client_resource_helper.run_traffic(traffic_profile, mq_producer)
+
+        mock_build_ports.assert_called_once()
+        traffic_profile.register_generator.assert_called_once()
+        mq_producer.tg_method_started.assert_called_once()
+        mq_producer.tg_method_finished.assert_called_once()
+        mq_producer.tg_method_iteration.assert_called_once_with(1)
+        mock_run_traffic_once.assert_called_once_with(traffic_profile)
+
+    @mock.patch.object(ClientResourceHelper, '_build_ports')
+    @mock.patch.object(ClientResourceHelper, '_run_traffic_once',
+                       side_effect=Exception)
+    def test_run_traffic_exception(self, mock_run_traffic_once,
+                                   mock_build_ports):
+        client_resource_helper = ClientResourceHelper(mock.Mock())
+        client = mock.Mock()
+        traffic_profile = mock.Mock()
+        mq_producer = mock.Mock()
+        with mock.patch.object(client_resource_helper, '_connect') \
+                as mock_connect, \
+                mock.patch.object(client_resource_helper, '_terminated') \
+                as mock_terminated:
+            mock_connect.return_value = client
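+            # _terminated stays at 0, so the loop is entered and the mocked
+            # exception from _run_traffic_once() propagates.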
+            type(mock_terminated).value = mock.PropertyMock(return_value=0)
+            mq_producer.reset_mock()
+            # NOTE(ralonsoh): "trex_stl_exceptions.STLError" is mocked
+            with self.assertRaises(Exception):
+                client_resource_helper.run_traffic(traffic_profile,
+                                                   mq_producer)
+
+        mock_build_ports.assert_called_once()
+        traffic_profile.register_generator.assert_called_once()
+        mock_run_traffic_once.assert_called_once_with(traffic_profile)
+        mq_producer.tg_method_started.assert_called_once()
+        mq_producer.tg_method_finished.assert_not_called()
+        mq_producer.tg_method_iteration.assert_not_called()
+
 
 class TestRfc2544ResourceHelper(unittest.TestCase):
 
index 050aa4a..3e2f598 100644
@@ -406,7 +406,8 @@ class TestProxTrafficGen(unittest.TestCase):
         sut.setup_helper.prox_config_dict = {}
         sut._connect_client = mock.Mock(autospec=STLClient)
         sut._connect_client.get_stats = mock.Mock(return_value="0")
-        sut._traffic_runner(mock_traffic_profile)
+        sut._setup_mq_producer = mock.Mock(return_value='mq_producer')
+        sut._traffic_runner(mock_traffic_profile, mock.ANY)
 
     @mock.patch('yardstick.network_services.vnf_generic.vnf.prox_helpers.socket')
     @mock.patch(SSH_HELPER)
index 42ac40b..4ade157 100644
@@ -379,7 +379,8 @@ class TestIXIATrafficGen(unittest.TestCase):
                     mock.mock_open(), create=True)
         @mock.patch('yardstick.network_services.vnf_generic.vnf.tg_rfc2544_ixia.LOG.exception')
         def _traffic_runner(*args):
-            result = sut._traffic_runner(mock_traffic_profile)
+            sut._setup_mq_producer = mock.Mock(return_value='mq_producer')
+            result = sut._traffic_runner(mock_traffic_profile, mock.ANY)
             self.assertIsNone(result)
 
         _traffic_runner()
index 350ba84..700e910 100644
@@ -387,8 +387,9 @@ class TestTrexTrafficGen(unittest.TestCase):
         # must generate cfg before we can run traffic so Trex port mapping is
         # created
         self.sut.resource_helper.generate_cfg()
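+        # _setup_mq_producer() is stubbed and the extra MQ argument of the
+        # updated _traffic_runner() signature is passed as mock.ANY.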
+        self.sut._setup_mq_producer = mock.Mock()
         with mock.patch.object(self.sut.resource_helper, 'run_traffic'):
-            self.sut._traffic_runner(mock_traffic_profile)
+            self.sut._traffic_runner(mock_traffic_profile, mock.ANY)
 
     def test__generate_trex_cfg(self):
         vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
@@ -453,9 +454,8 @@ class TestTrexTrafficGen(unittest.TestCase):
         self.sut.ssh_helper.run = mock.Mock()
         self.sut._traffic_runner = mock.Mock(return_value=0)
         self.sut.resource_helper.client_started.value = 1
-        result = self.sut.run_traffic(mock_traffic_profile)
+        self.sut.run_traffic(mock_traffic_profile)
         self.sut._traffic_process.terminate()
-        self.assertIsNotNone(result)
 
     def test_terminate(self):
         vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]