# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import
import logging
from yardstick.network_services.traffic_profile.trex_traffic_profile import \
def _ixia_traffic_generate(self, traffic, ixia_obj):
for key, value in traffic.items():
if key.startswith((self.UPLINK, self.DOWNLINK)):
- value["iload"] = str(self.rate)
+ value['iload'] = str(self.rate)
ixia_obj.update_frame(traffic)
ixia_obj.update_ip_packet(traffic)
ixia_obj.start_traffic()
- self.tmp_drop = 0
- self.tmp_throughput = 0
def update_traffic_profile(self, traffic_generator):
def port_generator():
self.ports = [port for port in port_generator()]
- def execute_traffic(self, traffic_generator, ixia_obj, mac=None):
- if mac is None:
- mac = {}
+ def execute_traffic(self, traffic_generator, ixia_obj=None, mac=None):
+ mac = {} if mac is None else mac
+ first_run = self.first_run
if self.first_run:
+ self.first_run = False
self.full_profile = {}
self.pg_id = 0
self.update_traffic_profile(traffic_generator)
- traffic = \
- self._get_ixia_traffic_profile(self.full_profile, mac)
self.max_rate = self.rate
self.min_rate = 0
- self.get_multiplier()
- self._ixia_traffic_generate(traffic, ixia_obj)
-
- def get_multiplier(self):
- self.rate = round((self.max_rate + self.min_rate) / 2.0, 2)
- multiplier = round(self.rate / self.pps, 2)
- return str(multiplier)
+ else:
+ self.rate = round(float(self.max_rate + self.min_rate) / 2.0, 2)
- def start_ixia_latency(self, traffic_generator, ixia_obj, mac=None):
- if mac is None:
- mac = {}
- self.update_traffic_profile(traffic_generator)
- traffic = \
- self._get_ixia_traffic_profile(self.full_profile, mac)
+ traffic = self._get_ixia_traffic_profile(self.full_profile, mac)
self._ixia_traffic_generate(traffic, ixia_obj)
+ return first_run
- def get_drop_percentage(self, samples, tol_min, tolerance, ixia_obj,
- mac=None):
- if mac is None:
- mac = {}
- status = 'Running'
+ def get_drop_percentage(self, samples, tol_min, tolerance, duration=30.0,
+ first_run=False):
+ completed = False
drop_percent = 100
- in_packets = sum([samples[iface]['in_packets'] for iface in samples])
- out_packets = sum([samples[iface]['out_packets'] for iface in samples])
- rx_throughput = \
- sum([samples[iface]['RxThroughput'] for iface in samples])
- tx_throughput = \
- sum([samples[iface]['TxThroughput'] for iface in samples])
- packet_drop = abs(out_packets - in_packets)
+ num_ifaces = len(samples)
+ in_packets_sum = sum(
+ [samples[iface]['in_packets'] for iface in samples])
+ out_packets_sum = sum(
+ [samples[iface]['out_packets'] for iface in samples])
+ rx_throughput = sum(
+ [samples[iface]['RxThroughput'] for iface in samples])
+ rx_throughput = round(float(rx_throughput), 2)
+ tx_throughput = sum(
+ [samples[iface]['TxThroughput'] for iface in samples])
+ tx_throughput = round(float(tx_throughput), 2)
+ packet_drop = abs(out_packets_sum - in_packets_sum)
+
try:
- drop_percent = round((packet_drop / float(out_packets)) * 100, 2)
+ drop_percent = round(
+ (packet_drop / float(out_packets_sum)) * 100, 2)
except ZeroDivisionError:
LOG.info('No traffic is flowing')
- samples['TxThroughput'] = round(tx_throughput / 1.0, 2)
- samples['RxThroughput'] = round(rx_throughput / 1.0, 2)
- samples['CurrentDropPercentage'] = drop_percent
- samples['Throughput'] = self.tmp_throughput
- samples['DropPercentage'] = self.tmp_drop
- if drop_percent > tolerance and self.tmp_throughput == 0:
- samples['Throughput'] = round(rx_throughput / 1.0, 2)
- samples['DropPercentage'] = drop_percent
- if self.first_run:
- max_supported_rate = out_packets / 30.0
- self.rate = max_supported_rate
- self.first_run = False
- if drop_percent <= tolerance:
- status = 'Completed'
+
+ samples['TxThroughput'] = tx_throughput
+ samples['RxThroughput'] = rx_throughput
+ samples['DropPercentage'] = drop_percent
+
+ if first_run:
+ self.rate = out_packets_sum / duration / num_ifaces
+ completed = True if drop_percent <= tolerance else False
+
if drop_percent > tolerance:
self.max_rate = self.rate
elif drop_percent < tol_min:
self.min_rate = self.rate
- if drop_percent >= self.tmp_drop:
- self.tmp_drop = drop_percent
- self.tmp_throughput = round((rx_throughput / 1.0), 2)
- samples['Throughput'] = round(rx_throughput / 1.0, 2)
- samples['DropPercentage'] = drop_percent
else:
- samples['Throughput'] = round(rx_throughput / 1.0, 2)
- samples['DropPercentage'] = drop_percent
- return status, samples
- self.get_multiplier()
- traffic = self._get_ixia_traffic_profile(self.full_profile, mac)
- self._ixia_traffic_generate(traffic, ixia_obj)
- return status, samples
+ completed = True
+
+ return completed, samples
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
-
-import unittest
-import mock
from copy import deepcopy
-from yardstick.tests import STL_MOCKS
-
-STLClient = mock.MagicMock()
-stl_patch = mock.patch.dict("sys.modules", STL_MOCKS)
-stl_patch.start()
+import mock
+import unittest
-if stl_patch:
- from yardstick.network_services.traffic_profile.trex_traffic_profile \
- import TrexProfile
- from yardstick.network_services.traffic_profile.ixia_rfc2544 import \
- IXIARFC2544Profile
- from yardstick.network_services.traffic_profile import ixia_rfc2544
+from yardstick.network_services.traffic_profile import ixia_rfc2544
+from yardstick.network_services.traffic_profile import trex_traffic_profile
class TestIXIARFC2544Profile(unittest.TestCase):
'traffic_profile': {
'traffic_type': 'IXIARFC2544Profile',
'frame_rate': 100},
- IXIARFC2544Profile.DOWNLINK: {
+ ixia_rfc2544.IXIARFC2544Profile.DOWNLINK: {
'ipv4': {
'outer_l2': {
'framesize': {
'outer_l4': {
'srcport': '2001',
'dsrport': '1234'}}},
- IXIARFC2544Profile.UPLINK: {
+ ixia_rfc2544.IXIARFC2544Profile.UPLINK: {
'ipv4': {
'outer_l2': {
'framesize': {
'schema': 'isb:traffic_profile:0.1'}
def test_get_ixia_traffic_profile_error(self):
- traffic_generator = mock.Mock(autospec=TrexProfile)
+ traffic_generator = mock.Mock(
+ autospec=trex_traffic_profile.TrexProfile)
traffic_generator.my_ports = [0, 1]
traffic_generator.uplink_ports = [-1]
traffic_generator.downlink_ports = [1]
traffic_generator.client = \
mock.Mock(return_value=True)
STATIC_TRAFFIC = {
- IXIARFC2544Profile.UPLINK: {
+ ixia_rfc2544.IXIARFC2544Profile.UPLINK: {
"id": 1,
"bidir": "False",
"duration": 60,
},
"traffic_type": "continuous"
},
- IXIARFC2544Profile.DOWNLINK: {
+ ixia_rfc2544.IXIARFC2544Profile.DOWNLINK: {
"id": 2,
"bidir": "False",
"duration": 60,
}
ixia_rfc2544.STATIC_TRAFFIC = STATIC_TRAFFIC
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
+ self.TRAFFIC_PROFILE)
r_f_c2544_profile.rate = 100
mac = {"src_mac_0": "00:00:00:00:00:01",
"src_mac_1": "00:00:00:00:00:02",
self.assertIsNotNone(result)
def test_get_ixia_traffic_profile(self):
- traffic_generator = mock.Mock(autospec=TrexProfile)
+ traffic_generator = mock.Mock(
+ autospec=trex_traffic_profile.TrexProfile)
traffic_generator.my_ports = [0, 1]
traffic_generator.uplink_ports = [-1]
traffic_generator.downlink_ports = [1]
traffic_generator.client = \
mock.Mock(return_value=True)
STATIC_TRAFFIC = {
- IXIARFC2544Profile.UPLINK: {
+ ixia_rfc2544.IXIARFC2544Profile.UPLINK: {
"id": 1,
"bidir": "False",
"duration": 60,
},
"traffic_type": "continuous"
},
- IXIARFC2544Profile.DOWNLINK: {
+ ixia_rfc2544.IXIARFC2544Profile.DOWNLINK: {
"id": 2,
"bidir": "False",
"duration": 60,
}
ixia_rfc2544.STATIC_TRAFFIC = STATIC_TRAFFIC
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
+ self.TRAFFIC_PROFILE)
r_f_c2544_profile.rate = 100
mac = {"src_mac_0": "00:00:00:00:00:01",
"src_mac_1": "00:00:00:00:00:02",
@mock.patch("yardstick.network_services.traffic_profile.ixia_rfc2544.open")
def test_get_ixia_traffic_profile_v6(self, *args):
- traffic_generator = mock.Mock(autospec=TrexProfile)
+ traffic_generator = mock.Mock(
+ autospec=trex_traffic_profile.TrexProfile)
traffic_generator.my_ports = [0, 1]
traffic_generator.uplink_ports = [-1]
traffic_generator.downlink_ports = [1]
traffic_generator.client = \
mock.Mock(return_value=True)
STATIC_TRAFFIC = {
- IXIARFC2544Profile.UPLINK: {
+ ixia_rfc2544.IXIARFC2544Profile.UPLINK: {
"id": 1,
"bidir": "False",
"duration": 60,
},
"traffic_type": "continuous"
},
- IXIARFC2544Profile.DOWNLINK: {
+ ixia_rfc2544.IXIARFC2544Profile.DOWNLINK: {
"id": 2,
"bidir": "False",
"duration": 60,
}
ixia_rfc2544.STATIC_TRAFFIC = STATIC_TRAFFIC
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
+ self.TRAFFIC_PROFILE)
r_f_c2544_profile.rate = 100
mac = {"src_mac_0": "00:00:00:00:00:01",
"src_mac_1": "00:00:00:00:00:02",
'traffic_profile':
{'traffic_type': 'IXIARFC2544Profile',
'frame_rate': 100},
- IXIARFC2544Profile.DOWNLINK:
+ ixia_rfc2544.IXIARFC2544Profile.DOWNLINK:
{'ipv4':
{'outer_l2': {'framesize':
{'64B': '100', '1518B': '0',
'dscp': 0, 'ttl': 32},
'outer_l4': {'srcport': '2001',
'dsrport': '1234'}}},
- IXIARFC2544Profile.UPLINK: {'ipv4':
+ ixia_rfc2544.IXIARFC2544Profile.UPLINK: {'ipv4':
{'outer_l2': {'framesize':
{'64B': '100', '1518B': '0',
'128B': '0', '1400B': '0',
self.assertIsNotNone(result)
def test__get_ixia_traffic_profile_default_args(self):
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
+ self.TRAFFIC_PROFILE)
expected = {}
result = r_f_c2544_profile._get_ixia_traffic_profile({})
self.assertDictEqual(result, expected)
def test__ixia_traffic_generate(self):
- traffic_generator = mock.Mock(autospec=TrexProfile)
+ traffic_generator = mock.Mock(
+ autospec=trex_traffic_profile.TrexProfile)
traffic_generator.networks = {
"uplink_0": ["xe0"],
"downlink_0": ["xe1"],
}
traffic_generator.client = \
mock.Mock(return_value=True)
- traffic = {IXIARFC2544Profile.DOWNLINK: {'iload': 10},
- IXIARFC2544Profile.UPLINK: {'iload': 10}}
+ traffic = {ixia_rfc2544.IXIARFC2544Profile.DOWNLINK: {'iload': 10},
+ ixia_rfc2544.IXIARFC2544Profile.UPLINK: {'iload': 10}}
ixia_obj = mock.MagicMock()
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
+ self.TRAFFIC_PROFILE)
r_f_c2544_profile.rate = 100
result = r_f_c2544_profile._ixia_traffic_generate(traffic, ixia_obj)
self.assertIsNone(result)
- def test_execute(self):
- traffic_generator = mock.Mock(autospec=TrexProfile)
- traffic_generator.networks = {
- "uplink_0": ["xe0"],
- "downlink_0": ["xe1"],
- }
- traffic_generator.client = \
- mock.Mock(return_value=True)
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- r_f_c2544_profile.first_run = True
- r_f_c2544_profile.params = {IXIARFC2544Profile.DOWNLINK: {'iload': 10},
- IXIARFC2544Profile.UPLINK: {'iload': 10}}
+ def test_execute_traffic_first_run(self):
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ rfc2544_profile.first_run = True
+ rfc2544_profile.rate = 50
+ with mock.patch.object(rfc2544_profile, '_get_ixia_traffic_profile') \
+ as mock_get_tp, \
+ mock.patch.object(rfc2544_profile, '_ixia_traffic_generate') \
+ as mock_tgenerate, \
+ mock.patch.object(rfc2544_profile, 'update_traffic_profile') \
+ as mock_update_tp:
+ mock_get_tp.return_value = 'fake_tprofile'
+ output = rfc2544_profile.execute_traffic(mock.ANY,
+ ixia_obj=mock.ANY)
- r_f_c2544_profile.get_streams = mock.Mock()
- r_f_c2544_profile.full_profile = {}
- r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock()
- r_f_c2544_profile.get_multiplier = mock.Mock()
- r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
- ixia_obj = mock.MagicMock()
- self.assertIsNone(r_f_c2544_profile.execute_traffic(traffic_generator, ixia_obj))
+ self.assertTrue(output)
+ self.assertFalse(rfc2544_profile.first_run)
+ self.assertEqual(50, rfc2544_profile.max_rate)
+ self.assertEqual(0, rfc2544_profile.min_rate)
+ mock_get_tp.assert_called_once()
+ mock_tgenerate.assert_called_once()
+ mock_update_tp.assert_called_once()
+
+ def test_execute_traffic_not_first_run(self):
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ rfc2544_profile.first_run = False
+ rfc2544_profile.max_rate = 70
+ rfc2544_profile.min_rate = 0
+ with mock.patch.object(rfc2544_profile, '_get_ixia_traffic_profile') \
+ as mock_get_tp, \
+ mock.patch.object(rfc2544_profile, '_ixia_traffic_generate') \
+ as mock_tgenerate:
+ mock_get_tp.return_value = 'fake_tprofile'
+ rfc2544_profile.full_profile = mock.ANY
+ output = rfc2544_profile.execute_traffic(mock.ANY,
+ ixia_obj=mock.ANY)
+
+ self.assertFalse(output)
+ self.assertEqual(35.0, rfc2544_profile.rate)
+ mock_get_tp.assert_called_once()
+ mock_tgenerate.assert_called_once()
def test_update_traffic_profile(self):
- traffic_generator = mock.Mock(autospec=TrexProfile)
+ traffic_generator = mock.Mock(
+ autospec=trex_traffic_profile.TrexProfile)
traffic_generator.networks = {
"uplink_0": ["xe0"], # private, one value for intfs
"downlink_0": ["xe1", "xe2"], # public, two values for intfs
"downlink_0": ["xe1", "xe2"],
})
- r_f_c2544_profile = IXIARFC2544Profile(traffic_profile)
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(traffic_profile)
r_f_c2544_profile.full_profile = {}
r_f_c2544_profile.get_streams = mock.Mock()
r_f_c2544_profile.update_traffic_profile(traffic_generator))
self.assertEqual(r_f_c2544_profile.ports, ports_expected)
- def test_get_drop_percentage(self):
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- r_f_c2544_profile.params = self.PROFILE
- ixia_obj = mock.MagicMock()
- r_f_c2544_profile.execute = mock.Mock()
- r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock()
- r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
- r_f_c2544_profile.get_multiplier = mock.Mock()
- r_f_c2544_profile.tmp_throughput = 0
- r_f_c2544_profile.tmp_drop = 0
- r_f_c2544_profile.full_profile = {}
- samples = {}
- for ifname in range(1):
- name = "xe{}".format(ifname)
- samples[name] = {"rx_throughput_fps": 20,
- "tx_throughput_fps": 20,
- "rx_throughput_mbps": 10,
- "tx_throughput_mbps": 10,
- "RxThroughput": 10,
- "TxThroughput": 10,
- "in_packets": 1000,
- "out_packets": 1000}
- tol_min = 100.0
- tolerance = 0.0
- self.assertIsNotNone(
- r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance,
- ixia_obj))
+ def test_get_drop_percentage_completed(self):
+ samples = {'iface_name_1':
+ {'RxThroughput': 10, 'TxThroughput': 10,
+ 'in_packets': 1000, 'out_packets': 1000},
+ 'iface_name_2':
+ {'RxThroughput': 11, 'TxThroughput': 13,
+ 'in_packets': 1005, 'out_packets': 1007}
+ }
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ completed, samples = rfc2544_profile.get_drop_percentage(samples, 0, 1)
+ self.assertTrue(completed)
+ self.assertEqual(23.0, samples['TxThroughput'])
+ self.assertEqual(21.0, samples['RxThroughput'])
+ self.assertEqual(0.1, samples['DropPercentage'])
- def test_get_drop_percentage_update(self):
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- r_f_c2544_profile.params = self.PROFILE
- ixia_obj = mock.MagicMock()
- r_f_c2544_profile.execute = mock.Mock()
- r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock()
- r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
- r_f_c2544_profile.get_multiplier = mock.Mock()
- r_f_c2544_profile.tmp_throughput = 0
- r_f_c2544_profile.tmp_drop = 0
- r_f_c2544_profile.full_profile = {}
- samples = {}
- for ifname in range(1):
- name = "xe{}".format(ifname)
- samples[name] = {"rx_throughput_fps": 20,
- "tx_throughput_fps": 20,
- "rx_throughput_mbps": 10,
- "tx_throughput_mbps": 10,
- "RxThroughput": 10,
- "TxThroughput": 10,
- "in_packets": 1000,
- "out_packets": 1002}
- tol_min = 0.0
- tolerance = 1.0
- self.assertIsNotNone(
- r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance,
- ixia_obj))
+ def test_get_drop_percentage_over_drop_percentage(self):
+ samples = {'iface_name_1':
+ {'RxThroughput': 10, 'TxThroughput': 10,
+ 'in_packets': 1000, 'out_packets': 1000},
+ 'iface_name_2':
+ {'RxThroughput': 11, 'TxThroughput': 13,
+ 'in_packets': 1005, 'out_packets': 1007}
+ }
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ rfc2544_profile.rate = 1000
+ completed, samples = rfc2544_profile.get_drop_percentage(
+ samples, 0, 0.05)
+ self.assertFalse(completed)
+ self.assertEqual(23.0, samples['TxThroughput'])
+ self.assertEqual(21.0, samples['RxThroughput'])
+ self.assertEqual(0.1, samples['DropPercentage'])
+ self.assertEqual(rfc2544_profile.rate, rfc2544_profile.max_rate)
- def test_get_drop_percentage_div_zero(self):
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- r_f_c2544_profile.params = self.PROFILE
- ixia_obj = mock.MagicMock()
- r_f_c2544_profile.execute = mock.Mock()
- r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock()
- r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
- r_f_c2544_profile.get_multiplier = mock.Mock()
- r_f_c2544_profile.tmp_throughput = 0
- r_f_c2544_profile.tmp_drop = 0
- r_f_c2544_profile.full_profile = {}
- samples = {}
- for ifname in range(1):
- name = "xe{}".format(ifname)
- samples[name] = {"rx_throughput_fps": 20,
- "tx_throughput_fps": 20,
- "rx_throughput_mbps": 10,
- "tx_throughput_mbps": 10,
- "RxThroughput": 10,
- "TxThroughput": 10,
- "in_packets": 1000,
- "out_packets": 0}
- tol_min = 0.0
- tolerance = 0.0
- r_f_c2544_profile.tmp_throughput = 0
- self.assertIsNotNone(
- r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance,
- ixia_obj))
+ def test_get_drop_percentage_under_drop_percentage(self):
+ samples = {'iface_name_1':
+ {'RxThroughput': 10, 'TxThroughput': 10,
+ 'in_packets': 1000, 'out_packets': 1000},
+ 'iface_name_2':
+ {'RxThroughput': 11, 'TxThroughput': 13,
+ 'in_packets': 1005, 'out_packets': 1007}
+ }
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ rfc2544_profile.rate = 1000
+ completed, samples = rfc2544_profile.get_drop_percentage(
+ samples, 0.2, 1)
+ self.assertFalse(completed)
+ self.assertEqual(23.0, samples['TxThroughput'])
+ self.assertEqual(21.0, samples['RxThroughput'])
+ self.assertEqual(0.1, samples['DropPercentage'])
+ self.assertEqual(rfc2544_profile.rate, rfc2544_profile.min_rate)
- def test_get_multiplier(self):
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- r_f_c2544_profile.max_rate = 100
- r_f_c2544_profile.min_rate = 100
- self.assertEqual("1.0", r_f_c2544_profile.get_multiplier())
+ @mock.patch.object(ixia_rfc2544.LOG, 'info')
+ def test_get_drop_percentage_not_flow(self, *args):
+ samples = {'iface_name_1':
+ {'RxThroughput': 0, 'TxThroughput': 10,
+ 'in_packets': 1000, 'out_packets': 0},
+ 'iface_name_2':
+ {'RxThroughput': 0, 'TxThroughput': 13,
+ 'in_packets': 1005, 'out_packets': 0}
+ }
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ rfc2544_profile.rate = 1000
+ completed, samples = rfc2544_profile.get_drop_percentage(
+ samples, 0.2, 1)
+ self.assertFalse(completed)
+ self.assertEqual(23.0, samples['TxThroughput'])
+ self.assertEqual(0, samples['RxThroughput'])
+ self.assertEqual(100, samples['DropPercentage'])
+ self.assertEqual(rfc2544_profile.rate, rfc2544_profile.max_rate)
- def test_start_ixia_latency(self):
- traffic_generator = mock.Mock(autospec=TrexProfile)
- traffic_generator.networks = {
- "uplink_0": ["xe0"],
- "downlink_0": ["xe1"],
- }
- traffic_generator.client = \
- mock.Mock(return_value=True)
- r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE)
- r_f_c2544_profile.max_rate = 100
- r_f_c2544_profile.min_rate = 100
- ixia_obj = mock.MagicMock()
- r_f_c2544_profile._get_ixia_traffic_profile = \
- mock.Mock(return_value={})
- r_f_c2544_profile.full_profile = {}
- r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
- self.assertIsNone(
- r_f_c2544_profile.start_ixia_latency(traffic_generator, ixia_obj))
+ def test_get_drop_percentage_first_run(self):
+ samples = {'iface_name_1':
+ {'RxThroughput': 10, 'TxThroughput': 10,
+ 'in_packets': 1000, 'out_packets': 1000},
+ 'iface_name_2':
+ {'RxThroughput': 11, 'TxThroughput': 13,
+ 'in_packets': 1005, 'out_packets': 1007}
+ }
+ rfc2544_profile = ixia_rfc2544.IXIARFC2544Profile(self.TRAFFIC_PROFILE)
+ completed, samples = rfc2544_profile.get_drop_percentage(
+ samples, 0, 1, first_run=True)
+ self.assertTrue(completed)
+ self.assertEqual(23.0, samples['TxThroughput'])
+ self.assertEqual(21.0, samples['RxThroughput'])
+ self.assertEqual(0.1, samples['DropPercentage'])
+ self.assertEqual(33.45, rfc2544_profile.rate)