+ def test_calculate_pps(self):
+ r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE)
+ r_f_c2544_profile.rate = 100
+ r_f_c2544_profile.pps = 100
+ samples = {'Throughput': 4549093.33}
+ self.assertEqual((2274546.67, 1.0),
+ r_f_c2544_profile.calculate_pps(samples))
+
+ def test_create_single_stream(self):
+ r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE)
+ r_f_c2544_profile._create_single_packet = mock.MagicMock()
+ r_f_c2544_profile.pg_id = 1
+ self.assertIsNotNone(
+ r_f_c2544_profile.create_single_stream(64, 2274546.67))
+
+ def test_create_single_stream_no_pg_id(self):
+ r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE)
+ r_f_c2544_profile._create_single_packet = mock.MagicMock()
+ r_f_c2544_profile.pg_id = 0
+ self.assertIsNotNone(
+ r_f_c2544_profile.create_single_stream(64, 2274546.67))
+
+ def test_execute_latency(self):
+ traffic_generator = mock.Mock(autospec=TrexProfile)
+ traffic_generator.networks = {
+ "private_0": ["xe0"],
+ "public_0": ["xe1"],
+ }
+ traffic_generator.client = \
+ mock.Mock(return_value=True)
+ r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE)
+ r_f_c2544_profile.params = self.PROFILE
+ r_f_c2544_profile.first_run = True
+ samples = {}
+ for ifname in range(1):
+ name = "xe{}".format(ifname)
+ samples[name] = {"rx_throughput_fps": 20,
+ "tx_throughput_fps": 20,
+ "rx_throughput_mbps": 10,
+ "tx_throughput_mbps": 10,
+ "in_packets": 1000,
+ "out_packets": 0}
+
+ samples['Throughput'] = 4549093.33
+ r_f_c2544_profile.calculate_pps = mock.Mock(return_value=[2274546.67,
+ 1.0])
+
+ self.assertEqual(None,
+ r_f_c2544_profile.execute_latency(traffic_generator,
+ samples))
+
+