pass
self.create_interfaces_from_node(vnfd, node)
vnf_impl = self.get_vnf_impl(vnfd['id'])
- vnf_instance = vnf_impl(node_name, vnfd)
+ vnf_instance = vnf_impl(node_name, vnfd, scenario_cfg['task_id'])
vnfs.append(vnf_instance)
self.vnfs = vnfs
"""Abstract class to define a endpoint object for a MessagingConsumer"""
def __init__(self, _id, ctx_ids, queue):
+ super(NotificationHandler, self).__init__()
self._id = _id
self._ctx_ids = ctx_ids
self._queue = queue
'packets_dropped': 2,
}
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if setup_env_helper_type is None:
setup_env_helper_type = AclApproxSetupEnvSetupEnvHelper
-
- super(AclApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
+ super(AclApproxVnf, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-""" Base class implementation for generic vnf implementation """
import abc
import six
from yardstick.common import messaging
+from yardstick.common.messaging import consumer
from yardstick.common.messaging import payloads
from yardstick.common.messaging import producer
from yardstick.network_services.helpers.samplevnf_helper import PortPairs
iteration=iteration, kpi=kpi))
+@six.add_metaclass(abc.ABCMeta)
+class GenericVNFEndpoint(consumer.NotificationHandler):
+ """Endpoint class for ``GenericVNFConsumer``"""
+
+ @abc.abstractmethod
+ def runner_method_start_iteration(self, ctxt, **kwargs):
+ """Endpoint when RUNNER_METHOD_START_ITERATION is received
+
+ :param ctxt: (dict) {'id': <Producer ID>}
+ :param kwargs: (dict) ``payloads.RunnerPayload`` context
+ """
+
+ @abc.abstractmethod
+ def runner_method_stop_iteration(self, ctxt, **kwargs):
+ """Endpoint when RUNNER_METHOD_STOP_ITERATION is received
+
+ :param ctxt: (dict) {'id': <Producer ID>}
+ :param kwargs: (dict) ``payloads.RunnerPayload`` context
+ """
+
+
+class GenericVNFConsumer(consumer.MessagingConsumer):
+ """MQ consumer for ``GenericVNF`` derived classes"""
+
+ def __init__(self, ctx_ids, endpoints):
+ if not isinstance(endpoints, list):
+ endpoints = [endpoints]
+ super(GenericVNFConsumer, self).__init__(messaging.TOPIC_RUNNER,
+ ctx_ids, endpoints)
+
+
@six.add_metaclass(abc.ABCMeta)
class GenericVNF(object):
"""Class providing file-like API for generic VNF implementation
UPLINK = PortPairs.UPLINK
DOWNLINK = PortPairs.DOWNLINK
- def __init__(self, name, vnfd):
+ def __init__(self, name, vnfd, task_id):
self.name = name
+ self._task_id = task_id
self.vnfd_helper = VnfdHelper(vnfd)
# List of statistics we can obtain from this VNF
# - ETSI MANO 6.3.1.1 monitoring_parameter
@six.add_metaclass(abc.ABCMeta)
class GenericTrafficGen(GenericVNF):
- """ Class providing file-like API for generic traffic generator """
+ """Class providing file-like API for generic traffic generator"""
- def __init__(self, name, vnfd):
- super(GenericTrafficGen, self).__init__(name, vnfd)
+ def __init__(self, name, vnfd, task_id):
+ super(GenericTrafficGen, self).__init__(name, vnfd, task_id)
self.runs_traffic = True
self.traffic_finished = False
self._mq_producer = None
"packets_dropped": 4,
}
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if setup_env_helper_type is None:
setup_env_helper_type = CgnaptApproxSetupEnvHelper
-
- super(CgnaptApproxVnf, self).__init__(name, vnfd, setup_env_helper_type,
- resource_helper_type)
+ super(CgnaptApproxVnf, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
def _vnf_up_post(self):
super(CgnaptApproxVnf, self)._vnf_up_post()
VNF_PROMPT = "PROX started"
LUA_PARAMETER_NAME = "sut"
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if setup_env_helper_type is None:
setup_env_helper_type = ProxDpdkVnfSetupEnvHelper
self.prev_packets_sent = 0
self.prev_tsc = 0
self.tsc_hz = 0
- super(ProxApproxVnf, self).__init__(name, vnfd, setup_env_helper_type,
- resource_helper_type)
+ super(ProxApproxVnf, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
def _vnf_up_post(self):
self.resource_helper.up_post()
WAIT_TIME = 1
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if setup_env_helper_type is None:
setup_env_helper_type = DpdkVnfSetupEnvHelper
vnfd['mgmt-interface'].pop("pkey", "")
vnfd['mgmt-interface']['password'] = 'password'
- super(RouterVNF, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
+ super(RouterVNF, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
def instantiate(self, scenario_cfg, context_cfg):
self.scenario_helper.scenario_cfg = scenario_cfg
test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
return test_duration if test_duration > test_timeout else test_timeout
+
class SampleVNF(GenericVNF):
""" Class providing file-like API for generic VNF implementation """
APP_NAME = "SampleVNF"
# we run the VNF interactively, so the ssh command will timeout after this long
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
- super(SampleVNF, self).__init__(name, vnfd)
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
+ super(SampleVNF, self).__init__(name, vnfd, task_id)
self.bin_path = get_nsb_option('bin_path', '')
self.scenario_helper = ScenarioHelper(self.name)
APP_NAME = 'Sample'
RUN_WAIT = 1
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
- super(SampleVNFTrafficGen, self).__init__(name, vnfd)
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
+ super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id)
self.bin_path = get_nsb_option('bin_path', '')
self.scenario_helper = ScenarioHelper(self.name)
class IxLoadTrafficGen(SampleVNFTrafficGen):
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if resource_helper_type is None:
resource_helper_type = IxLoadResourceHelper
- super(IxLoadTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
- resource_helper_type)
+ super(IxLoadTrafficGen, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
self._result = {}
def run_traffic(self, traffic_profile):
APP_NAME = 'Ping'
RUN_WAIT = 4
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if setup_env_helper_type is None:
setup_env_helper_type = PingSetupEnvHelper
if resource_helper_type is None:
resource_helper_type = PingResourceHelper
-
- super(PingTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
- resource_helper_type)
+ super(PingTrafficGen, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
self._result = {}
def _check_status(self):
LUA_PARAMETER_NAME = "gen"
WAIT_TIME = 1
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
# don't call superclass, use custom wrapper of ProxApproxVnf
- self._vnf_wrapper = ProxApproxVnf(name, vnfd, setup_env_helper_type, resource_helper_type)
+ self._vnf_wrapper = ProxApproxVnf(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
self.bin_path = get_nsb_option('bin_path', '')
self.name = self._vnf_wrapper.name
self.ssh_helper = self._vnf_wrapper.ssh_helper
APP_NAME = 'Ixia'
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if resource_helper_type is None:
resource_helper_type = IxiaResourceHelper
-
- super(IxiaTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
- resource_helper_type)
+ super(IxiaTrafficGen, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
self._ixia_traffic_gen = None
self.ixia_file_name = ''
self.vnf_port_pairs = []
traffic for rfc2544 testcase.
"""
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if resource_helper_type is None:
resource_helper_type = TrexRfcResourceHelper
-
- super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type,
- resource_helper_type)
+ super(TrexTrafficGenRFC, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
APP_NAME = 'TRex'
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if resource_helper_type is None:
resource_helper_type = TrexResourceHelper
-
if setup_env_helper_type is None:
setup_env_helper_type = TrexDpdkVnfSetupEnvHelper
-
- super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
- resource_helper_type)
+ super(TrexTrafficGen, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
def _check_status(self):
return self.resource_helper.check_status()
PIPELINE_COMMAND = REPLAY_PIPELINE_COMMAND
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if resource_helper_type is None:
resource_helper_type = UdpReplayResourceHelper
-
if setup_env_helper_type is None:
setup_env_helper_type = UdpReplaySetupEnvHelper
-
- super(UdpReplayApproxVnf, self).__init__(name, vnfd, setup_env_helper_type,
- resource_helper_type)
+ super(UdpReplayApproxVnf, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
def _build_pipeline_kwargs(self):
ports = self.vnfd_helper.port_pairs.all_ports
'packets_dropped': 3,
}
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if setup_env_helper_type is None:
setup_env_helper_type = FWApproxSetupEnvHelper
-
- super(FWApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
+ super(FWApproxVnf, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
COLLECT_KPI = VPE_COLLECT_KPI
WAIT_TIME = 20
- def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
if setup_env_helper_type is None:
setup_env_helper_type = VpeApproxSetupEnvHelper
-
- super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
+ super(VpeApproxVnf, self).__init__(
+ name, vnfd, task_id, setup_env_helper_type, resource_helper_type)
def get_stats(self, *args, **kwargs):
raise NotImplementedError
--- /dev/null
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multiprocessing
+import time
+import uuid
+
+import mock
+
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
+from yardstick.network_services.vnf_generic.vnf import base as vnf_base
+from yardstick.tests.functional import base as ft_base
+
+
+class _TrafficGenMQConsumer(vnf_base.GenericTrafficGen,
+ vnf_base.GenericVNFEndpoint):
+
+ def __init__(self, name, vnfd, task_id):
+ vnf_base.GenericTrafficGen.__init__(self, name, vnfd, task_id)
+ self.queue = multiprocessing.Queue()
+ self._id = uuid.uuid1().int
+ vnf_base.GenericVNFEndpoint.__init__(self, self._id, [task_id],
+ self.queue)
+ self._consumer = vnf_base.GenericVNFConsumer([task_id], self)
+ self._consumer.start_rpc_server()
+
+ def run_traffic(self, *args):
+ pass
+
+ def terminate(self):
+ pass
+
+ def collect_kpi(self):
+ pass
+
+ def instantiate(self, *args):
+ pass
+
+ def scale(self, flavor=''):
+ pass
+
+ def runner_method_start_iteration(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_ids:
+ self._queue.put(
+ {'action': messaging.RUNNER_METHOD_START_ITERATION,
+ 'payload': payloads.RunnerPayload.dict_to_obj(kwargs)})
+
+ def runner_method_stop_iteration(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_ids:
+ self._queue.put(
+ {'action': messaging.RUNNER_METHOD_STOP_ITERATION,
+ 'payload': payloads.RunnerPayload.dict_to_obj(kwargs)})
+
+
+class _DummyProducer(producer.MessagingProducer):
+ pass
+
+
+class GenericVNFMQConsumerTestCase(ft_base.BaseFunctionalTestCase):
+
+ def test_fistro(self):
+ vnfd = {'benchmark': {'kpi': mock.ANY},
+ 'vdu': [{'external-interface': 'ext_int'}]
+ }
+ task_id = uuid.uuid1().int
+ tg_obj = _TrafficGenMQConsumer('name_tg', vnfd, task_id)
+ producer = _DummyProducer(messaging.TOPIC_RUNNER, task_id)
+
+ num_messages = 10
+ for i in range(num_messages):
+ pload = payloads.RunnerPayload(version=10, data=i)
+ for method in (messaging.RUNNER_METHOD_START_ITERATION,
+ messaging.RUNNER_METHOD_STOP_ITERATION):
+ producer.send_message(method, pload)
+
+ time.sleep(0.5) # Let consumers attend the calls
+ output = []
+ while not tg_obj.queue.empty():
+ data = tg_obj.queue.get(True, 1)
+ data_dict = {'action': data['action'],
+ 'payload': data['payload'].obj_to_dict()}
+ output.append(data_dict)
+
+ self.assertEqual(num_messages * 2, len(output))
+ for i in range(num_messages):
+ pload = payloads.RunnerPayload(version=10, data=i).obj_to_dict()
+ for method in (messaging.RUNNER_METHOD_START_ITERATION,
+ messaging.RUNNER_METHOD_STOP_ITERATION):
+ reg = {'action': method, 'payload': pload}
+ self.assertIn(reg, output)
self._get_file_abspath("tg_trex_tpl.yaml")
self.context_cfg["nodes"]['vnf__1']['VNF model'] = \
self._get_file_abspath("tg_trex_tpl.yaml")
+ self.context_cfg['task_id'] = 'fake_task_id'
vnf = mock.Mock(autospec=GenericVNF)
self.s.get_vnf_impl = mock.Mock(return_value=vnf)
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
import unittest
import mock
import re
import copy
-from yardstick.tests import STL_MOCKS
-from yardstick.tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
from yardstick.common import utils
from yardstick.common import exceptions
from yardstick.benchmark.contexts import base as ctx_base
-
-
-STLClient = mock.MagicMock()
-stl_patch = mock.patch.dict("sys.modules", STL_MOCKS)
-stl_patch.start()
-
-if stl_patch:
- from yardstick.network_services.vnf_generic.vnf import acl_vnf
- from yardstick.network_services.vnf_generic.vnf.base import VnfdHelper
- from yardstick.network_services.nfvi.resource import ResourceProfile
- from yardstick.network_services.vnf_generic.vnf.acl_vnf import AclApproxSetupEnvSetupEnvHelper
+from yardstick.network_services.vnf_generic.vnf import acl_vnf
+from yardstick.network_services.vnf_generic.vnf.base import VnfdHelper
+from yardstick.network_services.nfvi.resource import ResourceProfile
+from yardstick.network_services.vnf_generic.vnf.acl_vnf import AclApproxSetupEnvSetupEnvHelper
+from yardstick.tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
TEST_FILE_YAML = 'nsb_test_case.yaml'
def test___init__(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd)
+ acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd, 'task_id')
self.assertIsNone(acl_approx_vnf._vnf_process)
@mock.patch("yardstick.network_services.vnf_generic.vnf.sample_vnf.time")
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd)
+ acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd, 'task_id')
acl_approx_vnf.scenario_helper.scenario_cfg = {
'nodes': {acl_approx_vnf.name: "mock"}
}
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd)
+ acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd, 'task_id')
acl_approx_vnf.q_in = mock.MagicMock()
acl_approx_vnf.q_out = mock.MagicMock()
acl_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd)
+ acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd, 'task_id')
acl_approx_vnf.q_in = mock.MagicMock()
acl_approx_vnf.q_out = mock.MagicMock()
acl_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd)
+ acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd, 'task_id')
acl_approx_vnf._build_config = mock.MagicMock()
acl_approx_vnf.queue_wrapper = mock.MagicMock()
acl_approx_vnf.scenario_helper.scenario_cfg = self.scenario_cfg
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd)
+ acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd, 'task_id')
acl_approx_vnf.deploy_helper = mock.MagicMock()
acl_approx_vnf.resource_helper = mock.MagicMock()
acl_approx_vnf._build_config = mock.MagicMock()
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd)
+ acl_approx_vnf = acl_vnf.AclApproxVnf(name, vnfd, 'task_id')
acl_approx_vnf._vnf_process = mock.MagicMock()
acl_approx_vnf._vnf_process.terminate = mock.Mock()
acl_approx_vnf.used_drivers = {"01:01.0": "i40e",
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
import multiprocessing
import os
from yardstick.common.messaging import payloads
from yardstick.network_services.vnf_generic.vnf import base
from yardstick.ssh import SSH
+from yardstick.tests.unit import base as ut_base
IP_PIPELINE_CFG_FILE_TPL = ("arp_route_tbl = ({port0_local_ip_hex},"
self.assertIsNotNone(queue_file_wrapper.q_out.empty())
-class TestGenericVNF(unittest.TestCase):
+class TestGenericVNF(ut_base.BaseUnitTestCase):
def test_definition(self):
"""Make sure that the abstract class cannot be instantiated"""
with self.assertRaises(TypeError) as exc:
# pylint: disable=abstract-class-instantiated
- base.GenericVNF('vnf1', VNFD['vnfd:vnfd-catalog']['vnfd'][0])
+ base.GenericVNF('vnf1', VNFD['vnfd:vnfd-catalog']['vnfd'][0],
+ 'task_id')
- msg = ("Can't instantiate abstract class GenericVNF with abstract methods "
- "collect_kpi, instantiate, scale, start_collect, "
+ msg = ("Can't instantiate abstract class GenericVNF with abstract "
+ "methods collect_kpi, instantiate, scale, start_collect, "
"stop_collect, terminate, wait_for_instantiate")
self.assertEqual(msg, str(exc.exception))
-class GenericTrafficGenTestCase(unittest.TestCase):
+class GenericTrafficGenTestCase(ut_base.BaseUnitTestCase):
def test_definition(self):
"""Make sure that the abstract class cannot be instantiated"""
name = 'vnf1'
with self.assertRaises(TypeError) as exc:
# pylint: disable=abstract-class-instantiated
- base.GenericTrafficGen(name, vnfd)
+ base.GenericTrafficGen(name, vnfd, 'task_id')
msg = ("Can't instantiate abstract class GenericTrafficGen with "
"abstract methods collect_kpi, instantiate, run_traffic, "
"scale, terminate")
vnfd = {'benchmark': {'kpi': mock.ANY},
'vdu': [{'external-interface': 'ext_int'}]
}
- tg = _DummyGenericTrafficGen('name', vnfd)
+ tg = _DummyGenericTrafficGen('name', vnfd, 'task_id')
tg._mq_producer = mock.Mock()
tg._mq_producer.id = 'fake_id'
self.assertEqual('fake_id', tg.get_mq_producer_id())
-class TrafficGeneratorProducerTestCase(unittest.TestCase):
+class TrafficGeneratorProducerTestCase(ut_base.BaseUnitTestCase):
@mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
@mock.patch.object(oslo_messaging, 'RPCClient')
'tg_pload')
mock_tg_payload.assert_called_once_with(version=30, iteration=100,
kpi={'k': 'v'})
+
+
+class GenericVNFConsumerTestCase(ut_base.BaseUnitTestCase):
+
+ def test__init(self):
+ endpoints = 'endpoint_1'
+ _ids = [uuid.uuid1().int]
+ gvnf_consumer = base.GenericVNFConsumer(_ids, endpoints)
+ self.assertEqual(_ids, gvnf_consumer._ids)
+ self.assertEqual([endpoints], gvnf_consumer._endpoints)
def test___init__(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd)
+ cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd, 'task_id')
self.assertIsNone(cgnapt_approx_vnf._vnf_process)
@mock.patch.object(process, 'check_if_process_failed')
@mock.patch.object(ctx_base.Context, 'get_physical_node_from_server', return_value='mock_node')
def test_collect_kpi(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd)
+ cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd, 'task_id')
cgnapt_approx_vnf.scenario_helper.scenario_cfg = {
'nodes': {cgnapt_approx_vnf.name: "mock"}
}
@mock.patch.object(time, 'sleep')
def test_vnf_execute_command(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd)
+ cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd, 'task_id')
cgnapt_approx_vnf.q_in = mock.Mock()
cgnapt_approx_vnf.q_out = mock.Mock()
cgnapt_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
def test_get_stats(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd)
+ cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd, 'task_id')
with mock.patch.object(cgnapt_approx_vnf, 'vnf_execute') as mock_exec:
mock_exec.return_value = 'output'
self.assertEqual('output', cgnapt_approx_vnf.get_stats())
def test_run_vcgnapt(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd)
+ cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd, 'task_id')
cgnapt_approx_vnf.ssh_helper = mock.Mock()
cgnapt_approx_vnf.setup_helper = mock.Mock()
with mock.patch.object(cgnapt_approx_vnf, '_build_config'), \
@mock.patch.object(ctx_base.Context, 'get_context_from_server')
def test_instantiate(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd)
+ cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd, 'task_id')
cgnapt_approx_vnf.deploy_helper = mock.MagicMock()
cgnapt_approx_vnf.resource_helper = mock.MagicMock()
cgnapt_approx_vnf._build_config = mock.MagicMock()
def test__vnf_up_post(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
self.scenario_cfg['options'][name]['napt'] = 'static'
- cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd)
+ cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd, 'task_id')
cgnapt_approx_vnf.vnf_execute = mock.Mock()
cgnapt_approx_vnf.scenario_helper.scenario_cfg = self.scenario_cfg
with mock.patch.object(cgnapt_approx_vnf, 'setup_helper') as \
def test__vnf_up_post_short(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd)
+ cgnapt_approx_vnf = cgnapt_vnf.CgnaptApproxVnf(name, vnfd, 'task_id')
cgnapt_approx_vnf.scenario_helper.scenario_cfg = self.scenario_cfg
cgnapt_approx_vnf._vnf_up_post()
@mock.patch(SSH_HELPER)
def test___init__(self, ssh, *args):
mock_ssh(ssh)
- prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0)
+ prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0, 'task_id')
self.assertIsNone(prox_approx_vnf._vnf_process)
@mock.patch.object(ctx_base.Context, 'get_physical_node_from_server', return_value='mock_node')
def test_collect_kpi_no_client(self, ssh, *args):
mock_ssh(ssh)
- prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0)
+ prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0, 'task_id')
prox_approx_vnf.scenario_helper.scenario_cfg = {
'nodes': {prox_approx_vnf.name: "mock"}
}
[2, 1, 2, 3, 4, 5], [3, 1, 2, 3, 4, 5]]
resource_helper.collect_collectd_kpi.return_value = {'core': {'result': 234}}
- prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0)
+ prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0, 'task_id')
prox_approx_vnf.scenario_helper.scenario_cfg = {
'nodes': {prox_approx_vnf.name: "mock"}
}
mock_ssh(ssh)
resource_helper = mock.MagicMock()
- prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, deepcopy(self.VNFD0))
+ prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, deepcopy(self.VNFD0),
+ 'task_id')
prox_approx_vnf.scenario_helper.scenario_cfg = {
'nodes': {prox_approx_vnf.name: "mock"}
}
def test_run_prox(self, ssh, *_):
mock_ssh(ssh)
- prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0)
+ prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0, 'task_id')
prox_approx_vnf.scenario_helper.scenario_cfg = self.SCENARIO_CFG
prox_approx_vnf.ssh_helper.join_bin_path.return_value = '/tool_path12/tool_file34'
prox_approx_vnf.setup_helper.remote_path = 'configs/file56.cfg'
@mock.patch(SSH_HELPER)
def bad_test_instantiate(self, *args):
- prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0)
+ prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0, 'task_id')
prox_approx_vnf.scenario_helper = mock.MagicMock()
prox_approx_vnf.setup_helper = mock.MagicMock()
# we can't mock super
@mock.patch(SSH_HELPER)
def test_wait_for_instantiate_panic(self, ssh, *args):
mock_ssh(ssh, exec_result=(1, "", ""))
- prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0)
+ prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0, 'task_id')
prox_approx_vnf._vnf_process = mock.MagicMock(**{"is_alive.return_value": True})
prox_approx_vnf._run_prox = mock.Mock(return_value=0)
prox_approx_vnf.WAIT_TIME = 0
@mock.patch(SSH_HELPER)
def test_terminate(self, ssh, *args):
mock_ssh(ssh)
- prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0)
+ prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0, 'task_id')
prox_approx_vnf._vnf_process = mock.MagicMock()
prox_approx_vnf._vnf_process.terminate = mock.Mock()
prox_approx_vnf.ssh_helper = mock.MagicMock()
@mock.patch(SSH_HELPER)
def test__vnf_up_post(self, ssh, *args):
mock_ssh(ssh)
- prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0)
+ prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0, 'task_id')
prox_approx_vnf.resource_helper = resource_helper = mock.Mock()
prox_approx_vnf._vnf_up_post()
@mock.patch(SSH_HELPER)
def test_vnf_execute_oserror(self, ssh, *args):
mock_ssh(ssh)
- prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0)
+ prox_approx_vnf = prox_vnf.ProxApproxVnf(NAME, self.VNFD0, 'task_id')
prox_approx_vnf.resource_helper = resource_helper = mock.Mock()
resource_helper.execute.side_effect = OSError(errno.EPIPE, "")
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
import unittest
import mock
-from yardstick.tests import STL_MOCKS
from yardstick.tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
from yardstick.benchmark.contexts import base as ctx_base
-
-
-STLClient = mock.MagicMock()
-stl_patch = mock.patch.dict("sys.modules", STL_MOCKS)
-stl_patch.start()
-
-if stl_patch:
- from yardstick.network_services.vnf_generic.vnf.router_vnf import RouterVNF
+from yardstick.network_services.vnf_generic.vnf.router_vnf import RouterVNF
TEST_FILE_YAML = 'nsb_test_case.yaml'
def test___init__(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- router_vnf = RouterVNF(name, vnfd)
+ router_vnf = RouterVNF(name, vnfd, 'task_id')
self.assertIsNone(router_vnf._vnf_process)
def test_get_stats(self):
m = mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- router_vnf = RouterVNF(name, vnfd)
+ router_vnf = RouterVNF(name, vnfd, 'task_id')
router_vnf.scenario_helper.scenario_cfg = {
'nodes': {router_vnf.name: "mock"}
}
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- router_vnf = RouterVNF(name, vnfd)
+ router_vnf = RouterVNF(name, vnfd, 'task_id')
router_vnf.scenario_helper.scenario_cfg = self.scenario_cfg
router_vnf._run()
router_vnf.ssh_helper.drop_connection.assert_called_once()
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- router_vnf = RouterVNF(name, vnfd)
+ router_vnf = RouterVNF(name, vnfd, 'task_id')
router_vnf.WAIT_TIME = 0
router_vnf.INTERFACE_WAIT = 0
self.scenario_cfg.update({"nodes": {"vnf__1": ""}})
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- router_vnf = RouterVNF(name, vnfd)
+ router_vnf = RouterVNF(name, vnfd, 'task_id')
router_vnf._vnf_process = mock.MagicMock()
router_vnf._vnf_process.terminate = mock.Mock()
self.assertIsNone(router_vnf.terminate())
}
def test___init__(self):
- sample_vnf = SampleVNF('vnf1', self.VNFD_0)
+ sample_vnf = SampleVNF('vnf1', self.VNFD_0, 'task_id')
self.assertEqual(sample_vnf.name, 'vnf1')
self.assertDictEqual(sample_vnf.vnfd_helper, self.VNFD_0)
class MyResourceHelper(ResourceHelper):
pass
- sample_vnf = SampleVNF('vnf1', self.VNFD_0, MySetupEnvHelper, MyResourceHelper)
+ sample_vnf = SampleVNF('vnf1', self.VNFD_0, 'task_id',
+ MySetupEnvHelper, MyResourceHelper)
self.assertEqual(sample_vnf.name, 'vnf1')
self.assertDictEqual(sample_vnf.vnfd_helper, self.VNFD_0)
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.Process')
def test__start_vnf(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf = SampleVNF('vnf1', vnfd, 'task_id')
sample_vnf._run = mock.Mock()
self.assertIsNone(sample_vnf.queue_wrapper)
}
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf = SampleVNF('vnf1', vnfd, 'task_id')
sample_vnf.scenario_helper.scenario_cfg = {
'nodes': {sample_vnf.name: 'mock'}
}
'plugin1': {'param': 1}}}
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf__0', vnfd)
+ sample_vnf = SampleVNF('vnf__0', vnfd, 'task_id')
sample_vnf._update_collectd_options(scenario_cfg, context_cfg)
self.assertEqual(sample_vnf.setup_helper.collectd_options, expected)
'plugin1': {'param': 1}}}
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf = SampleVNF('vnf1', vnfd, 'task_id')
sample_vnf._update_options(options2, options1)
self.assertEqual(options2, expected)
]
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf = SampleVNF('vnf1', vnfd, 'task_id')
sample_vnf.APP_NAME = 'sample1'
sample_vnf.WAIT_TIME_FOR_SCRIPT = 0
sample_vnf._start_server = mock.Mock(return_value=0)
]
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf = SampleVNF('vnf1', vnfd, 'task_id')
sample_vnf.APP_NAME = 'sample1'
sample_vnf.q_out = mock.Mock()
sample_vnf.q_out.qsize.side_effect = iter(queue_size_list)
def test_terminate_without_vnf_process(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf = SampleVNF('vnf1', vnfd, 'task_id')
sample_vnf.APP_NAME = 'sample1'
sample_vnf.vnf_execute = mock.Mock()
sample_vnf.ssh_helper = mock.Mock()
def test_get_stats(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf = SampleVNF('vnf1', vnfd, 'task_id')
sample_vnf.APP_NAME = 'sample1'
sample_vnf.APP_WORD = 'sample1'
sample_vnf.vnf_execute = mock.Mock(return_value='the stats')
@mock.patch.object(ctx_base.Context, 'get_physical_node_from_server', return_value='mock_node')
def test_collect_kpi(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf = SampleVNF('vnf1', vnfd, 'task_id')
sample_vnf.scenario_helper.scenario_cfg = {
'nodes': {sample_vnf.name: "mock"}
}
@mock.patch.object(ctx_base.Context, 'get_physical_node_from_server', return_value='mock_node')
def test_collect_kpi_default(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf = SampleVNF('vnf1', vnfd, 'task_id')
sample_vnf.scenario_helper.scenario_cfg = {
'nodes': {sample_vnf.name: "mock"}
}
def test_scale(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf = SampleVNF('vnf1', vnfd, 'task_id')
self.assertRaises(y_exceptions.FunctionNotImplemented,
sample_vnf.scale)
test_cmd = 'test cmd'
run_kwargs = {'arg1': 'val1', 'arg2': 'val2'}
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf = SampleVNF('vnf1', vnfd, 'task_id')
sample_vnf.ssh_helper = mock.Mock()
sample_vnf.setup_helper = mock.Mock()
with mock.patch.object(sample_vnf, '_build_config',
}
def test__check_status(self):
- sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
+ sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0, 'task_id')
with self.assertRaises(NotImplementedError):
sample_vnf_tg._check_status()
def test_listen_traffic(self):
- sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
+ sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0, 'task_id')
sample_vnf_tg.listen_traffic(mock.Mock())
def test_verify_traffic(self):
- sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
+ sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0, 'task_id')
sample_vnf_tg.verify_traffic(mock.Mock())
def test_terminate(self):
- sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
+ sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0, 'task_id')
sample_vnf_tg._traffic_process = mock.Mock()
sample_vnf_tg._tg_process = mock.Mock()
sample_vnf_tg.terminate()
def test__wait_for_process(self):
- sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
+ sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0, 'task_id')
with mock.patch.object(sample_vnf_tg, '_check_status',
return_value=0) as mock_status, \
mock.patch.object(sample_vnf_tg, '_tg_process') as mock_proc:
mock_status.assert_called_once()
def test__wait_for_process_not_alive(self):
- sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
+ sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0, 'task_id')
with mock.patch.object(sample_vnf_tg, '_tg_process') as mock_proc:
mock_proc.is_alive.return_value = False
self.assertRaises(RuntimeError, sample_vnf_tg._wait_for_process)
mock_proc.is_alive.assert_called_once()
def test__wait_for_process_delayed(self):
- sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
+ sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0, 'task_id')
with mock.patch.object(sample_vnf_tg, '_check_status',
side_effect=[1, 0]) as mock_status, \
mock.patch.object(sample_vnf_tg,
mock_status.assert_has_calls([mock.call(), mock.call()])
def test_scale(self):
- sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
+ sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0, 'task_id')
self.assertRaises(y_exceptions.FunctionNotImplemented,
sample_vnf_tg.scale)
def test___init__(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
self.assertIsNone(ixload_traffic_gen.resource_helper.data)
@mock.patch.object(ctx_base.Context, 'get_physical_node_from_server',
return_value='mock_node')
def test_collect_kpi(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
ixload_traffic_gen.scenario_helper.scenario_cfg = {
'nodes': {ixload_traffic_gen.name: "mock"}
}
def test_listen_traffic(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
self.assertIsNone(ixload_traffic_gen.listen_traffic({}))
@mock.patch.object(utils, 'find_relative_file')
@mock.patch.object(tg_ixload, 'shutil')
def test_instantiate(self, mock_shutil, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
scenario_cfg = {'tc': "nsb_test_case",
'ixia_profile': "ixload.cfg",
'task_path': "/path/to/task"}
vnfd['mgmt-interface'].update({'tg-config': {}})
vnfd['mgmt-interface']['tg-config'].update({'ixchassis': '1.1.1.1'})
vnfd['mgmt-interface']['tg-config'].update({'py_bin_path': '/root'})
- sut = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ sut = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
sut.connection = mock.Mock()
sut._traffic_runner = mock.Mock(return_value=0)
result = sut.run_traffic(mock_traffic_profile)
vnfd['mgmt-interface'].update({'tg-config': {}})
vnfd['mgmt-interface']['tg-config'].update({'ixchassis': '1.1.1.1'})
vnfd['mgmt-interface']['tg-config'].update({'py_bin_path': '/root'})
- sut = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ sut = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
sut.connection = mock.Mock()
sut._traffic_runner = mock.Mock(return_value=0)
subprocess.call(['touch', '/tmp/1.csv'])
def test_terminate(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
self.assertIsNone(ixload_traffic_gen.terminate())
def test_parse_csv_read(self):
'HTTP Transaction Rate': True,
}
http_reader = [kpi_data]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
result = ixload_traffic_gen.resource_helper.result
ixload_traffic_gen.resource_helper.parse_csv_read(http_reader)
for k_left, k_right in tg_ixload.IxLoadResourceHelper.KPI_LIST.items():
'HTTP Connection Rate': 4,
'HTTP Transaction Rate': 5,
}]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
init_value = ixload_traffic_gen.resource_helper.result
ixload_traffic_gen.resource_helper.parse_csv_read(http_reader)
self.assertDictEqual(ixload_traffic_gen.resource_helper.result,
'HTTP Concurrent Connections': 3,
'HTTP Transaction Rate': 5,
}]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
+
with self.assertRaises(KeyError):
ixload_traffic_gen.resource_helper.parse_csv_read(http_reader)
import unittest
from yardstick.tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
-from yardstick.tests import STL_MOCKS
from yardstick.benchmark.contexts import base as ctx_base
+from yardstick.network_services.vnf_generic.vnf.tg_ping import PingParser
+from yardstick.network_services.vnf_generic.vnf.tg_ping import PingTrafficGen
+from yardstick.network_services.vnf_generic.vnf.tg_ping import PingResourceHelper
+from yardstick.network_services.vnf_generic.vnf.tg_ping import PingSetupEnvHelper
+from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
-SSH_HELPER = "yardstick.network_services.vnf_generic.vnf.sample_vnf.VnfSshHelper"
-
-STLClient = mock.MagicMock()
-stl_patch = mock.patch.dict("sys.modules", STL_MOCKS)
-stl_patch.start()
-if stl_patch:
- from yardstick.network_services.vnf_generic.vnf.tg_ping import PingParser
- from yardstick.network_services.vnf_generic.vnf.tg_ping import PingTrafficGen
- from yardstick.network_services.vnf_generic.vnf.tg_ping import PingResourceHelper
- from yardstick.network_services.vnf_generic.vnf.tg_ping import PingSetupEnvHelper
- from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
+SSH_HELPER = "yardstick.network_services.vnf_generic.vnf.sample_vnf.VnfSshHelper"
class TestPingResourceHelper(unittest.TestCase):
@mock.patch("yardstick.ssh.SSH")
def test___init__(self, ssh):
ssh.from_node.return_value.execute.return_value = 0, "success", ""
- ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
+ ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0, 'task_id')
self.assertIsInstance(ping_traffic_gen.setup_helper, PingSetupEnvHelper)
self.assertIsInstance(ping_traffic_gen.resource_helper, PingResourceHelper)
(0, 'if_name_2', ''),
]
ssh.from_node.return_value.execute.side_effect = iter(execute_result_data)
- ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
+ ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0, 'task_id')
ext_ifs = ping_traffic_gen.vnfd_helper.interfaces
self.assertNotEqual(ext_ifs[0]['virtual-interface']['local_iface_name'], 'if_name_1')
self.assertNotEqual(ext_ifs[1]['virtual-interface']['local_iface_name'], 'if_name_2')
def test_collect_kpi(self, ssh, *args):
mock_ssh(ssh, exec_result=(0, "success", ""))
- ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
+ ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0, 'task_id')
ping_traffic_gen.scenario_helper.scenario_cfg = {
'nodes': {ping_traffic_gen.name: "mock"}
}
@mock.patch(SSH_HELPER)
def test_instantiate(self, ssh):
mock_ssh(ssh, spec=VnfSshHelper, exec_result=(0, "success", ""))
- ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
+ ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0, 'task_id')
ping_traffic_gen.setup_helper.ssh_helper = mock.MagicMock(
**{"execute.return_value": (0, "xe0_fake", "")})
self.assertIsInstance(ping_traffic_gen.ssh_helper, mock.Mock)
self.assertIsNotNone(ping_traffic_gen._result)
def test_listen_traffic(self):
- ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
+ ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0, 'task_id')
self.assertIsNone(ping_traffic_gen.listen_traffic({}))
@mock.patch("yardstick.ssh.SSH")
ssh.from_node.return_value.execute.return_value = 0, "success", ""
ssh.from_node.return_value.run.return_value = 0, "success", ""
- ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
+ ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0, 'task_id')
self.assertIsNone(ping_traffic_gen.terminate())
import mock
from yardstick.tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
-from yardstick.tests import STL_MOCKS
from yardstick.benchmark.contexts import base as ctx_base
+from yardstick.network_services.vnf_generic.vnf.tg_prox import ProxTrafficGen
+from yardstick.network_services.traffic_profile.base import TrafficProfile
SSH_HELPER = 'yardstick.network_services.vnf_generic.vnf.sample_vnf.VnfSshHelper'
NAME = 'vnf__1'
-STLClient = mock.MagicMock()
-stl_patch = mock.patch.dict("sys.modules", STL_MOCKS)
-stl_patch.start()
-
-if stl_patch:
- from yardstick.network_services.vnf_generic.vnf.tg_prox import ProxTrafficGen
- from yardstick.network_services.traffic_profile.base import TrafficProfile
-
@mock.patch('yardstick.network_services.vnf_generic.vnf.prox_helpers.time')
class TestProxTrafficGen(unittest.TestCase):
@mock.patch(SSH_HELPER)
def test___init__(self, ssh, *args):
mock_ssh(ssh)
- prox_traffic_gen = ProxTrafficGen(NAME, self.VNFD0)
+ prox_traffic_gen = ProxTrafficGen(NAME, self.VNFD0, 'task_id')
self.assertIsNone(prox_traffic_gen._tg_process)
self.assertIsNone(prox_traffic_gen._traffic_process)
@mock.patch(SSH_HELPER)
def test_collect_kpi(self, ssh, *args):
mock_ssh(ssh)
- prox_traffic_gen = ProxTrafficGen(NAME, self.VNFD0)
+ prox_traffic_gen = ProxTrafficGen(NAME, self.VNFD0, 'task_id')
prox_traffic_gen.scenario_helper.scenario_cfg = {
'nodes': {prox_traffic_gen.name: "mock"}
}
mock_traffic_profile.params = self.TRAFFIC_PROFILE
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- prox_traffic_gen = ProxTrafficGen(NAME, vnfd)
+ prox_traffic_gen = ProxTrafficGen(NAME, vnfd, 'task_id')
ssh_helper = mock.MagicMock(
**{"execute.return_value": (0, "", ""), "bin_path": ""})
prox_traffic_gen.ssh_helper = ssh_helper
mock_traffic_profile.params = self.TRAFFIC_PROFILE
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sut = ProxTrafficGen(NAME, vnfd)
+ sut = ProxTrafficGen(NAME, vnfd, 'task_id')
sut._get_socket = mock.MagicMock()
sut.ssh_helper = mock.Mock()
sut.ssh_helper.run = mock.Mock()
sut.setup_helper.prox_config_dict = {}
- sut._connect_client = mock.Mock(autospec=STLClient)
+ sut._connect_client = mock.Mock(autospec=mock.Mock())
sut._connect_client.get_stats = mock.Mock(return_value="0")
sut._setup_mq_producer = mock.Mock(return_value='mq_producer')
sut._traffic_runner(mock_traffic_profile, mock.ANY)
def test_listen_traffic(self, ssh, *args):
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- prox_traffic_gen = ProxTrafficGen(NAME, vnfd)
+ prox_traffic_gen = ProxTrafficGen(NAME, vnfd, 'task_id')
self.assertIsNone(prox_traffic_gen.listen_traffic(mock.Mock()))
@mock.patch('yardstick.network_services.vnf_generic.vnf.prox_helpers.socket')
def test_terminate(self, ssh, *args):
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- prox_traffic_gen = ProxTrafficGen(NAME, vnfd)
+ prox_traffic_gen = ProxTrafficGen(NAME, vnfd, 'task_id')
prox_traffic_gen._terminated = mock.MagicMock()
prox_traffic_gen._traffic_process = mock.MagicMock()
prox_traffic_gen._traffic_process.terminate = mock.Mock()
ssh.from_node.return_value = ssh_mock
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
# NOTE(ralonsoh): check the object returned.
- tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd)
+ tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd, 'task_id')
def test_listen_traffic(self, *args):
with mock.patch("yardstick.ssh.SSH") as ssh:
mock.Mock(return_value=(0, "", ""))
ssh.from_node.return_value = ssh_mock
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd)
+ ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd,
+ 'task_id')
self.assertIsNone(ixnet_traffic_gen.listen_traffic({}))
@mock.patch.object(ctx_base.Context, 'get_context_from_server', return_value='fake_context')
mock.Mock(return_value=(0, "", ""))
ssh.from_node.return_value = ssh_mock
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd)
+ ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd,
+ 'task_id')
scenario_cfg = {'tc': "nsb_test_case", "topology": "",
'ixia_profile': "ixload.cfg"}
scenario_cfg.update(
ssh.from_node.return_value = ssh_mock
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd)
+ ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd,
+ 'task_id')
ixnet_traffic_gen.scenario_helper.scenario_cfg = {
'nodes': {ixnet_traffic_gen.name: "mock"}
}
ssh_mock.execute = \
mock.Mock(return_value=(0, "", ""))
ssh.from_node.return_value = ssh_mock
- ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd)
+ ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd,
+ 'task_id')
ixnet_traffic_gen._terminated = mock.MagicMock()
ixnet_traffic_gen._terminated.value = 0
ixnet_traffic_gen._ixia_traffic_gen = mock.MagicMock()
def test__check_status(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sut = tg_rfc2544_ixia.IxiaTrafficGen('vnf1', vnfd)
+ sut = tg_rfc2544_ixia.IxiaTrafficGen('vnf1', vnfd, 'task_id')
sut._check_status()
@mock.patch("yardstick.ssh.SSH")
mock_traffic_profile.get_drop_percentage.return_value = [
'Completed', samples]
- sut = tg_rfc2544_ixia.IxiaTrafficGen(name, vnfd)
+ sut = tg_rfc2544_ixia.IxiaTrafficGen(name, vnfd, 'task_id')
sut.vnf_port_pairs = [[[0], [1]]]
sut.tc_file_name = self._get_file_abspath(TEST_FILE_YAML)
sut.topology = ""
self._mock_ssh_helper.stop()
def test___init__(self):
- trex_traffic_gen = tg_rfc2544_trex.TrexTrafficGenRFC('vnf1', self.VNFD_0)
+ trex_traffic_gen = tg_rfc2544_trex.TrexTrafficGenRFC(
+ 'vnf1', self.VNFD_0, 'task_id')
self.assertIsNotNone(trex_traffic_gen.resource_helper._terminated.value)
@mock.patch.object(ctx_base.Context, 'get_physical_node_from_server', return_value='mock_node')
def test_collect_kpi(self, *args):
- trex_traffic_gen = tg_rfc2544_trex.TrexTrafficGenRFC('vnf1', self.VNFD_0)
+ trex_traffic_gen = tg_rfc2544_trex.TrexTrafficGenRFC(
+ 'vnf1', self.VNFD_0, 'task_id')
trex_traffic_gen.scenario_helper.scenario_cfg = {
'nodes': {trex_traffic_gen.name: "mock"}
}
mock_traffic_profile.get_traffic_definition.return_value = "64"
mock_traffic_profile.params = self.TRAFFIC_PROFILE
- trex_traffic_gen = tg_rfc2544_trex.TrexTrafficGenRFC('vnf1', self.VNFD_0)
+ trex_traffic_gen = tg_rfc2544_trex.TrexTrafficGenRFC(
+ 'vnf1', self.VNFD_0, 'task_id')
trex_traffic_gen._start_server = mock.Mock(return_value=0)
trex_traffic_gen.resource_helper = mock.MagicMock()
trex_traffic_gen.setup_helper.setup_vnf_environment = mock.MagicMock()
mock_traffic_profile.get_traffic_definition.return_value = "64"
mock_traffic_profile.params = self.TRAFFIC_PROFILE
- trex_traffic_gen = tg_rfc2544_trex.TrexTrafficGenRFC('vnf1', self.VNFD_0)
+ trex_traffic_gen = tg_rfc2544_trex.TrexTrafficGenRFC(
+ 'vnf1', self.VNFD_0, 'task_id')
trex_traffic_gen.resource_helper = mock.MagicMock()
trex_traffic_gen.setup_helper.setup_vnf_environment = mock.MagicMock()
scenario_cfg = {
def test___init__(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd)
+ trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
self.assertIsInstance(trex_traffic_gen.resource_helper,
tg_trex.TrexResourceHelper)
@mock.patch.object(ctx_base.Context, 'get_physical_node_from_server', return_value='mock_node')
def test_collect_kpi(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd)
+ trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
trex_traffic_gen.scenario_helper.scenario_cfg = {
'nodes': {trex_traffic_gen.name: "mock"}
}
def test_listen_traffic(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd)
+ trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
self.assertIsNone(trex_traffic_gen.listen_traffic({}))
@mock.patch.object(ctx_base.Context, 'get_context_from_server', return_value='fake_context')
def test_instantiate(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd)
+ trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
trex_traffic_gen._start_server = mock.Mock(return_value=0)
trex_traffic_gen._tg_process = mock.MagicMock()
trex_traffic_gen._tg_process.start = mock.Mock()
@mock.patch.object(ctx_base.Context, 'get_context_from_server', return_value='fake_context')
def test_instantiate_error(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd)
+ trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
trex_traffic_gen._start_server = mock.Mock(return_value=0)
trex_traffic_gen._tg_process = mock.MagicMock()
trex_traffic_gen._tg_process.start = mock.Mock()
def test__start_server(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd)
+ trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
trex_traffic_gen.ssh_helper = mock.MagicMock()
trex_traffic_gen.resource_helper.ssh_helper = mock.MagicMock()
trex_traffic_gen.scenario_helper.scenario_cfg = {}
def test__start_server_multiple_queues(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd)
+ trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
trex_traffic_gen.ssh_helper = mock.MagicMock()
trex_traffic_gen.resource_helper.ssh_helper = mock.MagicMock()
trex_traffic_gen.scenario_helper.scenario_cfg = {
mock_traffic_profile.params = self.TRAFFIC_PROFILE
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- self.sut = tg_trex.TrexTrafficGen(NAME, vnfd)
+ self.sut = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
self.sut.ssh_helper = mock.Mock()
self.sut.ssh_helper.run = mock.Mock()
self.sut._connect_client = mock.Mock()
def test__generate_trex_cfg(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd)
+ trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
trex_traffic_gen.resource_helper.ssh_helper = mock.MagicMock()
self.assertIsNone(trex_traffic_gen.resource_helper.generate_cfg())
'local_mac': '00:00:00:00:00:01'},
'vnfd-connection-point-ref': 'xe1',
'name': 'xe1'}]
- trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd)
+ trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
trex_traffic_gen.resource_helper.ssh_helper = mock.MagicMock()
trex_traffic_gen.resource_helper.generate_cfg()
trex_traffic_gen.resource_helper._build_ports()
mock_traffic_profile.params = self.TRAFFIC_PROFILE
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- self.sut = tg_trex.TrexTrafficGen(NAME, vnfd)
+ self.sut = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
self.sut.ssh_helper = mock.Mock()
self.sut.ssh_helper.run = mock.Mock()
self.sut._traffic_runner = mock.Mock(return_value=0)
def test_terminate(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd)
+ trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
trex_traffic_gen.ssh_helper = mock.MagicMock()
trex_traffic_gen.resource_helper.ssh_helper = mock.MagicMock()
self.assertIsNone(trex_traffic_gen.terminate())
def test__connect_client(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd)
+ trex_traffic_gen = tg_trex.TrexTrafficGen(NAME, vnfd, 'task_id')
client = mock.Mock()
client.connect = mock.Mock(return_value=0)
self.assertIsNotNone(trex_traffic_gen.resource_helper._connect(client))
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
import unittest
import mock
import os
-from yardstick.tests import STL_MOCKS
from yardstick.tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
from yardstick.benchmark.contexts import base as ctx_base
+from yardstick.network_services.vnf_generic.vnf.udp_replay import UdpReplayApproxVnf
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import ScenarioHelper
SSH_HELPER = 'yardstick.network_services.vnf_generic.vnf.sample_vnf.VnfSshHelper'
-STLClient = mock.MagicMock()
-stl_patch = mock.patch.dict("sys.modules", STL_MOCKS)
-stl_patch.start()
-
-if stl_patch:
- from yardstick.network_services.vnf_generic.vnf.udp_replay import UdpReplayApproxVnf
- from yardstick.network_services.vnf_generic.vnf.sample_vnf import ScenarioHelper
-
-
TEST_FILE_YAML = 'nsb_test_case.yaml'
-
NAME = "vnf__1"
}
def test___init__(self, *args):
- udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0)
+ udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0,
+ 'task_id')
self.assertIsNone(udp_replay_approx_vnf._vnf_process)
@mock.patch("yardstick.network_services.vnf_generic.vnf.sample_vnf.time")
"Port\t\tRx Packet\t\tTx Packet\t\tRx Pkt Drop\t\tTx Pkt Drop \r\n"\
"0\t\t7374156\t\t7374136\t\t\t0\t\t\t0\r\n" \
"1\t\t7374316\t\t7374315\t\t\t0\t\t\t0\r\n\r\nReplay>\r\r\nReplay>"
- udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, vnfd)
+ udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, vnfd, 'task_id')
udp_replay_approx_vnf.scenario_helper.scenario_cfg = {
'nodes': {udp_replay_approx_vnf.name: "mock"}
}
def test_get_stats(self, ssh, *args):
mock_ssh(ssh)
- udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0)
+ udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0,
+ 'task_id')
udp_replay_approx_vnf.q_in = mock.MagicMock()
udp_replay_approx_vnf.q_out = mock.MagicMock()
udp_replay_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
nfvi_context.attrs = {'nfvi_type': 'baremetal'}
mock_get_ctx.return_value = nfvi_context
- udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0)
+ udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0,
+ 'task_id')
udp_replay_approx_vnf.queue_wrapper = mock.MagicMock()
udp_replay_approx_vnf.nfvi_context = mock_get_ctx
udp_replay_approx_vnf.nfvi_context.attrs = {'nfvi_type': 'baremetal'}
nfvi_context.attrs = {'nfvi_type': "baremetal"}
mock_get_ctx.return_value = nfvi_context
- udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0)
+ udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0,
+ 'task_id')
udp_replay_approx_vnf.setup_helper.bound_pci = ['0000:00:0.1', '0000:00:0.3']
udp_replay_approx_vnf.all_ports = ["xe0", "xe1"]
udp_replay_approx_vnf.ssh_helper.provision_tool = mock.MagicMock(return_value="tool_path")
def test_run_udp_replay(self, ssh, *args):
mock_ssh(ssh)
- udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0)
+ udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0,
+ 'task_id')
udp_replay_approx_vnf._build_config = mock.MagicMock()
udp_replay_approx_vnf.queue_wrapper = mock.MagicMock()
udp_replay_approx_vnf.scenario_helper = mock.MagicMock()
def test_instantiate(self, ssh, *args):
mock_ssh(ssh)
- udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0)
+ udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0,
+ 'task_id')
udp_replay_approx_vnf.q_out.put("Replay>")
udp_replay_approx_vnf.WAIT_TIME = 0
udp_replay_approx_vnf.setup_helper.setup_vnf_environment = mock.Mock()
@mock.patch('yardstick.ssh.SSH')
@mock.patch(SSH_HELPER)
def test_instantiate_panic(self, *args):
- udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0)
+ udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0,
+ 'task_id')
udp_replay_approx_vnf.WAIT_TIME = 0
udp_replay_approx_vnf.q_out.put("some text PANIC some text")
udp_replay_approx_vnf.setup_helper.setup_vnf_environment = mock.Mock()
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
import unittest
import mock
import os
-from yardstick.tests import STL_MOCKS
-from yardstick.tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
-
from yardstick.common import utils
from yardstick.benchmark.contexts import base as ctx_base
-
-STLClient = mock.MagicMock()
-stl_patch = mock.patch.dict("sys.modules", STL_MOCKS)
-stl_patch.start()
-
-if stl_patch:
- from yardstick.network_services.vnf_generic.vnf.vfw_vnf import FWApproxVnf
- from yardstick.network_services.nfvi.resource import ResourceProfile
- from yardstick.network_services.vnf_generic.vnf.vfw_vnf import FWApproxSetupEnvHelper
+from yardstick.network_services.vnf_generic.vnf.vfw_vnf import FWApproxVnf
+from yardstick.network_services.nfvi.resource import ResourceProfile
+from yardstick.network_services.vnf_generic.vnf.vfw_vnf import FWApproxSetupEnvHelper
+from yardstick.tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
TEST_FILE_YAML = 'nsb_test_case.yaml'
def test___init__(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vfw_approx_vnf = FWApproxVnf(name, vnfd)
+ vfw_approx_vnf = FWApproxVnf(name, vnfd, 'task_id')
self.assertIsNone(vfw_approx_vnf._vnf_process)
STATS = """\
def test_collect_kpi(self, ssh, *args):
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vfw_approx_vnf = FWApproxVnf(name, vnfd)
+ vfw_approx_vnf = FWApproxVnf(name, vnfd, 'task_id')
vfw_approx_vnf.scenario_helper.scenario_cfg = {
'nodes': {vfw_approx_vnf.name: "mock"}
}
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vfw_approx_vnf = FWApproxVnf(name, vnfd)
+ vfw_approx_vnf = FWApproxVnf(name, vnfd, 'task_id')
vfw_approx_vnf.q_in = mock.MagicMock()
vfw_approx_vnf.q_out = mock.MagicMock()
vfw_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vfw_approx_vnf = FWApproxVnf(name, vnfd)
+ vfw_approx_vnf = FWApproxVnf(name, vnfd, 'task_id')
vfw_approx_vnf.q_in = mock.MagicMock()
vfw_approx_vnf.q_out = mock.MagicMock()
vfw_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vfw_approx_vnf = FWApproxVnf(name, vnfd)
+ vfw_approx_vnf = FWApproxVnf(name, vnfd, 'task_id')
vfw_approx_vnf._build_config = mock.MagicMock()
vfw_approx_vnf.queue_wrapper = mock.MagicMock()
vfw_approx_vnf.ssh_helper = mock.MagicMock()
mock_ssh(ssh)
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vfw_approx_vnf = FWApproxVnf(name, vnfd)
+ vfw_approx_vnf = FWApproxVnf(name, vnfd, 'task_id')
vfw_approx_vnf.ssh_helper = ssh
vfw_approx_vnf.deploy_helper = mock.MagicMock()
vfw_approx_vnf.resource_helper = mock.MagicMock()
self._mock_time_sleep.stop()
def test___init__(self):
- vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0, 'task_id')
self.assertIsNone(vpe_approx_vnf._vnf_process)
@mock.patch.object(ctx_base.Context, 'get_physical_node_from_server',
resource.amqp_collect_nfvi_kpi.return_value = {'foo': 234}
resource.check_if_system_agent_running.return_value = (1, None)
- vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0, 'task_id')
vpe_approx_vnf.scenario_helper.scenario_cfg = {
'nodes': {vpe_approx_vnf.name: "mock"}
}
resource.check_if_system_agent_running.return_value = 0, '1234'
resource.amqp_collect_nfvi_kpi.return_value = {'foo': 234}
- vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0, 'task_id')
vpe_approx_vnf.scenario_helper.scenario_cfg = {
'nodes': {vpe_approx_vnf.name: "mock"}
}
@mock.patch.object(sample_vnf, 'VnfSshHelper')
def test_vnf_execute(self, ssh):
test_base.mock_ssh(ssh)
- vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0, 'task_id')
vpe_approx_vnf.q_in = mock.MagicMock()
vpe_approx_vnf.q_out = mock.MagicMock()
vpe_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
def test_run_vpe(self, ssh):
test_base.mock_ssh(ssh)
- vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0, 'task_id')
vpe_approx_vnf.tc_file_name = get_file_abspath(TEST_FILE_YAML)
vpe_approx_vnf.vnf_cfg = {
'lb_config': 'SW',
mock_resource = mock.MagicMock()
- vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0, 'task_id')
vpe_approx_vnf._vnf_process = mock_process
vpe_approx_vnf.q_out = mock_q_out
vpe_approx_vnf.queue_wrapper = mock.Mock(
mock_resource = mock.MagicMock()
- vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0, 'task_id')
vpe_approx_vnf._vnf_process = mock_process
vpe_approx_vnf.q_out = mock_q_out
vpe_approx_vnf.queue_wrapper = mock.Mock(
mock_resource = mock.MagicMock()
- vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0, 'task_id')
vpe_approx_vnf._vnf_process = mock_process
vpe_approx_vnf.resource_helper.resource = mock_resource
mock_resource = mock.MagicMock()
- vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0, 'task_id')
vpe_approx_vnf._vnf_process = mock_process
vpe_approx_vnf.resource_helper.resource = mock_resource
mock_resource = mock.MagicMock()
- vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0, 'task_id')
vpe_approx_vnf._vnf_process = mock_process
vpe_approx_vnf.q_out = mock_q_out
vpe_approx_vnf.resource_helper.resource = mock_resource
def test_terminate(self, ssh):
test_base.mock_ssh(ssh)
- vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf = vpe_vnf.VpeApproxVnf(NAME, self.VNFD_0, 'task_id')
vpe_approx_vnf._vnf_process = mock.MagicMock()
vpe_approx_vnf._resource_collect_stop = mock.Mock()
vpe_approx_vnf.resource_helper = mock.MagicMock()