1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from __future__ import absolute_import
20 from multiprocessing import Queue
21 import multiprocessing
23 from tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
24 from tests.unit import STL_MOCKS
26 SSH_HELPER = "yardstick.network_services.vnf_generic.vnf.sample_vnf.VnfSshHelper"
28 STLClient = mock.MagicMock()
29 stl_patch = mock.patch.dict("sys.modules", STL_MOCKS)
33 from yardstick.network_services.vnf_generic.vnf.tg_ping import PingParser
34 from yardstick.network_services.vnf_generic.vnf.tg_ping import PingTrafficGen
35 from yardstick.network_services.vnf_generic.vnf.tg_ping import PingResourceHelper
36 from yardstick.network_services.vnf_generic.vnf.tg_ping import PingSetupEnvHelper
37 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
40 class TestPingResourceHelper(unittest.TestCase):
41 def test___init__(self):
42 setup_helper = mock.Mock()
43 helper = PingResourceHelper(setup_helper)
45 self.assertIsInstance(helper._queue, multiprocessing.queues.Queue)
46 self.assertIsInstance(helper._parser, PingParser)
48 def test_run_traffic(self):
49 setup_helper = mock.Mock()
50 traffic_profile = mock.Mock()
51 traffic_profile.params = {
57 helper = PingResourceHelper(setup_helper)
58 helper.cmd_kwargs = {'target_ip': '10.0.0.2',
59 'local_ip': '10.0.0.1',
60 'local_if_name': 'eth0',
62 helper.ssh_helper = mock.Mock()
63 helper.run_traffic(traffic_profile)
64 helper.ssh_helper.run.called_with('ping-s 64 10.0.0.2')
67 class TestPingParser(unittest.TestCase):
68 def test___init__(self):
70 ping_parser = PingParser(q_out)
71 self.assertIsNotNone(ping_parser.queue)
75 64 bytes from 10.102.22.93: icmp_seq=3 ttl=64 time=0.296 ms
78 ping_parser = PingParser(q_out)
79 ping_parser.write(sample_out)
81 self.assertTrue(q_out.empty())
85 ping_parser = PingParser(q_out)
86 self.assertIsNone(ping_parser.close())
90 64 bytes from 10.102.22.93: icmp_seq=3 ttl=64 time=0.296 ms
93 ping_parser = PingParser(q_out)
94 ping_parser.write(sample_out)
96 self.assertEqual({"packets_received": 3.0, "rtt": 0.296}, q_out.get())
99 class TestPingTrafficGen(unittest.TestCase):
101 'virtual-interface': {
102 'dst_mac': '00:00:00:00:00:04',
103 'vpci': '0000:05:00.0',
104 'local_ip': u'152.16.100.19',
105 'type': 'PCI-PASSTHROUGH',
106 'netmask': '255.255.255.0',
107 'bandwidth': '10 Gbps',
109 'dst_ip': u'152.16.100.20',
110 'local_iface_name': 'xe0',
111 'local_mac': '00:00:00:00:00:02',
113 'vnfd-connection-point-ref': 'xe0',
118 'virtual-interface': {
119 'dst_mac': '00:00:00:00:00:03',
120 'vpci': '0000:05:00.1',
121 'local_ip': u'152.16.40.19',
122 'type': 'PCI-PASSTHROUGH',
124 'netmask': '255.255.255.0',
125 'bandwidth': '10 Gbps',
126 'dst_ip': u'152.16.40.20',
127 'local_iface_name': 'xe1',
128 'local_mac': '00:00:00:00:00:01',
130 'vnfd-connection-point-ref': 'xe1',
134 VNFD_0_EXT_IF_LIST = [
140 'short-name': 'VpeVnf',
145 'network': u'152.16.100.20',
146 'netmask': u'255.255.255.0',
147 'gateway': u'152.16.100.20',
151 'network': u'152.16.40.20',
152 'netmask': u'255.255.255.0',
153 'gateway': u'152.16.40.20',
157 'description': 'VPE approximation using DPDK',
158 'name': 'vpevnf-baremetal',
161 'network': '0064:ff9b:0:0:0:0:9810:6414',
163 'gateway': '0064:ff9b:0:0:0:0:9810:6414',
167 'network': '0064:ff9b:0:0:0:0:9810:2814',
169 'gateway': '0064:ff9b:0:0:0:0:9810:2814',
173 'id': 'vpevnf-baremetal',
174 'external-interface': VNFD_0_EXT_IF_LIST,
177 'description': 'Vpe approximation using DPDK',
179 'vdu-id': 'vpevnf-baremetal',
192 'connection-point': [
202 'id': 'VpeApproxVnf',
207 'vnfd:vnfd-catalog': {
215 "schema": "isb:traffic_profile:0.1",
217 "description": "Fixed traffic profile to run UDP traffic",
219 "traffic_type": "FixedTraffic",
220 "frame_rate": 100, # pps
227 'target_ip': u'152.16.100.20',
228 'local_ip': u'152.16.100.19',
229 'local_if_name': u'xe0_fake',
232 @mock.patch("yardstick.ssh.SSH")
233 def test___init__(self, ssh):
234 ssh.from_node.return_value.execute.return_value = 0, "success", ""
235 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
237 self.assertIsInstance(ping_traffic_gen.setup_helper, PingSetupEnvHelper)
238 self.assertIsInstance(ping_traffic_gen.resource_helper, PingResourceHelper)
239 self.assertEqual(ping_traffic_gen._result, {})
241 @mock.patch("yardstick.ssh.SSH")
242 def test__bind_device_kernel_with_failure(self, ssh):
245 execute_result_data = [
246 (1, 'bad stdout messages', 'error messages'),
248 (0, 'if_name_1', ''),
249 (0, 'if_name_2', ''),
251 ssh.from_node.return_value.execute.side_effect = iter(execute_result_data)
252 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
253 ext_ifs = ping_traffic_gen.vnfd_helper.interfaces
254 self.assertNotEqual(ext_ifs[0]['virtual-interface']['local_iface_name'], 'if_name_1')
255 self.assertNotEqual(ext_ifs[1]['virtual-interface']['local_iface_name'], 'if_name_2')
257 @mock.patch("yardstick.ssh.SSH")
258 def test_collect_kpi(self, ssh):
259 mock_ssh(ssh, exec_result=(0, "success", ""))
260 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
261 ping_traffic_gen._queue = Queue()
262 ping_traffic_gen._queue.put({})
263 ping_traffic_gen.collect_kpi()
264 self.assertEqual(ping_traffic_gen._result, {})
266 @mock.patch(SSH_HELPER)
267 def test_instantiate(self, ssh):
268 mock_ssh(ssh, spec=VnfSshHelper, exec_result=(0, "success", ""))
269 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
270 ping_traffic_gen.setup_helper.ssh_helper = mock.MagicMock(
271 **{"execute.return_value": (0, "xe0_fake", "")})
272 self.assertIsInstance(ping_traffic_gen.ssh_helper, mock.Mock)
273 self.assertEqual(ping_traffic_gen._result, {})
275 self.assertIsNone(ping_traffic_gen.instantiate({}, {}))
278 ping_traffic_gen.vnfd_helper.interfaces[0]['virtual-interface']['local_iface_name'],
280 self.assertEqual(self.CMD_KWARGS, ping_traffic_gen.resource_helper.cmd_kwargs)
281 self.assertIsNotNone(ping_traffic_gen._result)
283 def test_listen_traffic(self):
284 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
285 self.assertIsNone(ping_traffic_gen.listen_traffic({}))
287 @mock.patch("yardstick.ssh.SSH")
288 def test_terminate(self, ssh):
289 ssh.from_node.return_value.execute.return_value = 0, "success", ""
290 ssh.from_node.return_value.run.return_value = 0, "success", ""
292 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
293 self.assertIsNone(ping_traffic_gen.terminate())