# See the License for the specific language governing permissions and
# limitations under the License.
-from copy import deepcopy
+import copy
import mock
import unittest
+import collections
from yardstick.network_services.traffic_profile import ixia_rfc2544
from yardstick.network_services.traffic_profile import trex_traffic_profile
result = r_f_c2544_profile._get_ixia_traffic_profile(profile_data, mac)
self.assertIsNotNone(result)
+ def test__init__(self):
+ t_profile_data = copy.deepcopy(self.TRAFFIC_PROFILE)
+ t_profile_data['traffic_profile']['frame_rate'] = 12345678
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(t_profile_data)
+ self.assertEqual(12345678, r_f_c2544_profile.rate)
+
+ def test__get_ip_and_mask_range(self):
+ ip_range = '1.2.0.2-1.2.255.254'
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
+ self.TRAFFIC_PROFILE)
+ ip, mask = r_f_c2544_profile._get_ip_and_mask(ip_range)
+ self.assertEqual('1.2.0.2', ip)
+ self.assertEqual(16, mask)
+
+ def test__get_ip_and_mask_single(self):
+ ip_range = '192.168.1.10'
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
+ self.TRAFFIC_PROFILE)
+ ip, mask = r_f_c2544_profile._get_ip_and_mask(ip_range)
+ self.assertEqual('192.168.1.10', ip)
+ self.assertIsNone(mask)
+
+ def test__get_fixed_and_mask_range(self):
+ fixed_mask = '8-48'
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
+ self.TRAFFIC_PROFILE)
+ fixed, mask = r_f_c2544_profile._get_fixed_and_mask(fixed_mask)
+ self.assertEqual(8, fixed)
+ self.assertEqual(48, mask)
+
+ def test__get_fixed_and_mask_single(self):
+ fixed_mask = 1234
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
+ self.TRAFFIC_PROFILE)
+ fixed, mask = r_f_c2544_profile._get_fixed_and_mask(fixed_mask)
+ self.assertEqual(1234, fixed)
+ self.assertEqual(0, mask)
+
def test__get_ixia_traffic_profile_default_args(self):
r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
self.TRAFFIC_PROFILE)
with mock.patch.object(rfc2544_profile, '_get_ixia_traffic_profile') \
as mock_get_tp, \
mock.patch.object(rfc2544_profile, '_ixia_traffic_generate') \
- as mock_tgenerate, \
- mock.patch.object(rfc2544_profile, 'update_traffic_profile') \
- as mock_update_tp:
+ as mock_tgenerate:
mock_get_tp.return_value = 'fake_tprofile'
output = rfc2544_profile.execute_traffic(mock.ANY,
ixia_obj=mock.ANY)
self.assertEqual(0, rfc2544_profile.min_rate)
mock_get_tp.assert_called_once()
mock_tgenerate.assert_called_once()
- mock_update_tp.assert_called_once()
def test_execute_traffic_not_first_run(self):
rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
traffic_generator.vnfd_helper.port_num.side_effect = ports_expected
traffic_generator.client.return_value = True
- traffic_profile = deepcopy(self.TRAFFIC_PROFILE)
+ traffic_profile = copy.deepcopy(self.TRAFFIC_PROFILE)
traffic_profile.update({
"uplink_0": ["xe0"],
"downlink_0": ["xe1", "xe2"],
def test_get_drop_percentage_completed(self):
samples = {'iface_name_1':
- {'RxThroughput': 10, 'TxThroughput': 10,
- 'in_packets': 1000, 'out_packets': 1000},
+ {'in_packets': 1000, 'out_packets': 1000,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25},
'iface_name_2':
- {'RxThroughput': 11, 'TxThroughput': 13,
- 'in_packets': 1005, 'out_packets': 1007}
+ {'in_packets': 1005, 'out_packets': 1007,
+ 'Store-Forward_Avg_latency_ns': 23,
+ 'Store-Forward_Min_latency_ns': 13,
+ 'Store-Forward_Max_latency_ns': 28}
}
rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- completed, samples = rfc2544_profile.get_drop_percentage(samples, 0, 1)
+ completed, samples = rfc2544_profile.get_drop_percentage(
+ samples, 0, 1, 4)
self.assertTrue(completed)
- self.assertEqual(23.0, samples['TxThroughput'])
- self.assertEqual(21.0, samples['RxThroughput'])
- self.assertEqual(0.1, samples['DropPercentage'])
+ self.assertEqual(66.9, samples['TxThroughput'])
+ self.assertEqual(66.833, samples['RxThroughput'])
+ self.assertEqual(0.099651, samples['DropPercentage'])
+ self.assertEqual(21.5, samples['latency_ns_avg'])
+ self.assertEqual(14.0, samples['latency_ns_min'])
+ self.assertEqual(26.5, samples['latency_ns_max'])
def test_get_drop_percentage_over_drop_percentage(self):
samples = {'iface_name_1':
- {'RxThroughput': 10, 'TxThroughput': 10,
- 'in_packets': 1000, 'out_packets': 1000},
+ {'in_packets': 1000, 'out_packets': 1000,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25},
'iface_name_2':
- {'RxThroughput': 11, 'TxThroughput': 13,
- 'in_packets': 1005, 'out_packets': 1007}
+ {'in_packets': 1005, 'out_packets': 1007,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25}
}
rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
rfc2544_profile.rate = 1000
completed, samples = rfc2544_profile.get_drop_percentage(
- samples, 0, 0.05)
+ samples, 0, 0.05, 4)
self.assertFalse(completed)
- self.assertEqual(23.0, samples['TxThroughput'])
- self.assertEqual(21.0, samples['RxThroughput'])
- self.assertEqual(0.1, samples['DropPercentage'])
+ self.assertEqual(66.9, samples['TxThroughput'])
+ self.assertEqual(66.833, samples['RxThroughput'])
+ self.assertEqual(0.099651, samples['DropPercentage'])
self.assertEqual(rfc2544_profile.rate, rfc2544_profile.max_rate)
def test_get_drop_percentage_under_drop_percentage(self):
samples = {'iface_name_1':
- {'RxThroughput': 10, 'TxThroughput': 10,
- 'in_packets': 1000, 'out_packets': 1000},
+ {'in_packets': 1000, 'out_packets': 1000,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25},
'iface_name_2':
- {'RxThroughput': 11, 'TxThroughput': 13,
- 'in_packets': 1005, 'out_packets': 1007}
+ {'in_packets': 1005, 'out_packets': 1007,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25}
}
rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
rfc2544_profile.rate = 1000
completed, samples = rfc2544_profile.get_drop_percentage(
- samples, 0.2, 1)
+ samples, 0.2, 1, 4)
self.assertFalse(completed)
- self.assertEqual(23.0, samples['TxThroughput'])
- self.assertEqual(21.0, samples['RxThroughput'])
- self.assertEqual(0.1, samples['DropPercentage'])
+ self.assertEqual(66.9, samples['TxThroughput'])
+ self.assertEqual(66.833, samples['RxThroughput'])
+ self.assertEqual(0.099651, samples['DropPercentage'])
self.assertEqual(rfc2544_profile.rate, rfc2544_profile.min_rate)
@mock.patch.object(ixia_rfc2544.LOG, 'info')
def test_get_drop_percentage_not_flow(self, *args):
samples = {'iface_name_1':
- {'RxThroughput': 0, 'TxThroughput': 10,
- 'in_packets': 1000, 'out_packets': 0},
+ {'in_packets': 1000, 'out_packets': 0,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25},
'iface_name_2':
- {'RxThroughput': 0, 'TxThroughput': 13,
- 'in_packets': 1005, 'out_packets': 0}
+ {'in_packets': 1005, 'out_packets': 0,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25}
}
rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
rfc2544_profile.rate = 1000
completed, samples = rfc2544_profile.get_drop_percentage(
- samples, 0.2, 1)
+ samples, 0.2, 1, 4)
self.assertFalse(completed)
- self.assertEqual(23.0, samples['TxThroughput'])
- self.assertEqual(0, samples['RxThroughput'])
+ self.assertEqual(0.0, samples['TxThroughput'])
+ self.assertEqual(66.833, samples['RxThroughput'])
self.assertEqual(100, samples['DropPercentage'])
self.assertEqual(rfc2544_profile.rate, rfc2544_profile.max_rate)
def test_get_drop_percentage_first_run(self):
samples = {'iface_name_1':
- {'RxThroughput': 10, 'TxThroughput': 10,
- 'in_packets': 1000, 'out_packets': 1000},
+ {'in_packets': 1000, 'out_packets': 1000,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25},
'iface_name_2':
- {'RxThroughput': 11, 'TxThroughput': 13,
- 'in_packets': 1005, 'out_packets': 1007}
+ {'in_packets': 1005, 'out_packets': 1007,
+ 'Store-Forward_Avg_latency_ns': 20,
+ 'Store-Forward_Min_latency_ns': 15,
+ 'Store-Forward_Max_latency_ns': 25}
}
rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
completed, samples = rfc2544_profile.get_drop_percentage(
- samples, 0, 1, first_run=True)
+ samples, 0, 1, 4, first_run=True)
self.assertTrue(completed)
- self.assertEqual(23.0, samples['TxThroughput'])
- self.assertEqual(21.0, samples['RxThroughput'])
- self.assertEqual(0.1, samples['DropPercentage'])
+ self.assertEqual(66.9, samples['TxThroughput'])
+ self.assertEqual(66.833, samples['RxThroughput'])
+ self.assertEqual(0.099651, samples['DropPercentage'])
self.assertEqual(33.45, rfc2544_profile.rate)
+
+
+class TestIXIARFC2544PppoeScenarioProfile(unittest.TestCase):
+
+ TRAFFIC_PROFILE = {
+ "schema": "nsb:traffic_profile:0.1",
+ "name": "fixed",
+ "description": "Fixed traffic profile to run UDP traffic",
+ "traffic_profile": {
+ "traffic_type": "FixedTraffic",
+ "frame_rate": 100},
+ 'uplink_0': {'ipv4': {'port': 'xe0', 'id': 1}},
+ 'downlink_0': {'ipv4': {'port': 'xe2', 'id': 2}},
+ 'uplink_1': {'ipv4': {'port': 'xe1', 'id': 3}},
+ 'downlink_1': {'ipv4': {'port': 'xe2', 'id': 4}}
+ }
+
+ def setUp(self):
+ self.ixia_tp = ixia_rfc2544.IXIARFC2544PppoeScenarioProfile(
+ self.TRAFFIC_PROFILE)
+
+ def test___init__(self):
+ self.assertIsInstance(self.ixia_tp.full_profile,
+ collections.OrderedDict)
+
+ def test__get_flow_groups_params(self):
+ expected_tp = collections.OrderedDict([
+ ('uplink_0', {'ipv4': {'id': 1, 'port': 'xe0'}}),
+ ('downlink_0', {'ipv4': {'id': 2, 'port': 'xe2'}}),
+ ('uplink_1', {'ipv4': {'id': 3, 'port': 'xe1'}}),
+ ('downlink_1', {'ipv4': {'id': 4, 'port': 'xe2'}})])
+
+ self.ixia_tp._get_flow_groups_params()
+ self.assertDictEqual(self.ixia_tp.full_profile, expected_tp)