#
"""Test Chaining functions."""
-from mock_trex import no_op
-
from mock import MagicMock
from mock import patch
import pytest
+from .mock_trex import no_op
+
from nfvbench.chain_runner import ChainRunner
from nfvbench.chaining import ChainException
from nfvbench.chaining import ChainVnfPort
from nfvbench.traffic_gen.traffic_base import Latency
from nfvbench.traffic_gen.trex_gen import TRex
-
# just to get rid of the unused function warning
no_op()
config.duration_sec = 2
config.interval_sec = 1
config.openrc_file = "dummy.rc"
+ config.no_flow_stats = False
+ config.no_latency_stats = False
+ config.no_latency_streams = False
return config
def test_chain_runner_ext_no_openstack():
runner.close()
def test_pvp_chain_runner_no_admin_no_config_values():
- """Test PVP chain runner."""
+ """Test PVP/mock chain runner."""
cred = MagicMock(spec=nfvbench.credentials.Credentials)
cred.is_admin = False
for shared_net in [True, False]:
nfvb = NFVBench(config, openstack_spec, config_plugin, factory)
res = nfvb.run({}, 'pytest')
if res['status'] != 'OK':
- print res
+ print(res)
assert res['status'] == 'OK'
@patch.object(Compute, 'find_image', _mock_find_image)
@patch.object(TrafficClient, 'skip_sleep', lambda x: True)
@patch.object(ChainVnfPort, 'get_mac', _mock_get_mac)
+@patch.object(TrafficClient, 'is_udp', lambda x, y: True)
@patch('nfvbench.chaining.Client')
@patch('nfvbench.chaining.neutronclient')
@patch('nfvbench.chaining.glanceclient')
@patch.object(Compute, 'find_image', _mock_find_image)
@patch.object(TrafficClient, 'skip_sleep', lambda x: True)
+@patch.object(TrafficClient, 'is_udp', lambda x, y: True)
@patch('nfvbench.chaining.Client')
@patch('nfvbench.chaining.neutronclient')
@patch('nfvbench.chaining.glanceclient')
@patch.object(Compute, 'find_image', _mock_find_image)
@patch.object(TrafficClient, 'skip_sleep', lambda x: True)
+@patch.object(TrafficClient, 'is_udp', lambda x, y: True)
@patch('nfvbench.chaining.Client')
@patch('nfvbench.chaining.neutronclient')
@patch('nfvbench.chaining.glanceclient')
assert stats == exp_stats
@patch.object(TrafficClient, 'skip_sleep', lambda x: True)
+@patch.object(TrafficClient, 'is_udp', lambda x, y: True)
def test_fixed_rate_no_openstack():
"""Test FIxed Rate run - no openstack."""
config = _get_chain_config(ChainType.EXT, 1, True, rate='100%')