Merge "Adding intel_pmu tools plugin for collectd"
[yardstick.git] / tests / unit / network_services / traffic_profile / test_rfc2544.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2016-2017 Intel Corporation
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 #      http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 from __future__ import absolute_import
19 from __future__ import division
20
21 import unittest
22 import mock
23
24 from tests.unit import STL_MOCKS
25
26
# MagicMock bound to the module-level name STLClient.
# NOTE(review): nothing in the visible part of this file reads it afterwards;
# confirm it is still needed.
STLClient = mock.MagicMock()
# Overlay the entries of STL_MOCKS onto sys.modules. The patch is started at
# import time and is never stopped in this file.
stl_patch = mock.patch.dict("sys.modules", STL_MOCKS)
stl_patch.start()

# The patcher object is expected to always be truthy, so this guard presumably
# exists only to keep the imports below after stl_patch.start() (they are
# assumed to pull in the mocked STL modules; TODO confirm).
if stl_patch:
    from yardstick.network_services.traffic_profile.traffic_profile \
        import TrexProfile
    from yardstick.network_services.traffic_profile.rfc2544 import \
        RFC2544Profile
36
37
class TestRFC2544Profile(unittest.TestCase):
    """Unit tests for RFC2544Profile run against mocked traffic generators."""

    # Minimal profile description handed to the RFC2544Profile constructor.
    TRAFFIC_PROFILE = {
        "schema": "isb:traffic_profile:0.1",
        "name": "fixed",
        "description": "Fixed traffic profile to run UDP traffic",
        "traffic_profile": {
            "traffic_type": "FixedTraffic",
            "frame_rate": 100,  # pps
            "flow_number": 10,
            "frame_size": 64,
        },
    }

    # Per-direction stream description assigned to ``profile.params``.
    PROFILE = {
        'description': 'Traffic profile to run RFC2544 latency',
        'name': 'rfc2544',
        'traffic_profile': {
            'traffic_type': 'RFC2544Profile',
            'frame_rate': 100,
        },
        'public_1': {
            'ipv4': {
                'outer_l2': {
                    'framesize': {
                        '64B': '100', '1518B': '0',
                        '128B': '0', '1400B': '0',
                        '256B': '0', '373b': '0',
                        '570B': '0',
                    },
                },
                'outer_l3v4': {
                    'dstip4': '1.1.1.1-1.15.255.255',
                    'proto': 'udp',
                    'srcip4': '90.90.1.1-90.105.255.255',
                    'dscp': 0, 'ttl': 32, 'count': 1,
                },
                # NOTE(review): 'dsrport' looks like a typo for 'dstport'
                # (compare 'private_1' below). Left untouched because fixing
                # it changes the data seen by the code under test - confirm.
                'outer_l4': {
                    'srcport': '2001',
                    'dsrport': '1234', 'count': 1,
                },
            },
        },
        'private_1': {
            'ipv4': {
                'outer_l2': {
                    'framesize': {
                        '64B': '100', '1518B': '0',
                        '128B': '0', '1400B': '0',
                        '256B': '0', '373b': '0',
                        '570B': '0',
                    },
                },
                'outer_l3v4': {
                    'dstip4': '9.9.1.1-90.105.255.255',
                    'proto': 'udp',
                    'srcip4': '1.1.1.1-1.15.255.255',
                    'dscp': 0, 'ttl': 32, 'count': 1,
                },
                'outer_l4': {
                    'dstport': '2001',
                    'srcport': '1234', 'count': 1,
                },
            },
        },
        'schema': 'isb:traffic_profile:0.1',
    }

    @staticmethod
    def _make_traffic_generator(spec_cls, priv_ports):
        """Return a mocked traffic generator with fixed port assignments.

        :param spec_cls: class forwarded as the mock's ``autospec`` keyword
        :param priv_ports: list stored on the mock as ``priv_ports``
        :return: the configured ``mock.Mock`` instance
        """
        # NOTE(review): ``mock.Mock`` stores unknown keywords such as
        # ``autospec`` as plain attributes, so no spec is enforced here. Kept
        # unchanged to preserve the existing behaviour - TODO confirm whether
        # a real ``spec=`` was intended.
        generator = mock.Mock(autospec=spec_cls)
        generator.my_ports = [0, 1]
        generator.priv_ports = priv_ports
        generator.pub_ports = [1]
        generator.client = mock.Mock(return_value=True)
        return generator

    @staticmethod
    def _make_samples(out_packets):
        """Return a fresh one-interface ("xe0") samples dict.

        :param out_packets: value stored under the ``out_packets`` key
        """
        return {
            "xe0": {
                "rx_throughput_fps": 20,
                "tx_throughput_fps": 20,
                "rx_throughput_mbps": 10,
                "tx_throughput_mbps": 10,
                "in_packets": 1000,
                "out_packets": out_packets,
            },
        }

    @staticmethod
    def _configure_sampling(generator, samples):
        """Make ``generator`` report ``samples`` plus the run settings that
        the drop-percentage tests set on the generator.
        """
        generator.generate_samples = mock.MagicMock(return_value=samples)
        generator.RUN_DURATION = 30
        generator.rfc2544_helper.tolerance_low = 0.0001
        generator.rfc2544_helper.tolerance_high = 0.0001

    def _make_profile(self):
        """Return an RFC2544Profile whose ``params`` is ``PROFILE``."""
        profile = RFC2544Profile(self.TRAFFIC_PROFILE)
        profile.params = self.PROFILE
        return profile

    def test___init__(self):
        """A freshly constructed profile exposes a truthy ``rate``."""
        r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE)
        # assertTrue instead of a bare ``assert`` so the check survives
        # ``python -O`` and reports through unittest.
        self.assertTrue(r_f_c2544_profile.rate)

    def test_execute(self):
        """``execute`` returns None for a mocked generator."""
        traffic_generator = self._make_traffic_generator(TrexProfile, [-1])
        r_f_c2544_profile = self._make_profile()
        r_f_c2544_profile.first_run = True
        self.assertIsNone(r_f_c2544_profile.execute(traffic_generator))

    def test_get_drop_percentage(self):
        """Equal in/out packet counts are reported as a 0.0 drop."""
        traffic_generator = self._make_traffic_generator(TrexProfile, [0])

        r_f_c2544_profile = self._make_profile()
        r_f_c2544_profile.register_generator(traffic_generator)
        self.assertIsNone(r_f_c2544_profile.execute(traffic_generator))

        samples = self._make_samples(out_packets=1000)

        # Built from a separate samples dict so the expectation does not
        # share objects with the dict handed to the code under test.
        expected = self._make_samples(out_packets=1000)
        expected.update({
            'DropPercentage': 0.0,
            'RxThroughput': 100 / 3.0,
            'TxThroughput': 100 / 3.0,
            'CurrentDropPercentage': 0.0,
            'Throughput': 66.66666666666667,
        })
        self._configure_sampling(traffic_generator, samples)
        result = r_f_c2544_profile.get_drop_percentage(traffic_generator)
        self.assertDictEqual(expected, result)

    def test_get_drop_percentage_update(self):
        """1002 out vs 1000 in packets is reported as a 0.1996 drop."""
        traffic_generator = self._make_traffic_generator(RFC2544Profile, [0])

        r_f_c2544_profile = self._make_profile()
        r_f_c2544_profile.register_generator(traffic_generator)
        # Called without arguments: the generator was registered just above.
        self.assertIsNone(r_f_c2544_profile.execute())

        samples = self._make_samples(out_packets=1002)

        expected = self._make_samples(out_packets=1002)
        expected.update({
            'DropPercentage': 0.1996,
            'RxThroughput': 33.333333333333336,
            'TxThroughput': 33.4,
            'CurrentDropPercentage': 0.1996,
            'Throughput': 66.66666666666667,
        })
        self._configure_sampling(traffic_generator, samples)
        result = r_f_c2544_profile.get_drop_percentage(traffic_generator)
        self.assertDictEqual(expected, result)

    def test_get_drop_percentage_div_zero(self):
        """Zero out_packets with ``throughput_max`` 0 gives a 100.0 drop."""
        traffic_generator = self._make_traffic_generator(TrexProfile, [0])
        r_f_c2544_profile = self._make_profile()
        self.assertIsNone(r_f_c2544_profile.execute(traffic_generator))

        samples = self._make_samples(out_packets=0)
        r_f_c2544_profile.throughput_max = 0

        expected = self._make_samples(out_packets=0)
        expected.update({
            'DropPercentage': 100.0,
            'RxThroughput': 100 / 3.0,
            'TxThroughput': 0.0,
            'CurrentDropPercentage': 100.0,
            'Throughput': 66.66666666666667,
        })
        self._configure_sampling(traffic_generator, samples)
        result = r_f_c2544_profile.get_drop_percentage(traffic_generator)
        self.assertDictEqual(expected, result)

    def test_get_multiplier(self):
        """Equal min and max rates of 100 give the multiplier "1.0"."""
        r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE)
        r_f_c2544_profile.max_rate = 100
        r_f_c2544_profile.min_rate = 100
        self.assertEqual("1.0", r_f_c2544_profile.get_multiplier())

    def test_calculate_pps(self):
        """``calculate_pps`` returns (2274546.67, 1.0) for the sample below."""
        r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE)
        r_f_c2544_profile.rate = 100
        r_f_c2544_profile.pps = 100
        samples = {'Throughput': 4549093.33}
        self.assertEqual((2274546.67, 1.0),
                         r_f_c2544_profile.calculate_pps(samples))

    def test_create_single_stream(self):
        """``create_single_stream`` returns a value when ``pg_id`` is 1."""
        r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE)
        r_f_c2544_profile._create_single_packet = mock.MagicMock()
        r_f_c2544_profile.pg_id = 1
        self.assertIsNotNone(
            r_f_c2544_profile.create_single_stream(64, 2274546.67))

    def test_create_single_stream_no_pg_id(self):
        """``create_single_stream`` returns a value when ``pg_id`` is 0."""
        r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE)
        r_f_c2544_profile._create_single_packet = mock.MagicMock()
        r_f_c2544_profile.pg_id = 0
        self.assertIsNotNone(
            r_f_c2544_profile.create_single_stream(64, 2274546.67))

    def test_execute_latency(self):
        """``execute_latency`` returns None with ``calculate_pps`` mocked."""
        traffic_generator = self._make_traffic_generator(TrexProfile, [-1])
        r_f_c2544_profile = self._make_profile()
        r_f_c2544_profile.first_run = True

        samples = self._make_samples(out_packets=0)
        samples['Throughput'] = 4549093.33
        r_f_c2544_profile.calculate_pps = mock.Mock(
            return_value=[2274546.67, 1.0])

        self.assertIsNone(
            r_f_c2544_profile.execute_latency(traffic_generator, samples))
282
283
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()