- def test_get_drop_percentage(self):
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- r_f_c2544_profile.params = self.PROFILE
- ixia_obj = mock.MagicMock()
- r_f_c2544_profile.execute = mock.Mock()
- r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock()
- r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
- r_f_c2544_profile.get_multiplier = mock.Mock()
- r_f_c2544_profile.tmp_throughput = 0
- r_f_c2544_profile.tmp_drop = 0
- r_f_c2544_profile.full_profile = {}
- samples = {}
- for ifname in range(1):
- name = "xe{}".format(ifname)
- samples[name] = {"rx_throughput_fps": 20,
- "tx_throughput_fps": 20,
- "rx_throughput_mbps": 10,
- "tx_throughput_mbps": 10,
- "RxThroughput": 10,
- "TxThroughput": 10,
- "in_packets": 1000,
- "out_packets": 1000}
- tol_min = 100.0
- tolerance = 0.0
- self.assertIsNotNone(
- r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance,
- ixia_obj))
-
- def test_get_drop_percentage_update(self):
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- r_f_c2544_profile.params = self.PROFILE
- ixia_obj = mock.MagicMock()
- r_f_c2544_profile.execute = mock.Mock()
- r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock()
- r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
- r_f_c2544_profile.get_multiplier = mock.Mock()
- r_f_c2544_profile.tmp_throughput = 0
- r_f_c2544_profile.tmp_drop = 0
- r_f_c2544_profile.full_profile = {}
- samples = {}
- for ifname in range(1):
- name = "xe{}".format(ifname)
- samples[name] = {"rx_throughput_fps": 20,
- "tx_throughput_fps": 20,
- "rx_throughput_mbps": 10,
- "tx_throughput_mbps": 10,
- "RxThroughput": 10,
- "TxThroughput": 10,
- "in_packets": 1000,
- "out_packets": 1002}
- tol_min = 0.0
- tolerance = 1.0
- self.assertIsNotNone(
- r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance,
- ixia_obj))
-
- def test_get_drop_percentage_div_zero(self):
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- r_f_c2544_profile.params = self.PROFILE
- ixia_obj = mock.MagicMock()
- r_f_c2544_profile.execute = mock.Mock()
- r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock()
- r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
- r_f_c2544_profile.get_multiplier = mock.Mock()
- r_f_c2544_profile.tmp_throughput = 0
- r_f_c2544_profile.tmp_drop = 0
- r_f_c2544_profile.full_profile = {}
- samples = {}
- for ifname in range(1):
- name = "xe{}".format(ifname)
- samples[name] = {"rx_throughput_fps": 20,
- "tx_throughput_fps": 20,
- "rx_throughput_mbps": 10,
- "tx_throughput_mbps": 10,
- "RxThroughput": 10,
- "TxThroughput": 10,
- "in_packets": 1000,
- "out_packets": 0}
- tol_min = 0.0
- tolerance = 0.0
- r_f_c2544_profile.tmp_throughput = 0
- self.assertIsNotNone(
- r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance,
- ixia_obj))
-
- def test_get_multiplier(self):
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- r_f_c2544_profile.max_rate = 100
- r_f_c2544_profile.min_rate = 100
- self.assertEqual("1.0", r_f_c2544_profile.get_multiplier())
-
- def test_start_ixia_latency(self):
- traffic_generator = mock.Mock(autospec=TrexProfile)
- traffic_generator.networks = {
- "uplink_0": ["xe0"],
- "downlink_0": ["xe1"],
- }
- traffic_generator.client = \
- mock.Mock(return_value=True)
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- r_f_c2544_profile.max_rate = 100
- r_f_c2544_profile.min_rate = 100
- ixia_obj = mock.MagicMock()
- r_f_c2544_profile._get_ixia_traffic_profile = \
- mock.Mock(return_value={})
- r_f_c2544_profile.full_profile = {}
- r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
- self.assertIsNone(
- r_f_c2544_profile.start_ixia_latency(traffic_generator, ixia_obj))
+ def test_get_drop_percentage_completed(self):
+ samples = {'iface_name_1':
+ {'in_packets': 1000, 'out_packets': 1000,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25},
+ 'iface_name_2':
+ {'in_packets': 1005, 'out_packets': 1007,
+ 'Store-Forward_Avg_latency_ns': 23,
+ 'Store-Forward_Min_latency_ns': 13,
+ 'Store-Forward_Max_latency_ns': 28}
+ }
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ completed, samples = rfc2544_profile.get_drop_percentage(
+ samples, 0, 1, 4)
+ self.assertTrue(completed)
+ self.assertEqual(66.9, samples['TxThroughput'])
+ self.assertEqual(66.833, samples['RxThroughput'])
+ self.assertEqual(0.099651, samples['DropPercentage'])
+ self.assertEqual(21.5, samples['latency_ns_avg'])
+ self.assertEqual(14.0, samples['latency_ns_min'])
+ self.assertEqual(26.5, samples['latency_ns_max'])
+
+ def test_get_drop_percentage_over_drop_percentage(self):
+ samples = {'iface_name_1':
+ {'in_packets': 1000, 'out_packets': 1000,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25},
+ 'iface_name_2':
+ {'in_packets': 1005, 'out_packets': 1007,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25}
+ }
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ rfc2544_profile.rate = 1000
+ completed, samples = rfc2544_profile.get_drop_percentage(
+ samples, 0, 0.05, 4)
+ self.assertFalse(completed)
+ self.assertEqual(66.9, samples['TxThroughput'])
+ self.assertEqual(66.833, samples['RxThroughput'])
+ self.assertEqual(0.099651, samples['DropPercentage'])
+ self.assertEqual(rfc2544_profile.rate, rfc2544_profile.max_rate)
+
+ def test_get_drop_percentage_under_drop_percentage(self):
+ samples = {'iface_name_1':
+ {'in_packets': 1000, 'out_packets': 1000,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25},
+ 'iface_name_2':
+ {'in_packets': 1005, 'out_packets': 1007,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25}
+ }
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ rfc2544_profile.rate = 1000
+ completed, samples = rfc2544_profile.get_drop_percentage(
+ samples, 0.2, 1, 4)
+ self.assertFalse(completed)
+ self.assertEqual(66.9, samples['TxThroughput'])
+ self.assertEqual(66.833, samples['RxThroughput'])
+ self.assertEqual(0.099651, samples['DropPercentage'])
+ self.assertEqual(rfc2544_profile.rate, rfc2544_profile.min_rate)
+
+ @mock.patch.object(ixia_rfc2544.LOG, 'info')
+ def test_get_drop_percentage_not_flow(self, *args):
+ samples = {'iface_name_1':
+ {'in_packets': 1000, 'out_packets': 0,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25},
+ 'iface_name_2':
+ {'in_packets': 1005, 'out_packets': 0,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25}
+ }
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ rfc2544_profile.rate = 1000
+ completed, samples = rfc2544_profile.get_drop_percentage(
+ samples, 0.2, 1, 4)
+ self.assertFalse(completed)
+ self.assertEqual(0.0, samples['TxThroughput'])
+ self.assertEqual(66.833, samples['RxThroughput'])
+ self.assertEqual(100, samples['DropPercentage'])
+ self.assertEqual(rfc2544_profile.rate, rfc2544_profile.max_rate)
+
+ def test_get_drop_percentage_first_run(self):
+ samples = {'iface_name_1':
+ {'in_packets': 1000, 'out_packets': 1000,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25},
+ 'iface_name_2':
+ {'in_packets': 1005, 'out_packets': 1007,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25}
+ }
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ completed, samples = rfc2544_profile.get_drop_percentage(
+ samples, 0, 1, 4, first_run=True)
+ self.assertTrue(completed)
+ self.assertEqual(66.9, samples['TxThroughput'])
+ self.assertEqual(66.833, samples['RxThroughput'])
+ self.assertEqual(0.099651, samples['DropPercentage'])
+ self.assertEqual(33.45, rfc2544_profile.rate)
+
+
+class TestIXIARFC2544PppoeScenarioProfile(unittest.TestCase):
+
+ TRAFFIC_PROFILE = {
+ "schema": "nsb:traffic_profile:0.1",
+ "name": "fixed",
+ "description": "Fixed traffic profile to run UDP traffic",
+ "traffic_profile": {
+ "traffic_type": "FixedTraffic",
+ "frame_rate": 100},
+ 'uplink_0': {'ipv4': {'port': 'xe0', 'id': 1}},
+ 'downlink_0': {'ipv4': {'port': 'xe2', 'id': 2}},
+ 'uplink_1': {'ipv4': {'port': 'xe1', 'id': 3}},
+ 'downlink_1': {'ipv4': {'port': 'xe2', 'id': 4}}
+ }
+
+ def setUp(self):
+ self.ixia_tp = ixia_rfc2544.IXIARFC2544PppoeScenarioProfile(
+ self.TRAFFIC_PROFILE)
+
+ def test___init__(self):
+ self.assertIsInstance(self.ixia_tp.full_profile,
+ collections.OrderedDict)
+
+ def test__get_flow_groups_params(self):
+ expected_tp = collections.OrderedDict([
+ ('uplink_0', {'ipv4': {'id': 1, 'port': 'xe0'}}),
+ ('downlink_0', {'ipv4': {'id': 2, 'port': 'xe2'}}),
+ ('uplink_1', {'ipv4': {'id': 3, 'port': 'xe1'}}),
+ ('downlink_1', {'ipv4': {'id': 4, 'port': 'xe2'}})])
+
+ self.ixia_tp._get_flow_groups_params()
+ self.assertDictEqual(self.ixia_tp.full_profile, expected_tp)