import six
from yardstick.common import messaging
+from yardstick.common.messaging import payloads
from yardstick.common.messaging import producer
from yardstick.network_services.helpers.samplevnf_helper import PortPairs
super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG,
pid=pid)
+ def tg_method_started(self, version=1):
+ """Send a message to inform the traffic generation has started"""
+ self.send_message(
+ messaging.TG_METHOD_STARTED,
+ payloads.TrafficGeneratorPayload(version=version, iteration=0,
+ kpi={}))
+
+ def tg_method_finished(self, version=1):
+ """Send a message to inform the traffic generation has finished"""
+ self.send_message(
+ messaging.TG_METHOD_FINISHED,
+ payloads.TrafficGeneratorPayload(version=version, iteration=0,
+ kpi={}))
+
+ def tg_method_iteration(self, iteration, version=1, kpi=None):
+ """Send a message, with KPI, once an iteration has finished"""
+ kpi = {} if kpi is None else kpi
+ self.send_message(
+ messaging.TG_METHOD_ITERATION,
+ payloads.TrafficGeneratorPayload(version=version,
+ iteration=iteration, kpi=kpi))
+
@six.add_metaclass(abc.ABCMeta)
class GenericVNF(object):
self._test_type = self.setup_helper.find_in_section('global', 'name', None)
return self._test_type
- def run_traffic(self, traffic_profile):
+ def run_traffic(self, traffic_profile, *args):
self._queue.cancel_join_thread()
self.lower = 0.0
self.upper = 100.0
import logging
from multiprocessing import Queue, Value, Process
-
import os
import posixpath
import re
-import six
+import uuid
import subprocess
import time
+import six
+
from trex_stl_lib.trex_stl_client import LoggerApi
from trex_stl_lib.trex_stl_client import STLClient
from trex_stl_lib.trex_stl_exceptions import STLError
time.sleep(self.QUEUE_WAIT_TIME)
self._queue.put(samples)
- def run_traffic(self, traffic_profile):
+ def run_traffic(self, traffic_profile, mq_producer):
# if we don't do this we can hang waiting for the queue to drain
# have to do this in the subprocess
self._queue.cancel_join_thread()
# fixme: fix passing correct trex config file,
# instead of searching the default path
+ mq_producer.tg_method_started()
try:
self._build_ports()
self.client = self._connect()
self.client.remove_all_streams(self.all_ports) # remove all streams
traffic_profile.register_generator(self)
+ iteration_index = 0
while self._terminated.value == 0:
+ iteration_index += 1
self._run_traffic_once(traffic_profile)
+ mq_producer.tg_method_iteration(iteration_index)
self.client.stop(self.all_ports)
self.client.disconnect()
return # return if trex/tg server is stopped.
raise
+ mq_producer.tg_method_finished()
+
def terminate(self):
self._terminated.value = 1 # stop client
LOG.info("%s TG Server is up and running.", self.APP_NAME)
return self._tg_process.exitcode
- def _traffic_runner(self, traffic_profile):
+ def _traffic_runner(self, traffic_profile, mq_pid):
# always drop connections first thing in new processes
# so we don't get paramiko errors
self.ssh_helper.drop_connection()
LOG.info("Starting %s client...", self.APP_NAME)
- self.resource_helper.run_traffic(traffic_profile)
+ self._mq_producer = self._setup_mq_producer(mq_pid)
+ self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
def run_traffic(self, traffic_profile):
""" Generate traffic on the wire according to the given params.
:param traffic_profile:
:return: True/False
"""
- name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
+ name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
+ traffic_profile.__class__.__name__,
os.getpid())
+ mq_pid = uuid.uuid1().int
self._traffic_process = Process(name=name, target=self._traffic_runner,
- args=(traffic_profile,))
+ args=(traffic_profile, mq_pid))
self._traffic_process.start()
# Wait for traffic process to start
while self.resource_helper.client_started.value == 0:
if not self._traffic_process.is_alive():
break
- return self._traffic_process.is_alive()
+ return mq_pid
def collect_kpi(self):
# check if the tg processes have exited
self._queue = Queue()
self._parser = PingParser(self._queue)
- def run_traffic(self, traffic_profile):
+ def run_traffic(self, traffic_profile, *args):
# drop the connection in order to force a new one
self.ssh_helper.drop_connection()
self.client.assign_ports()
self.client.create_traffic_model()
- def run_traffic(self, traffic_profile):
+ def run_traffic(self, traffic_profile, *args):
if self._terminated.value:
return
import unittest
from yardstick.common import messaging
+from yardstick.common.messaging import payloads
from yardstick.network_services.vnf_generic.vnf import base
from yardstick.ssh import SSH
mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target')
self.assertEqual(pid, tg_producer._pid)
self.assertEqual(messaging.TOPIC_TG, tg_producer._topic)
+
+ @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+ @mock.patch.object(oslo_messaging, 'RPCClient')
+ @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+ return_value='rpc_transport')
+ @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+ return_value='tg_pload')
+ def test_tg_method_started(self, mock_tg_payload, *args):
+ tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+ with mock.patch.object(tg_producer, 'send_message') as mock_message:
+ tg_producer.tg_method_started(version=10)
+
+ mock_message.assert_called_once_with(messaging.TG_METHOD_STARTED,
+ 'tg_pload')
+ mock_tg_payload.assert_called_once_with(version=10, iteration=0,
+ kpi={})
+
+ @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+ @mock.patch.object(oslo_messaging, 'RPCClient')
+ @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+ return_value='rpc_transport')
+ @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+ return_value='tg_pload')
+ def test_tg_method_finished(self, mock_tg_payload, *args):
+ tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+ with mock.patch.object(tg_producer, 'send_message') as mock_message:
+ tg_producer.tg_method_finished(version=20)
+
+ mock_message.assert_called_once_with(messaging.TG_METHOD_FINISHED,
+ 'tg_pload')
+ mock_tg_payload.assert_called_once_with(version=20, iteration=0,
+ kpi={})
+
+ @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+ @mock.patch.object(oslo_messaging, 'RPCClient')
+ @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+ return_value='rpc_transport')
+ @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+ return_value='tg_pload')
+ def test_tg_method_iteration(self, mock_tg_payload, *args):
+ tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+ with mock.patch.object(tg_producer, 'send_message') as mock_message:
+ tg_producer.tg_method_iteration(100, version=30, kpi={'k': 'v'})
+
+ mock_message.assert_called_once_with(messaging.TG_METHOD_ITERATION,
+ 'tg_pload')
+ mock_tg_payload.assert_called_once_with(version=30, iteration=100,
+ kpi={'k': 'v'})
self.assertIs(client_resource_helper._connect(client), client)
+ @mock.patch.object(ClientResourceHelper, '_build_ports')
+ @mock.patch.object(ClientResourceHelper, '_run_traffic_once')
+ def test_run_traffic(self, mock_run_traffic_once, mock_build_ports):
+ client_resource_helper = ClientResourceHelper(mock.Mock())
+ client = mock.Mock()
+ traffic_profile = mock.Mock()
+ mq_producer = mock.Mock()
+ with mock.patch.object(client_resource_helper, '_connect') \
+ as mock_connect, \
+ mock.patch.object(client_resource_helper, '_terminated') \
+ as mock_terminated:
+ mock_connect.return_value = client
+ type(mock_terminated).value = mock.PropertyMock(
+ side_effect=[0, 1, lambda x: x])
+ client_resource_helper.run_traffic(traffic_profile, mq_producer)
+
+ mock_build_ports.assert_called_once()
+ traffic_profile.register_generator.assert_called_once()
+ mq_producer.tg_method_started.assert_called_once()
+ mq_producer.tg_method_finished.assert_called_once()
+ mq_producer.tg_method_iteration.assert_called_once_with(1)
+ mock_run_traffic_once.assert_called_once_with(traffic_profile)
+
+ @mock.patch.object(ClientResourceHelper, '_build_ports')
+ @mock.patch.object(ClientResourceHelper, '_run_traffic_once',
+ side_effect=Exception)
+ def test_run_traffic_exception(self, mock_run_traffic_once,
+ mock_build_ports):
+ client_resource_helper = ClientResourceHelper(mock.Mock())
+ client = mock.Mock()
+ traffic_profile = mock.Mock()
+ mq_producer = mock.Mock()
+ with mock.patch.object(client_resource_helper, '_connect') \
+ as mock_connect, \
+ mock.patch.object(client_resource_helper, '_terminated') \
+ as mock_terminated:
+ mock_connect.return_value = client
+ type(mock_terminated).value = mock.PropertyMock(return_value=0)
+ mq_producer.reset_mock()
+ # NOTE(ralonsoh): "trex_stl_exceptions.STLError" is mocked
+ with self.assertRaises(Exception):
+ client_resource_helper.run_traffic(traffic_profile,
+ mq_producer)
+
+ mock_build_ports.assert_called_once()
+ traffic_profile.register_generator.assert_called_once()
+ mock_run_traffic_once.assert_called_once_with(traffic_profile)
+ mq_producer.tg_method_started.assert_called_once()
+ mq_producer.tg_method_finished.assert_not_called()
+ mq_producer.tg_method_iteration.assert_not_called()
+
class TestRfc2544ResourceHelper(unittest.TestCase):
sut.setup_helper.prox_config_dict = {}
sut._connect_client = mock.Mock(autospec=STLClient)
sut._connect_client.get_stats = mock.Mock(return_value="0")
- sut._traffic_runner(mock_traffic_profile)
+ sut._setup_mq_producer = mock.Mock(return_value='mq_producer')
+ sut._traffic_runner(mock_traffic_profile, mock.ANY)
@mock.patch('yardstick.network_services.vnf_generic.vnf.prox_helpers.socket')
@mock.patch(SSH_HELPER)
mock.mock_open(), create=True)
@mock.patch('yardstick.network_services.vnf_generic.vnf.tg_rfc2544_ixia.LOG.exception')
def _traffic_runner(*args):
- result = sut._traffic_runner(mock_traffic_profile)
+ sut._setup_mq_producer = mock.Mock(return_value='mq_producer')
+ result = sut._traffic_runner(mock_traffic_profile, mock.ANY)
self.assertIsNone(result)
_traffic_runner()
# must generate cfg before we can run traffic so Trex port mapping is
# created
self.sut.resource_helper.generate_cfg()
+ self.sut._setup_mq_producer = mock.Mock()
with mock.patch.object(self.sut.resource_helper, 'run_traffic'):
- self.sut._traffic_runner(mock_traffic_profile)
+ self.sut._traffic_runner(mock_traffic_profile, mock.ANY)
def test__generate_trex_cfg(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]