3 # Copyright (c) 2016-2017 Intel Corporation
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 from __future__ import absolute_import
22 from multiprocessing import Queue
23 import multiprocessing
25 from tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
26 from tests.unit import STL_MOCKS
28 SSH_HELPER = "yardstick.network_services.vnf_generic.vnf.sample_vnf.VnfSshHelper"
30 STLClient = mock.MagicMock()
31 stl_patch = mock.patch.dict("sys.modules", STL_MOCKS)
35 from yardstick.network_services.vnf_generic.vnf.tg_ping import PingParser
36 from yardstick.network_services.vnf_generic.vnf.tg_ping import PingTrafficGen
37 from yardstick.network_services.vnf_generic.vnf.tg_ping import PingResourceHelper
38 from yardstick.network_services.vnf_generic.vnf.tg_ping import PingSetupEnvHelper
39 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
42 class TestPingResourceHelper(unittest.TestCase):
43 def test___init__(self):
44 setup_helper = mock.Mock()
45 helper = PingResourceHelper(setup_helper)
47 self.assertIsInstance(helper._queue, multiprocessing.queues.Queue)
48 self.assertIsInstance(helper._parser, PingParser)
50 def test_run_traffic(self):
51 setup_helper = mock.Mock()
52 traffic_profile = mock.Mock()
53 traffic_profile.params = {
59 helper = PingResourceHelper(setup_helper)
60 helper.cmd_kwargs = {'target_ip': '10.0.0.2',
61 'local_ip': '10.0.0.1',
62 'local_if_name': 'eth0',
64 helper.ssh_helper = mock.Mock()
65 helper.run_traffic(traffic_profile)
66 helper.ssh_helper.run.called_with('ping-s 64 10.0.0.2')
69 class TestPingParser(unittest.TestCase):
70 def test___init__(self):
72 ping_parser = PingParser(q_out)
73 self.assertIsNotNone(ping_parser.queue)
77 64 bytes from 10.102.22.93: icmp_seq=3 ttl=64 time=0.296 ms
80 ping_parser = PingParser(q_out)
81 ping_parser.write(sample_out)
83 self.assertTrue(q_out.empty())
87 ping_parser = PingParser(q_out)
88 self.assertIsNone(ping_parser.close())
92 64 bytes from 10.102.22.93: icmp_seq=3 ttl=64 time=0.296 ms
95 ping_parser = PingParser(q_out)
96 ping_parser.write(sample_out)
98 self.assertEqual({"packets_received": 3.0, "rtt": 0.296}, q_out.get())
101 class TestPingTrafficGen(unittest.TestCase):
103 'virtual-interface': {
104 'dst_mac': '00:00:00:00:00:04',
105 'vpci': '0000:05:00.0',
106 'local_ip': u'152.16.100.19',
107 'type': 'PCI-PASSTHROUGH',
108 'netmask': '255.255.255.0',
109 'bandwidth': '10 Gbps',
111 'dst_ip': u'152.16.100.20',
112 'local_iface_name': 'xe0',
113 'local_mac': '00:00:00:00:00:02',
115 'vnfd-connection-point-ref': 'xe0',
120 'virtual-interface': {
121 'dst_mac': '00:00:00:00:00:03',
122 'vpci': '0000:05:00.1',
123 'local_ip': u'152.16.40.19',
124 'type': 'PCI-PASSTHROUGH',
126 'netmask': '255.255.255.0',
127 'bandwidth': '10 Gbps',
128 'dst_ip': u'152.16.40.20',
129 'local_iface_name': 'xe1',
130 'local_mac': '00:00:00:00:00:01',
132 'vnfd-connection-point-ref': 'xe1',
136 VNFD_0_EXT_IF_LIST = [
142 'short-name': 'VpeVnf',
147 'network': u'152.16.100.20',
148 'netmask': u'255.255.255.0',
149 'gateway': u'152.16.100.20',
153 'network': u'152.16.40.20',
154 'netmask': u'255.255.255.0',
155 'gateway': u'152.16.40.20',
159 'description': 'VPE approximation using DPDK',
160 'name': 'vpevnf-baremetal',
163 'network': '0064:ff9b:0:0:0:0:9810:6414',
165 'gateway': '0064:ff9b:0:0:0:0:9810:6414',
169 'network': '0064:ff9b:0:0:0:0:9810:2814',
171 'gateway': '0064:ff9b:0:0:0:0:9810:2814',
175 'id': 'vpevnf-baremetal',
176 'external-interface': VNFD_0_EXT_IF_LIST,
179 'description': 'Vpe approximation using DPDK',
181 'vdu-id': 'vpevnf-baremetal',
194 'connection-point': [
204 'id': 'VpeApproxVnf',
209 'vnfd:vnfd-catalog': {
217 "schema": "isb:traffic_profile:0.1",
219 "description": "Fixed traffic profile to run UDP traffic",
221 "traffic_type": "FixedTraffic",
222 "frame_rate": 100, # pps
229 'target_ip': u'152.16.100.20',
230 'local_ip': u'152.16.100.19',
231 'local_if_name': u'xe0_fake',
234 @mock.patch("yardstick.ssh.SSH")
235 def test___init__(self, ssh):
236 ssh.from_node.return_value.execute.return_value = 0, "success", ""
237 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
239 self.assertIsInstance(ping_traffic_gen.setup_helper, PingSetupEnvHelper)
240 self.assertIsInstance(ping_traffic_gen.resource_helper, PingResourceHelper)
241 self.assertEqual(ping_traffic_gen._result, {})
243 @mock.patch("yardstick.ssh.SSH")
244 def test__bind_device_kernel_with_failure(self, ssh):
247 execute_result_data = [
248 (1, 'bad stdout messages', 'error messages'),
250 (0, 'if_name_1', ''),
251 (0, 'if_name_2', ''),
253 ssh.from_node.return_value.execute.side_effect = iter(execute_result_data)
254 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
255 ext_ifs = ping_traffic_gen.vnfd_helper.interfaces
256 self.assertNotEqual(ext_ifs[0]['virtual-interface']['local_iface_name'], 'if_name_1')
257 self.assertNotEqual(ext_ifs[1]['virtual-interface']['local_iface_name'], 'if_name_2')
259 @mock.patch("yardstick.ssh.SSH")
260 def test_collect_kpi(self, ssh):
261 mock_ssh(ssh, exec_result=(0, "success", ""))
262 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
263 ping_traffic_gen._queue = Queue()
264 ping_traffic_gen._queue.put({})
265 ping_traffic_gen.collect_kpi()
266 self.assertEqual(ping_traffic_gen._result, {})
268 @mock.patch(SSH_HELPER)
269 def test_instantiate(self, ssh):
270 mock_ssh(ssh, spec=VnfSshHelper, exec_result=(0, "success", ""))
271 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
272 ping_traffic_gen.setup_helper.ssh_helper = mock.MagicMock(
273 **{"execute.return_value": (0, "xe0_fake", "")})
274 self.assertIsInstance(ping_traffic_gen.ssh_helper, mock.Mock)
275 self.assertEqual(ping_traffic_gen._result, {})
277 self.assertIsNone(ping_traffic_gen.instantiate({}, {}))
280 ping_traffic_gen.vnfd_helper.interfaces[0]['virtual-interface']['local_iface_name'],
282 self.assertEqual(self.CMD_KWARGS, ping_traffic_gen.resource_helper.cmd_kwargs)
283 self.assertIsNotNone(ping_traffic_gen._result)
285 def test_listen_traffic(self):
286 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
287 self.assertIsNone(ping_traffic_gen.listen_traffic({}))
289 @mock.patch("yardstick.ssh.SSH")
290 def test_terminate(self, ssh):
291 ssh.from_node.return_value.execute.return_value = 0, "success", ""
292 ssh.from_node.return_value.run.return_value = 0, "success", ""
294 ping_traffic_gen = PingTrafficGen('vnf1', self.VNFD_0)
295 self.assertIsNone(ping_traffic_gen.terminate())