#
"""Test Chaining functions."""
-from mock_trex import no_op
-
from mock import MagicMock
from mock import patch
import pytest
+from .mock_trex import no_op
+
from nfvbench.chain_runner import ChainRunner
from nfvbench.chaining import ChainException
from nfvbench.chaining import ChainVnfPort
from nfvbench.summarizer import _annotate_chain_stats
from nfvbench.traffic_client import TrafficClient
from nfvbench.traffic_gen.traffic_base import Latency
-from nfvbench.traffic_gen.trex import TRex
-
+from nfvbench.traffic_gen.trex_gen import TRex
+from nfvbench import utils
# just to get rid of the unused function warning
no_op()
config.duration_sec = 2
config.interval_sec = 1
config.openrc_file = "dummy.rc"
+ config.no_flow_stats = False
+ config.no_latency_stats = False
+ config.no_latency_streams = False
+ config.loop_vm_arp = True
return config
def test_chain_runner_ext_no_openstack():
def _mock_find_image(self, image_name):
- return True
+ return MagicMock()
+
+def _mock_waiting_servers_deletion(nova_client, servers):
+ return MagicMock()
@patch.object(Compute, 'find_image', _mock_find_image)
+@patch.object(utils, 'waiting_servers_deletion', _mock_waiting_servers_deletion)
@patch('nfvbench.chaining.Client')
@patch('nfvbench.chaining.neutronclient')
@patch('nfvbench.chaining.glanceclient')
# Test not admin exception with empty value is raised
@patch.object(Compute, 'find_image', _mock_find_image)
+@patch.object(utils, 'waiting_servers_deletion', _mock_waiting_servers_deletion)
@patch('nfvbench.chaining.Client')
@patch('nfvbench.chaining.neutronclient')
@patch('nfvbench.chaining.glanceclient')
runner.close()
def test_pvp_chain_runner_no_admin_no_config_values():
- """Test PVP chain runner."""
+ """Test PVP/mock chain runner."""
cred = MagicMock(spec=nfvbench.credentials.Credentials)
cred.is_admin = False
for shared_net in [True, False]:
# Test not admin with mandatory parameters values in config file
@patch.object(Compute, 'find_image', _mock_find_image)
+@patch.object(utils, 'waiting_servers_deletion', _mock_waiting_servers_deletion)
@patch('nfvbench.chaining.Client')
@patch('nfvbench.chaining.neutronclient')
@patch('nfvbench.chaining.glanceclient')
for scc in [1, 2]:
config = _get_chain_config(ChainType.EXT, scc, shared_net)
config.no_arp = no_arp
+ # this time use a tuple of network names
+ config['external_networks']['left'] = ('ext-lnet00', 'ext-lnet01')
+ config['external_networks']['right'] = ('ext-rnet00', 'ext-rnet01')
if no_arp:
# If EXT and no arp, the config must provide mac addresses (1 pair per chain)
config['traffic_generator']['mac_addrs_left'] = ['00:00:00:00:00:00'] * scc
if l2_loopback:
config.l2_loopback = True
config.vlans = [[100], [200]]
+ if sc == ChainType.EXT:
+ config['external_networks']['left'] = 'ext-lnet'
+ config['external_networks']['right'] = 'ext-rnet'
factory = BasicFactory()
config_plugin = factory.get_config_plugin_class()(config)
config = config_plugin.get_config()
nfvb = NFVBench(config, openstack_spec, config_plugin, factory)
res = nfvb.run({}, 'pytest')
if res['status'] != 'OK':
- print res
+ print(res)
assert res['status'] == 'OK'
@patch.object(Compute, 'find_image', _mock_find_image)
@patch.object(TrafficClient, 'skip_sleep', lambda x: True)
@patch.object(ChainVnfPort, 'get_mac', _mock_get_mac)
+@patch.object(TrafficClient, 'is_udp', lambda x, y: True)
+@patch.object(utils, 'waiting_servers_deletion', _mock_waiting_servers_deletion)
@patch('nfvbench.chaining.Client')
@patch('nfvbench.chaining.neutronclient')
@patch('nfvbench.chaining.glanceclient')
@patch.object(Compute, 'find_image', _mock_find_image)
@patch.object(TrafficClient, 'skip_sleep', lambda x: True)
+@patch.object(TrafficClient, 'is_udp', lambda x, y: True)
@patch('nfvbench.chaining.Client')
@patch('nfvbench.chaining.neutronclient')
@patch('nfvbench.chaining.glanceclient')
@patch.object(Compute, 'find_image', _mock_find_image)
@patch.object(TrafficClient, 'skip_sleep', lambda x: True)
+@patch.object(TrafficClient, 'is_udp', lambda x, y: True)
@patch('nfvbench.chaining.Client')
@patch('nfvbench.chaining.neutronclient')
@patch('nfvbench.chaining.glanceclient')
assert stats == exp_stats
@patch.object(TrafficClient, 'skip_sleep', lambda x: True)
+@patch.object(TrafficClient, 'is_udp', lambda x, y: True)
def test_fixed_rate_no_openstack():
"""Test FIxed Rate run - no openstack."""
config = _get_chain_config(ChainType.EXT, 1, True, rate='100%')