import unittest
import mock
+from copy import deepcopy
+
from tests.unit import STL_MOCKS
STLClient = mock.MagicMock()
class TestIXIARFC2544Profile(unittest.TestCase):
+
TRAFFIC_PROFILE = {
"schema": "isb:traffic_profile:0.1",
"name": "fixed",
"traffic_type": "FixedTraffic",
"frame_rate": 100, # pps
"flow_number": 10,
- "frame_size": 64}}
+ "frame_size": 64,
+ },
+ }
PROFILE = {'description': 'Traffic profile to run RFC2544 latency',
'name': 'rfc2544',
'traffic_profile': {'traffic_type': 'IXIARFC2544Profile',
'frame_rate': 100},
- 'public': {'ipv4':
+ IXIARFC2544Profile.DOWNLINK: {'ipv4':
{'outer_l2': {'framesize':
{'64B': '100', '1518B': '0',
'128B': '0', '1400B': '0',
'dscp': 0, 'ttl': 32},
'outer_l4': {'srcport': '2001',
'dsrport': '1234'}}},
- 'private': {'ipv4':
+ IXIARFC2544Profile.UPLINK: {'ipv4':
{'outer_l2': {'framesize':
{'64B': '100', '1518B': '0',
'128B': '0', '1400B': '0',
def test_get_ixia_traffic_profile_error(self):
traffic_generator = mock.Mock(autospec=TrexProfile)
traffic_generator.my_ports = [0, 1]
- traffic_generator.priv_ports = [-1]
- traffic_generator.pub_ports = [1]
+ traffic_generator.uplink_ports = [-1]
+ traffic_generator.downlink_ports = [1]
traffic_generator.client = \
mock.Mock(return_value=True)
STATIC_TRAFFIC = {
- "private": {
+ IXIARFC2544Profile.UPLINK: {
"id": 1,
"bidir": "False",
"duration": 60,
},
"traffic_type": "continuous"
},
- "public": {
+ IXIARFC2544Profile.DOWNLINK: {
"id": 2,
"bidir": "False",
"duration": 60,
"dst_mac_0": "00:00:00:00:00:03",
"dst_mac_1": "00:00:00:00:00:04",
"dst_mac_2": "00:00:00:00:00:04"}
- self.assertRaises(IOError, r_f_c2544_profile._get_ixia_traffic_profile,
- self.PROFILE, mac, xfile="tmp",
- static_traffic=STATIC_TRAFFIC)
-
+ result = r_f_c2544_profile._get_ixia_traffic_profile(self.PROFILE, mac)
+ self.assertIsNotNone(result)
- @mock.patch("yardstick.network_services.traffic_profile.ixia_rfc2544.open")
- def test_get_ixia_traffic_profile(self, mock_open):
+ def test_get_ixia_traffic_profile(self):
traffic_generator = mock.Mock(autospec=TrexProfile)
traffic_generator.my_ports = [0, 1]
- traffic_generator.priv_ports = [-1]
- traffic_generator.pub_ports = [1]
+ traffic_generator.uplink_ports = [-1]
+ traffic_generator.downlink_ports = [1]
traffic_generator.client = \
mock.Mock(return_value=True)
STATIC_TRAFFIC = {
- "private": {
+ IXIARFC2544Profile.UPLINK: {
"id": 1,
"bidir": "False",
"duration": 60,
"proto": "udp",
"srcip4": "152.16.40.20",
"ttl": 32,
- "count": "1"
},
"outer_l4": {
"dstport": "2001",
},
"traffic_type": "continuous"
},
- "public": {
+ IXIARFC2544Profile.DOWNLINK: {
"id": 2,
"bidir": "False",
"duration": 60,
"proto": "udp",
"srcip4": "152.16.40.20",
"ttl": 32,
- "count": "1"
},
"outer_l3v6": {
"count": 1024,
"proto": "udp",
"srcip4": "152.16.40.20",
"ttl": 32,
- "count": "1"
},
"outer_l4": {
"dstport": "1234",
"dst_mac_0": "00:00:00:00:00:03",
"dst_mac_1": "00:00:00:00:00:04",
"dst_mac_2": "00:00:00:00:00:04"}
- result = r_f_c2544_profile._get_ixia_traffic_profile(
- self.PROFILE, mac, xfile="tmp", static_traffic=STATIC_TRAFFIC)
+ result = r_f_c2544_profile._get_ixia_traffic_profile(self.PROFILE, mac)
self.assertIsNotNone(result)
@mock.patch("yardstick.network_services.traffic_profile.ixia_rfc2544.open")
- def test_get_ixia_traffic_profile_v6(self, mock_open):
+ def test_get_ixia_traffic_profile_v6(self, *args):
traffic_generator = mock.Mock(autospec=TrexProfile)
traffic_generator.my_ports = [0, 1]
- traffic_generator.priv_ports = [-1]
- traffic_generator.pub_ports = [1]
+ traffic_generator.uplink_ports = [-1]
+ traffic_generator.downlink_ports = [1]
traffic_generator.client = \
mock.Mock(return_value=True)
STATIC_TRAFFIC = {
- "private": {
+ IXIARFC2544Profile.UPLINK: {
"id": 1,
"bidir": "False",
"duration": 60,
},
"traffic_type": "continuous"
},
- "public": {
+ IXIARFC2544Profile.DOWNLINK: {
"id": 2,
"bidir": "False",
"duration": 60,
'traffic_profile':
{'traffic_type': 'IXIARFC2544Profile',
'frame_rate': 100},
- 'public':
+ IXIARFC2544Profile.DOWNLINK:
{'ipv4':
{'outer_l2': {'framesize':
{'64B': '100', '1518B': '0',
'dscp': 0, 'ttl': 32},
'outer_l4': {'srcport': '2001',
'dsrport': '1234'}}},
- 'private': {'ipv4':
+ IXIARFC2544Profile.UPLINK: {'ipv4':
{'outer_l2': {'framesize':
{'64B': '100', '1518B': '0',
'128B': '0', '1400B': '0',
'outer_l4': {'dstport': '2001',
'srcport': '1234'}}},
'schema': 'isb:traffic_profile:0.1'}
- result = r_f_c2544_profile._get_ixia_traffic_profile(
- profile_data, mac, static_traffic=STATIC_TRAFFIC)
+ result = r_f_c2544_profile._get_ixia_traffic_profile(profile_data, mac)
self.assertIsNotNone(result)
+ def test__get_ixia_traffic_profile_default_args(self):
+ r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+
+ expected = {}
+ result = r_f_c2544_profile._get_ixia_traffic_profile({})
+ self.assertDictEqual(result, expected)
+
def test__ixia_traffic_generate(self):
traffic_generator = mock.Mock(autospec=TrexProfile)
- traffic_generator.my_ports = [0, 1]
- traffic_generator.priv_ports = [-1]
- traffic_generator.pub_ports = [1]
+ traffic_generator.networks = {
+ "uplink_0": ["xe0"],
+ "downlink_0": ["xe1"],
+ }
traffic_generator.client = \
mock.Mock(return_value=True)
- traffic = {"public": {'iload': 10},
- "private": {'iload': 10}}
+ traffic = {IXIARFC2544Profile.DOWNLINK: {'iload': 10},
+ IXIARFC2544Profile.UPLINK: {'iload': 10}}
ixia_obj = mock.MagicMock()
r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
r_f_c2544_profile.rate = 100
- result = r_f_c2544_profile._ixia_traffic_generate(traffic_generator,
- traffic, ixia_obj)
+ result = r_f_c2544_profile._ixia_traffic_generate(traffic, ixia_obj)
self.assertIsNone(result)
-
def test_execute(self):
traffic_generator = mock.Mock(autospec=TrexProfile)
- traffic_generator.my_ports = [0, 1]
- traffic_generator.priv_ports = [-1]
- traffic_generator.pub_ports = [1]
+ traffic_generator.networks = {
+ "uplink_0": ["xe0"],
+ "downlink_0": ["xe1"],
+ }
traffic_generator.client = \
mock.Mock(return_value=True)
r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
r_f_c2544_profile.first_run = True
- r_f_c2544_profile.params = {"public": {'iload': 10},
- "private": {'iload': 10}}
+ r_f_c2544_profile.params = {IXIARFC2544Profile.DOWNLINK: {'iload': 10},
+ IXIARFC2544Profile.UPLINK: {'iload': 10}}
r_f_c2544_profile.get_streams = mock.Mock()
r_f_c2544_profile.full_profile = {}
r_f_c2544_profile.get_multiplier = mock.Mock()
r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
ixia_obj = mock.MagicMock()
- self.assertEqual(None, r_f_c2544_profile.execute(traffic_generator,
- ixia_obj))
+ self.assertEqual(None, r_f_c2544_profile.execute_traffic(traffic_generator, ixia_obj))
- def test_get_drop_percentage(self):
+ def test_update_traffic_profile(self):
traffic_generator = mock.Mock(autospec=TrexProfile)
- traffic_generator.my_ports = [0, 1]
- traffic_generator.priv_ports = [0]
- traffic_generator.pub_ports = [1]
- traffic_generator.client = \
- mock.Mock(return_value=True)
+ traffic_generator.networks = {
+ "uplink_0": ["xe0"], # private, one value for intfs
+ "downlink_0": ["xe1", "xe2"], # public, two values for intfs
+ "downlink_1": ["xe3"], # not in TRAFFIC PROFILE
+ "tenant_0": ["xe4"], # not public or private
+ }
+
+ ports_expected = [8, 3, 5]
+ traffic_generator.vnfd_helper.port_num.side_effect = ports_expected
+ traffic_generator.client.return_value = True
+
+ traffic_profile = deepcopy(self.TRAFFIC_PROFILE)
+ traffic_profile.update({
+ "uplink_0": ["xe0"],
+ "downlink_0": ["xe1", "xe2"],
+ })
+
+ r_f_c2544_profile = IXIARFC2544Profile(traffic_profile)
+ r_f_c2544_profile.full_profile = {}
+ r_f_c2544_profile.get_streams = mock.Mock()
+
+ self.assertIsNone(r_f_c2544_profile.update_traffic_profile(traffic_generator))
+ self.assertEqual(r_f_c2544_profile.ports, ports_expected)
+
+ def test_get_drop_percentage(self):
r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
r_f_c2544_profile.params = self.PROFILE
ixia_obj = mock.MagicMock()
"out_packets": 1000}
tol_min = 100.0
tolerance = 0.0
- self.assertIsNotNone(r_f_c2544_profile.get_drop_percentage(
- traffic_generator, samples,
- tol_min, tolerance, ixia_obj))
+ self.assertIsNotNone(
+ r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance,
+ ixia_obj))
def test_get_drop_percentage_update(self):
- traffic_generator = mock.Mock(autospec=TrexProfile)
- traffic_generator.my_ports = [0, 1]
- traffic_generator.priv_ports = [0]
- traffic_generator.pub_ports = [1]
- traffic_generator.client = \
- mock.Mock(return_value=True)
r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
r_f_c2544_profile.params = self.PROFILE
ixia_obj = mock.MagicMock()
"out_packets": 1002}
tol_min = 0.0
tolerance = 1.0
- self.assertIsNotNone(r_f_c2544_profile.get_drop_percentage(
- traffic_generator, samples,
- tol_min, tolerance, ixia_obj))
+ self.assertIsNotNone(
+ r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance,
+ ixia_obj))
def test_get_drop_percentage_div_zero(self):
- traffic_generator = mock.Mock(autospec=TrexProfile)
- traffic_generator.my_ports = [0, 1]
- traffic_generator.priv_ports = [0]
- traffic_generator.pub_ports = [1]
- traffic_generator.client = \
- mock.Mock(return_value=True)
r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
r_f_c2544_profile.params = self.PROFILE
ixia_obj = mock.MagicMock()
tol_min = 0.0
tolerance = 0.0
r_f_c2544_profile.tmp_throughput = 0
- self.assertIsNotNone(r_f_c2544_profile.get_drop_percentage(
- traffic_generator, samples,
- tol_min, tolerance, ixia_obj))
+ self.assertIsNotNone(
+ r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance,
+ ixia_obj))
def test_get_multiplier(self):
r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
def test_start_ixia_latency(self):
traffic_generator = mock.Mock(autospec=TrexProfile)
- traffic_generator.my_ports = [0, 1]
- traffic_generator.priv_ports = [0]
- traffic_generator.pub_ports = [1]
+ traffic_generator.networks = {
+ "uplink_0": ["xe0"],
+ "downlink_0": ["xe1"],
+ }
traffic_generator.client = \
mock.Mock(return_value=True)
r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
self.assertEqual(
None,
- r_f_c2544_profile.start_ixia_latency(traffic_generator,
- ixia_obj))
+ r_f_c2544_profile.start_ixia_latency(traffic_generator, ixia_obj))
if __name__ == '__main__':