X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Ftests%2Funit%2Fnetwork_services%2Ftraffic_profile%2Ftest_ixia_rfc2544.py;h=ef16676c78fb3de9d14394ffc7b077cff237d8ae;hb=9667904026c5dcdc5d9abf23a9692332cc58c7ac;hp=a0abe2bbd9fce3ad6ea09331717d03631a8cbfa7;hpb=180b0b4c7b7288bc2fd8e2905858d17d0fefc93a;p=yardstick.git diff --git a/yardstick/tests/unit/network_services/traffic_profile/test_ixia_rfc2544.py b/yardstick/tests/unit/network_services/traffic_profile/test_ixia_rfc2544.py index a0abe2bbd..ef16676c7 100644 --- a/yardstick/tests/unit/network_services/traffic_profile/test_ixia_rfc2544.py +++ b/yardstick/tests/unit/network_services/traffic_profile/test_ixia_rfc2544.py @@ -11,25 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# - -import unittest -import mock -from copy import deepcopy +import copy -from yardstick.tests import STL_MOCKS - -STLClient = mock.MagicMock() -stl_patch = mock.patch.dict("sys.modules", STL_MOCKS) -stl_patch.start() +import mock +import unittest +import collections -if stl_patch: - from yardstick.network_services.traffic_profile.trex_traffic_profile \ - import TrexProfile - from yardstick.network_services.traffic_profile.ixia_rfc2544 import \ - IXIARFC2544Profile - from yardstick.network_services.traffic_profile import ixia_rfc2544 +from yardstick.network_services.traffic_profile import ixia_rfc2544 +from yardstick.network_services.traffic_profile import trex_traffic_profile class TestIXIARFC2544Profile(unittest.TestCase): @@ -52,7 +42,7 @@ class TestIXIARFC2544Profile(unittest.TestCase): 'traffic_profile': { 'traffic_type': 'IXIARFC2544Profile', 'frame_rate': 100}, - IXIARFC2544Profile.DOWNLINK: { + ixia_rfc2544.IXIARFC2544Profile.DOWNLINK: { 'ipv4': { 'outer_l2': { 'framesize': { @@ -73,7 +63,7 @@ class TestIXIARFC2544Profile(unittest.TestCase): 'outer_l4': { 'srcport': '2001', 'dsrport': '1234'}}}, - IXIARFC2544Profile.UPLINK: { + ixia_rfc2544.IXIARFC2544Profile.UPLINK: { 'ipv4': { 'outer_l2': { 'framesize': { @@ -97,14 +87,15 @@ class TestIXIARFC2544Profile(unittest.TestCase): 'schema': 'isb:traffic_profile:0.1'} def test_get_ixia_traffic_profile_error(self): - traffic_generator = mock.Mock(autospec=TrexProfile) + traffic_generator = mock.Mock( + autospec=trex_traffic_profile.TrexProfile) traffic_generator.my_ports = [0, 1] traffic_generator.uplink_ports = [-1] traffic_generator.downlink_ports = [1] traffic_generator.client = \ mock.Mock(return_value=True) STATIC_TRAFFIC = { - IXIARFC2544Profile.UPLINK: { + ixia_rfc2544.IXIARFC2544Profile.UPLINK: { "id": 1, "bidir": "False", "duration": 60, @@ -143,7 +134,7 @@ class TestIXIARFC2544Profile(unittest.TestCase): }, "traffic_type": "continuous" }, - IXIARFC2544Profile.DOWNLINK: { + ixia_rfc2544.IXIARFC2544Profile.DOWNLINK: { "id": 2, "bidir": "False", "duration": 60, @@ -187,7 +178,8 @@ class TestIXIARFC2544Profile(unittest.TestCase): } ixia_rfc2544.STATIC_TRAFFIC = STATIC_TRAFFIC - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) + r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile( + self.TRAFFIC_PROFILE) r_f_c2544_profile.rate = 100 mac = {"src_mac_0": "00:00:00:00:00:01", "src_mac_1": "00:00:00:00:00:02", @@ -199,14 +191,15 @@ class TestIXIARFC2544Profile(unittest.TestCase): self.assertIsNotNone(result) def test_get_ixia_traffic_profile(self): - traffic_generator = mock.Mock(autospec=TrexProfile) + traffic_generator = mock.Mock( + autospec=trex_traffic_profile.TrexProfile) traffic_generator.my_ports = [0, 1] traffic_generator.uplink_ports = [-1] traffic_generator.downlink_ports = [1] traffic_generator.client = \ mock.Mock(return_value=True) STATIC_TRAFFIC = { - IXIARFC2544Profile.UPLINK: { + ixia_rfc2544.IXIARFC2544Profile.UPLINK: { "id": 1, "bidir": "False", "duration": 60, @@ -246,7 +239,7 @@ class TestIXIARFC2544Profile(unittest.TestCase): }, "traffic_type": "continuous" }, - IXIARFC2544Profile.DOWNLINK: { + ixia_rfc2544.IXIARFC2544Profile.DOWNLINK: { "id": 2, "bidir": "False", "duration": 60, @@ -289,7 +282,8 @@ class TestIXIARFC2544Profile(unittest.TestCase): } ixia_rfc2544.STATIC_TRAFFIC = STATIC_TRAFFIC - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) + r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile( + self.TRAFFIC_PROFILE) r_f_c2544_profile.rate = 100 mac = {"src_mac_0": "00:00:00:00:00:01", "src_mac_1": "00:00:00:00:00:02", @@ -302,14 +296,15 @@ class TestIXIARFC2544Profile(unittest.TestCase): @mock.patch("yardstick.network_services.traffic_profile.ixia_rfc2544.open") def test_get_ixia_traffic_profile_v6(self, *args): - traffic_generator = mock.Mock(autospec=TrexProfile) + traffic_generator = mock.Mock( + autospec=trex_traffic_profile.TrexProfile) traffic_generator.my_ports = [0, 1] traffic_generator.uplink_ports = [-1] traffic_generator.downlink_ports = [1] traffic_generator.client = \ mock.Mock(return_value=True) STATIC_TRAFFIC = { - IXIARFC2544Profile.UPLINK: { + ixia_rfc2544.IXIARFC2544Profile.UPLINK: { "id": 1, "bidir": "False", "duration": 60, @@ -348,7 +343,7 @@ class TestIXIARFC2544Profile(unittest.TestCase): }, "traffic_type": "continuous" }, - IXIARFC2544Profile.DOWNLINK: { + ixia_rfc2544.IXIARFC2544Profile.DOWNLINK: { "id": 2, "bidir": "False", "duration": 60, @@ -392,7 +387,8 @@ class TestIXIARFC2544Profile(unittest.TestCase): } ixia_rfc2544.STATIC_TRAFFIC = STATIC_TRAFFIC - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) + r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile( + self.TRAFFIC_PROFILE) r_f_c2544_profile.rate = 100 mac = {"src_mac_0": "00:00:00:00:00:01", "src_mac_1": "00:00:00:00:00:02", @@ -405,7 +401,7 @@ class TestIXIARFC2544Profile(unittest.TestCase): 'traffic_profile': {'traffic_type': 'IXIARFC2544Profile', 'frame_rate': 100}, - IXIARFC2544Profile.DOWNLINK: + ixia_rfc2544.IXIARFC2544Profile.DOWNLINK: {'ipv4': {'outer_l2': {'framesize': {'64B': '100', '1518B': '0', @@ -422,7 +418,7 @@ class TestIXIARFC2544Profile(unittest.TestCase): 'dscp': 0, 'ttl': 32}, 'outer_l4': {'srcport': '2001', 'dsrport': '1234'}}}, - IXIARFC2544Profile.UPLINK: {'ipv4': + ixia_rfc2544.IXIARFC2544Profile.UPLINK: {'ipv4': {'outer_l2': {'framesize': {'64B': '100', '1518B': '0', '128B': '0', '1400B': '0', @@ -445,52 +441,111 @@ class TestIXIARFC2544Profile(unittest.TestCase): result = r_f_c2544_profile._get_ixia_traffic_profile(profile_data, mac) self.assertIsNotNone(result) + def test__init__(self): + t_profile_data = copy.deepcopy(self.TRAFFIC_PROFILE) + t_profile_data['traffic_profile']['frame_rate'] = 12345678 + r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(t_profile_data) + self.assertEqual(12345678, r_f_c2544_profile.rate) + + def test__get_ip_and_mask_range(self): + ip_range = '1.2.0.2-1.2.255.254' + r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile( + self.TRAFFIC_PROFILE) + ip, mask = r_f_c2544_profile._get_ip_and_mask(ip_range) + self.assertEqual('1.2.0.2', ip) + self.assertEqual(16, mask) + + def test__get_ip_and_mask_single(self): + ip_range = '192.168.1.10' + r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile( + self.TRAFFIC_PROFILE) + ip, mask = r_f_c2544_profile._get_ip_and_mask(ip_range) + self.assertEqual('192.168.1.10', ip) + self.assertIsNone(mask) + + def test__get_fixed_and_mask_range(self): + fixed_mask = '8-48' + r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile( + self.TRAFFIC_PROFILE) + fixed, mask = r_f_c2544_profile._get_fixed_and_mask(fixed_mask) + self.assertEqual(8, fixed) + self.assertEqual(48, mask) + + def test__get_fixed_and_mask_single(self): + fixed_mask = 1234 + r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile( + self.TRAFFIC_PROFILE) + fixed, mask = r_f_c2544_profile._get_fixed_and_mask(fixed_mask) + self.assertEqual(1234, fixed) + self.assertEqual(0, mask) + def test__get_ixia_traffic_profile_default_args(self): - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) + r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile( + self.TRAFFIC_PROFILE) expected = {} result = r_f_c2544_profile._get_ixia_traffic_profile({}) self.assertDictEqual(result, expected) def test__ixia_traffic_generate(self): - traffic_generator = mock.Mock(autospec=TrexProfile) + traffic_generator = mock.Mock( + autospec=trex_traffic_profile.TrexProfile) traffic_generator.networks = { "uplink_0": ["xe0"], "downlink_0": ["xe1"], } traffic_generator.client = \ mock.Mock(return_value=True) - traffic = {IXIARFC2544Profile.DOWNLINK: {'iload': 10}, - IXIARFC2544Profile.UPLINK: {'iload': 10}} + traffic = {ixia_rfc2544.IXIARFC2544Profile.DOWNLINK: {'iload': 10}, + ixia_rfc2544.IXIARFC2544Profile.UPLINK: {'iload': 10}} ixia_obj = mock.MagicMock() - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) + r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile( + self.TRAFFIC_PROFILE) r_f_c2544_profile.rate = 100 result = r_f_c2544_profile._ixia_traffic_generate(traffic, ixia_obj) self.assertIsNone(result) - def test_execute(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.networks = { - "uplink_0": ["xe0"], - "downlink_0": ["xe1"], - } - traffic_generator.client = \ - mock.Mock(return_value=True) - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.first_run = True - r_f_c2544_profile.params = {IXIARFC2544Profile.DOWNLINK: {'iload': 10}, - IXIARFC2544Profile.UPLINK: {'iload': 10}} + def test_execute_traffic_first_run(self): + rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE) + rfc2544_profile.first_run = True + rfc2544_profile.rate = 50 + with mock.patch.object(rfc2544_profile, '_get_ixia_traffic_profile') \ + as mock_get_tp, \ + mock.patch.object(rfc2544_profile, '_ixia_traffic_generate') \ + as mock_tgenerate: + mock_get_tp.return_value = 'fake_tprofile' + output = rfc2544_profile.execute_traffic(mock.ANY, + ixia_obj=mock.ANY) - r_f_c2544_profile.get_streams = mock.Mock() - r_f_c2544_profile.full_profile = {} - r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock() - r_f_c2544_profile.get_multiplier = mock.Mock() - r_f_c2544_profile._ixia_traffic_generate = mock.Mock() - ixia_obj = mock.MagicMock() - self.assertIsNone(r_f_c2544_profile.execute_traffic(traffic_generator, ixia_obj)) + self.assertTrue(output) + self.assertFalse(rfc2544_profile.first_run) + self.assertEqual(50, rfc2544_profile.max_rate) + self.assertEqual(0, rfc2544_profile.min_rate) + mock_get_tp.assert_called_once() + mock_tgenerate.assert_called_once() + + def test_execute_traffic_not_first_run(self): + rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE) + rfc2544_profile.first_run = False + rfc2544_profile.max_rate = 70 + rfc2544_profile.min_rate = 0 + with mock.patch.object(rfc2544_profile, '_get_ixia_traffic_profile') \ + as mock_get_tp, \ + mock.patch.object(rfc2544_profile, '_ixia_traffic_generate') \ + as mock_tgenerate: + mock_get_tp.return_value = 'fake_tprofile' + rfc2544_profile.full_profile = mock.ANY + output = rfc2544_profile.execute_traffic(mock.ANY, + ixia_obj=mock.ANY) + + self.assertFalse(output) + self.assertEqual(35.0, rfc2544_profile.rate) + mock_get_tp.assert_called_once() + mock_tgenerate.assert_called_once() def test_update_traffic_profile(self): - traffic_generator = mock.Mock(autospec=TrexProfile) + traffic_generator = mock.Mock( + autospec=trex_traffic_profile.TrexProfile) traffic_generator.networks = { "uplink_0": ["xe0"], # private, one value for intfs "downlink_0": ["xe1", "xe2"], # public, two values for intfs @@ -502,13 +557,13 @@ class TestIXIARFC2544Profile(unittest.TestCase): traffic_generator.vnfd_helper.port_num.side_effect = ports_expected traffic_generator.client.return_value = True - traffic_profile = deepcopy(self.TRAFFIC_PROFILE) + traffic_profile = copy.deepcopy(self.TRAFFIC_PROFILE) traffic_profile.update({ "uplink_0": ["xe0"], "downlink_0": ["xe1", "xe2"], }) - r_f_c2544_profile = IXIARFC2544Profile(traffic_profile) + r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(traffic_profile) r_f_c2544_profile.full_profile = {} r_f_c2544_profile.get_streams = mock.Mock() @@ -516,112 +571,147 @@ class TestIXIARFC2544Profile(unittest.TestCase): r_f_c2544_profile.update_traffic_profile(traffic_generator)) self.assertEqual(r_f_c2544_profile.ports, ports_expected) - def test_get_drop_percentage(self): - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.params = self.PROFILE - ixia_obj = mock.MagicMock() - r_f_c2544_profile.execute = mock.Mock() - r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock() - r_f_c2544_profile._ixia_traffic_generate = mock.Mock() - r_f_c2544_profile.get_multiplier = mock.Mock() - r_f_c2544_profile.tmp_throughput = 0 - r_f_c2544_profile.tmp_drop = 0 - r_f_c2544_profile.full_profile = {} - samples = {} - for ifname in range(1): - name = "xe{}".format(ifname) - samples[name] = {"rx_throughput_fps": 20, - "tx_throughput_fps": 20, - "rx_throughput_mbps": 10, - "tx_throughput_mbps": 10, - "RxThroughput": 10, - "TxThroughput": 10, - "in_packets": 1000, - "out_packets": 1000} - tol_min = 100.0 - tolerance = 0.0 - self.assertIsNotNone( - r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance, - ixia_obj)) - - def test_get_drop_percentage_update(self): - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.params = self.PROFILE - ixia_obj = mock.MagicMock() - r_f_c2544_profile.execute = mock.Mock() - r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock() - r_f_c2544_profile._ixia_traffic_generate = mock.Mock() - r_f_c2544_profile.get_multiplier = mock.Mock() - r_f_c2544_profile.tmp_throughput = 0 - r_f_c2544_profile.tmp_drop = 0 - r_f_c2544_profile.full_profile = {} - samples = {} - for ifname in range(1): - name = "xe{}".format(ifname) - samples[name] = {"rx_throughput_fps": 20, - "tx_throughput_fps": 20, - "rx_throughput_mbps": 10, - "tx_throughput_mbps": 10, - "RxThroughput": 10, - "TxThroughput": 10, - "in_packets": 1000, - "out_packets": 1002} - tol_min = 0.0 - tolerance = 1.0 - self.assertIsNotNone( - r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance, - ixia_obj)) - - def test_get_drop_percentage_div_zero(self): - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.params = self.PROFILE - ixia_obj = mock.MagicMock() - r_f_c2544_profile.execute = mock.Mock() - r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock() - r_f_c2544_profile._ixia_traffic_generate = mock.Mock() - r_f_c2544_profile.get_multiplier = mock.Mock() - r_f_c2544_profile.tmp_throughput = 0 - r_f_c2544_profile.tmp_drop = 0 - r_f_c2544_profile.full_profile = {} - samples = {} - for ifname in range(1): - name = "xe{}".format(ifname) - samples[name] = {"rx_throughput_fps": 20, - "tx_throughput_fps": 20, - "rx_throughput_mbps": 10, - "tx_throughput_mbps": 10, - "RxThroughput": 10, - "TxThroughput": 10, - "in_packets": 1000, - "out_packets": 0} - tol_min = 0.0 - tolerance = 0.0 - r_f_c2544_profile.tmp_throughput = 0 - self.assertIsNotNone( - r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance, - ixia_obj)) - - def test_get_multiplier(self): - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.max_rate = 100 - r_f_c2544_profile.min_rate = 100 - self.assertEqual("1.0", r_f_c2544_profile.get_multiplier()) - - def test_start_ixia_latency(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.networks = { - "uplink_0": ["xe0"], - "downlink_0": ["xe1"], - } - traffic_generator.client = \ - mock.Mock(return_value=True) - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.max_rate = 100 - r_f_c2544_profile.min_rate = 100 - ixia_obj = mock.MagicMock() - r_f_c2544_profile._get_ixia_traffic_profile = \ - mock.Mock(return_value={}) - r_f_c2544_profile.full_profile = {} - r_f_c2544_profile._ixia_traffic_generate = mock.Mock() - self.assertIsNone( - r_f_c2544_profile.start_ixia_latency(traffic_generator, ixia_obj)) + def test_get_drop_percentage_completed(self): + samples = {'iface_name_1': + {'in_packets': 1000, 'out_packets': 1000, + 'Store-Forward_Avg_latency_ns': 20, + 'Store-Forward_Min_latency_ns': 15, + 'Store-Forward_Max_latency_ns': 25}, + 'iface_name_2': + {'in_packets': 1005, 'out_packets': 1007, + 'Store-Forward_Avg_latency_ns': 23, + 'Store-Forward_Min_latency_ns': 13, + 'Store-Forward_Max_latency_ns': 28} + } + rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE) + completed, samples = rfc2544_profile.get_drop_percentage( + samples, 0, 1, 4) + self.assertTrue(completed) + self.assertEqual(66.9, samples['TxThroughput']) + self.assertEqual(66.833, samples['RxThroughput']) + self.assertEqual(0.099651, samples['DropPercentage']) + self.assertEqual(21.5, samples['latency_ns_avg']) + self.assertEqual(14.0, samples['latency_ns_min']) + self.assertEqual(26.5, samples['latency_ns_max']) + + def test_get_drop_percentage_over_drop_percentage(self): + samples = {'iface_name_1': + {'in_packets': 1000, 'out_packets': 1000, + 'Store-Forward_Avg_latency_ns': 20, + 'Store-Forward_Min_latency_ns': 15, + 'Store-Forward_Max_latency_ns': 25}, + 'iface_name_2': + {'in_packets': 1005, 'out_packets': 1007, + 'Store-Forward_Avg_latency_ns': 20, + 'Store-Forward_Min_latency_ns': 15, + 'Store-Forward_Max_latency_ns': 25} + } + rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE) + rfc2544_profile.rate = 1000 + completed, samples = rfc2544_profile.get_drop_percentage( + samples, 0, 0.05, 4) + self.assertFalse(completed) + self.assertEqual(66.9, samples['TxThroughput']) + self.assertEqual(66.833, samples['RxThroughput']) + self.assertEqual(0.099651, samples['DropPercentage']) + self.assertEqual(rfc2544_profile.rate, rfc2544_profile.max_rate) + + def test_get_drop_percentage_under_drop_percentage(self): + samples = {'iface_name_1': + {'in_packets': 1000, 'out_packets': 1000, + 'Store-Forward_Avg_latency_ns': 20, + 'Store-Forward_Min_latency_ns': 15, + 'Store-Forward_Max_latency_ns': 25}, + 'iface_name_2': + {'in_packets': 1005, 'out_packets': 1007, + 'Store-Forward_Avg_latency_ns': 20, + 'Store-Forward_Min_latency_ns': 15, + 'Store-Forward_Max_latency_ns': 25} + } + rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE) + rfc2544_profile.rate = 1000 + completed, samples = rfc2544_profile.get_drop_percentage( + samples, 0.2, 1, 4) + self.assertFalse(completed) + self.assertEqual(66.9, samples['TxThroughput']) + self.assertEqual(66.833, samples['RxThroughput']) + self.assertEqual(0.099651, samples['DropPercentage']) + self.assertEqual(rfc2544_profile.rate, rfc2544_profile.min_rate) + + @mock.patch.object(ixia_rfc2544.LOG, 'info') + def test_get_drop_percentage_not_flow(self, *args): + samples = {'iface_name_1': + {'in_packets': 1000, 'out_packets': 0, + 'Store-Forward_Avg_latency_ns': 20, + 'Store-Forward_Min_latency_ns': 15, + 'Store-Forward_Max_latency_ns': 25}, + 'iface_name_2': + {'in_packets': 1005, 'out_packets': 0, + 'Store-Forward_Avg_latency_ns': 20, + 'Store-Forward_Min_latency_ns': 15, + 'Store-Forward_Max_latency_ns': 25} + } + rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE) + rfc2544_profile.rate = 1000 + completed, samples = rfc2544_profile.get_drop_percentage( + samples, 0.2, 1, 4) + self.assertFalse(completed) + self.assertEqual(0.0, samples['TxThroughput']) + self.assertEqual(66.833, samples['RxThroughput']) + self.assertEqual(100, samples['DropPercentage']) + self.assertEqual(rfc2544_profile.rate, rfc2544_profile.max_rate) + + def test_get_drop_percentage_first_run(self): + samples = {'iface_name_1': + {'in_packets': 1000, 'out_packets': 1000, + 'Store-Forward_Avg_latency_ns': 20, + 'Store-Forward_Min_latency_ns': 15, + 'Store-Forward_Max_latency_ns': 25}, + 'iface_name_2': + {'in_packets': 1005, 'out_packets': 1007, + 'Store-Forward_Avg_latency_ns': 20, + 'Store-Forward_Min_latency_ns': 15, + 'Store-Forward_Max_latency_ns': 25} + } + rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE) + completed, samples = rfc2544_profile.get_drop_percentage( + samples, 0, 1, 4, first_run=True) + self.assertTrue(completed) + self.assertEqual(66.9, samples['TxThroughput']) + self.assertEqual(66.833, samples['RxThroughput']) + self.assertEqual(0.099651, samples['DropPercentage']) + self.assertEqual(33.45, rfc2544_profile.rate) + + +class TestIXIARFC2544PppoeScenarioProfile(unittest.TestCase): + + TRAFFIC_PROFILE = { + "schema": "nsb:traffic_profile:0.1", + "name": "fixed", + "description": "Fixed traffic profile to run UDP traffic", + "traffic_profile": { + "traffic_type": "FixedTraffic", + "frame_rate": 100}, + 'uplink_0': {'ipv4': {'port': 'xe0', 'id': 1}}, + 'downlink_0': {'ipv4': {'port': 'xe2', 'id': 2}}, + 'uplink_1': {'ipv4': {'port': 'xe1', 'id': 3}}, + 'downlink_1': {'ipv4': {'port': 'xe2', 'id': 4}} + } + + def setUp(self): + self.ixia_tp = ixia_rfc2544.IXIARFC2544PppoeScenarioProfile( + self.TRAFFIC_PROFILE) + + def test___init__(self): + self.assertIsInstance(self.ixia_tp.full_profile, + collections.OrderedDict) + + def test__get_flow_groups_params(self): + expected_tp = collections.OrderedDict([ + ('uplink_0', {'ipv4': {'id': 1, 'port': 'xe0'}}), + ('downlink_0', {'ipv4': {'id': 2, 'port': 'xe2'}}), + ('uplink_1', {'ipv4': {'id': 3, 'port': 'xe1'}}), + ('downlink_1', {'ipv4': {'id': 4, 'port': 'xe2'}})]) + + self.ixia_tp._get_flow_groups_params() + self.assertDictEqual(self.ixia_tp.full_profile, expected_tp)