1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 # Unittest for yardstick.network_services.vnf_generic.vnf.test_base
18 import multiprocessing
24 from yardstick.network_services.vnf_generic.vnf import base
25 from yardstick.ssh import SSH
28 IP_PIPELINE_CFG_FILE_TPL = ("arp_route_tbl = ({port0_local_ip_hex},"
29 "{port0_netmask_hex},1,{port1_local_ip_hex}) "
30 "({port1_local_ip_hex},{port1_netmask_hex},0,"
31 "{port0_local_ip_hex})")
33 IP_PIPELINE_ND_CFG_FILE_TPL = """
34 nd_route_tbl = ({port1_dst_ip_hex6},"""
35 """{port1_dst_netmask_hex6},1,{port1_dst_ip_hex6})"""
37 _LOCAL_OBJECT = object()
40 'short-name': 'VpeVnf',
45 'network': '152.16.100.20',
46 'netmask': '255.255.255.0',
47 'gateway': '152.16.100.20',
51 'network': '152.16.40.20',
52 'netmask': '255.255.255.0',
53 'gateway': '152.16.40.20',
57 'description': 'VPE approximation using DPDK',
58 'name': 'vpevnf-baremetal',
61 'network': '0064:ff9b:0:0:0:0:9810:6414',
63 'gateway': '0064:ff9b:0:0:0:0:9810:6414',
67 'network': '0064:ff9b:0:0:0:0:9810:2814',
69 'gateway': '0064:ff9b:0:0:0:0:9810:2814',
73 'id': 'vpevnf-baremetal',
74 'external-interface': [
76 'virtual-interface': {
77 'dst_mac': '00:00:00:00:00:03',
78 'vpci': '0000:05:00.0',
79 'local_ip': '152.16.100.19',
80 'type': 'PCI-PASSTHROUGH',
81 'netmask': '255.255.255.0',
83 'bandwidth': '10 Gbps',
84 'dst_ip': '152.16.100.20',
85 'local_mac': '00:00:00:00:00:01'
87 'vnfd-connection-point-ref': 'xe0',
91 'virtual-interface': {
92 'dst_mac': '00:00:00:00:00:04',
93 'vpci': '0000:05:00.1',
94 'local_ip': '152.16.40.19',
95 'type': 'PCI-PASSTHROUGH',
96 'netmask': '255.255.255.0',
98 'bandwidth': '10 Gbps',
99 'dst_ip': '152.16.40.20',
100 'local_mac': '00:00:00:00:00:02'
102 'vnfd-connection-point-ref': 'xe1',
108 'description': 'Vpe approximation using DPDK',
110 'vdu-id': 'vpevnf-baremetal',
123 'connection-point': [
133 'id': 'VpeApproxVnf', 'name': 'VPEVnfSsh'
137 'vnfd:vnfd-catalog': {
145 class FileAbsPath(object):
146 def __init__(self, module_file):
147 super(FileAbsPath, self).__init__()
148 self.module_path = os.path.dirname(os.path.abspath(module_file))
150 def get_path(self, filename):
151 file_path = os.path.join(self.module_path, filename)
155 def mock_ssh(mock_ssh_type, spec=None, exec_result=_LOCAL_OBJECT, run_result=_LOCAL_OBJECT):
159 if exec_result is _LOCAL_OBJECT:
160 exec_result = 0, "", ""
162 if run_result is _LOCAL_OBJECT:
163 run_result = 0, "", ""
165 mock_ssh_instance = mock.Mock(autospec=spec)
166 mock_ssh_instance._get_client.return_value = mock.Mock()
167 mock_ssh_instance.execute.return_value = exec_result
168 mock_ssh_instance.run.return_value = run_result
169 mock_ssh_type.from_node.return_value = mock_ssh_instance
170 return mock_ssh_instance
173 class TestQueueFileWrapper(unittest.TestCase):
175 self.prompt = "pipeline>"
176 self.q_in = multiprocessing.Queue()
177 self.q_out = multiprocessing.Queue()
179 def test___init__(self):
180 queue_file_wrapper = \
181 base.QueueFileWrapper(self.q_in, self.q_out, self.prompt)
182 self.assertEqual(queue_file_wrapper.prompt, self.prompt)
184 def test_clear(self):
185 queue_file_wrapper = \
186 base.QueueFileWrapper(self.q_in, self.q_out, self.prompt)
187 queue_file_wrapper.bufsize = 5
188 queue_file_wrapper.write("pipeline>")
189 queue_file_wrapper.close()
190 self.assertIsNone(queue_file_wrapper.clear())
191 self.assertIsNotNone(queue_file_wrapper.q_out.empty())
193 def test_close(self):
194 queue_file_wrapper = \
195 base.QueueFileWrapper(self.q_in, self.q_out, self.prompt)
196 self.assertIsNone(queue_file_wrapper.close())
199 queue_file_wrapper = \
200 base.QueueFileWrapper(self.q_in, self.q_out, self.prompt)
201 queue_file_wrapper.q_in.put("pipeline>")
202 self.assertEqual("pipeline>", queue_file_wrapper.read(20))
204 def test_write(self):
205 queue_file_wrapper = \
206 base.QueueFileWrapper(self.q_in, self.q_out, self.prompt)
207 queue_file_wrapper.write("pipeline>")
208 self.assertIsNotNone(queue_file_wrapper.q_out.empty())
211 class TestGenericVNF(unittest.TestCase):
213 def test_definition(self):
214 """Make sure that the abstract class cannot be instantiated"""
215 with self.assertRaises(TypeError) as exc:
216 # pylint: disable=abstract-class-instantiated
217 base.GenericVNF('vnf1', VNFD['vnfd:vnfd-catalog']['vnfd'][0])
218 msg = ("Can't instantiate abstract class GenericVNF with abstract "
219 "methods collect_kpi, instantiate, scale, terminate, "
220 "wait_for_instantiate")
221 self.assertEqual(msg, str(exc.exception))
224 class TestGenericTrafficGen(unittest.TestCase):
226 def test_definition(self):
227 """Make sure that the abstract class cannot be instantiated"""
228 vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0]
230 with self.assertRaises(TypeError) as exc:
231 # pylint: disable=abstract-class-instantiated
232 base.GenericTrafficGen(name, vnfd)
233 msg = ("Can't instantiate abstract class GenericTrafficGen with "
234 "abstract methods collect_kpi, instantiate, run_traffic, "
236 self.assertEqual(msg, str(exc.exception))