Migrates clean to python
[apex.git] / apex / tests / test_apex_network_jumphost.py
1 ##############################################################################
2 # Copyright (c) 2016 Dan Radez (Red Hat)
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import os
11 import shutil
12 import subprocess
13
14 from apex import NetworkSettings
15 from apex.tests import constants as con
16 from apex.common import constants as apex_constants
17 from apex.network import jumphost
18 from apex.common.exceptions import JumpHostNetworkException
19 from ipaddress import IPv4Interface
20 from mock import patch
21 from nose.tools import (
22     assert_is_instance,
23     assert_dict_equal,
24     assert_raises,
25     assert_true,
26     assert_false
27 )
28
29
def bridge_show_output(*args, **kwargs):
    """Return a canned bridge listing as bytes, ignoring every argument.

    Intended as the ``side_effect`` of a patched
    ``subprocess.check_output``; the payload describes exactly one bridge,
    ``br-admin``.
    """
    listing = (
        b'',
        b'    b6f1b54a-b8ba-4e86-9c5b-733ab71b5712',
        b'    Bridge br-admin',
        b'        Port br-admin',
        b'            Interface br-admin',
        b'                type: internal',
        b'    ovs_version: "2.5.0"',
        b'',
    )
    # The empty first/last entries reproduce the payload's leading blank
    # line and its trailing newline.
    return b'\n'.join(listing)
39
40
def bridge_port_list(*args, **kwargs):
    """Return a canned port listing as bytes, ignoring every argument.

    Intended as the ``side_effect`` of a patched
    ``subprocess.check_output``; the payload lists the ports ``enp6s0``
    and ``vnet1``, one per line, framed by a leading and trailing newline.
    """
    port_names = (b'enp6s0', b'vnet1')
    return b'\n' + b'\n'.join(port_names) + b'\n'
46
47
def subprocess_exception(*args, **kwargs):
    """Unconditionally raise ``subprocess.CalledProcessError``.

    Intended as the ``side_effect`` of a patched ``subprocess.check_call``
    to simulate a failing command; all arguments are ignored.

    :raises subprocess.CalledProcessError: always, with ``returncode`` 2
        and ``cmd`` ``'dummy'``.
    """
    failure = subprocess.CalledProcessError(returncode=2, cmd='dummy')
    raise failure
50
51
class TestNetworkJumpHost:
    """Tests for the helpers in ``apex.network.jumphost``.

    All external commands are mocked by patching ``subprocess.check_call``
    or ``subprocess.check_output``.  Several tests stage ifcfg files inside
    ``con.TEST_DUMMY_CONFIG``; those files are deleted in ``teardown`` so
    they are cleaned up even when an assertion fails half-way through a
    test and cannot leak into later tests or later runs.
    """

    # Files that tests copy into TEST_DUMMY_CONFIG or assert to exist there
    # after calling jumphost.  None of them may survive a test.
    _GENERATED_IFCFGS = ('ifcfg-enpfakes0', 'ifcfg-enpfakes0.orig',
                         'ifcfg-br-admin')

    @classmethod
    def setup_class(cls):
        """This method is run once for each class before any tests are run"""

    @classmethod
    def teardown_class(cls):
        """This method is run once for each class _after_ all tests are run"""

    def setup(self):
        """This method is run once before _each_ test method is executed"""

    def teardown(self):
        """Remove generated ifcfg files after _each_ test method.

        Running the cleanup here (instead of at the end of each test body)
        guarantees it happens whether the test passed or failed.
        """
        for ifcfg in self._GENERATED_IFCFGS:
            ifcfg_path = os.path.join(con.TEST_DUMMY_CONFIG, ifcfg)
            if os.path.isfile(ifcfg_path):
                os.remove(ifcfg_path)

    @patch('subprocess.check_output', side_effect=bridge_show_output)
    def test_is_ovs_bridge(self, bridge_output_function):
        """Only the bridge present in the canned listing is reported."""
        assert_true(jumphost.is_ovs_bridge('br-admin'))
        assert_false(jumphost.is_ovs_bridge('br-blah'))

    @patch('subprocess.check_output', side_effect=bridge_port_list)
    def test_dump_ovs_ports(self, bridge_function):
        """The canned port listing is returned as a list of port names."""
        output = jumphost.dump_ovs_ports('br-admin')
        assert_is_instance(output, list)
        assert 'enp6s0' in output

    def test_generate_ifcfg_params(self):
        """A valid ifcfg fixture is parsed into a dict of parameters."""
        output = jumphost.generate_ifcfg_params(
            os.path.join(con.TEST_DUMMY_CONFIG, 'ifcfg-br-external'),
            apex_constants.EXTERNAL_NETWORK)
        assert_is_instance(output, dict)
        assert output['IPADDR'] == '172.30.9.66'
        assert output['PEERDNS'] == 'no'

    def test_negative_generate_ifcfg_params(self):
        """The bad ifcfg fixture raises JumpHostNetworkException."""
        assert_raises(JumpHostNetworkException, jumphost.generate_ifcfg_params,
                      os.path.join(con.TEST_DUMMY_CONFIG,
                                   'bad_ifcfg-br-external'),
                      apex_constants.EXTERNAL_NETWORK)

    @patch('subprocess.check_call')
    @patch('apex.network.ip_utils.get_interface', return_value=IPv4Interface(
        '10.10.10.2'))
    def test_configure_bridges_ip_exists(self, interface_function,
                                         subprocess_func):
        """configure_bridges returns None when get_interface yields an IP."""
        ns = NetworkSettings(os.path.join(con.TEST_CONFIG_DIR,
                                          'network', 'network_settings.yaml'))
        assert jumphost.configure_bridges(ns) is None

    @patch('subprocess.check_call')
    @patch('apex.network.ip_utils.get_interface', return_value=None)
    def test_configure_bridges_no_ip(self, interface_function,
                                     subprocess_func):
        """configure_bridges returns None when get_interface yields None."""
        ns = NetworkSettings(os.path.join(con.TEST_CONFIG_DIR,
                                          'network', 'network_settings.yaml'))
        assert jumphost.configure_bridges(ns) is None

    @patch('subprocess.check_call', side_effect=subprocess_exception)
    @patch('apex.network.ip_utils.get_interface', return_value=None)
    def test_negative_configure_bridges(self, interface_function,
                                        subprocess_func):
        """A failing check_call propagates out of configure_bridges."""
        ns = NetworkSettings(os.path.join(con.TEST_CONFIG_DIR,
                                          'network', 'network_settings.yaml'))
        assert_raises(subprocess.CalledProcessError,
                      jumphost.configure_bridges, ns)

    @patch('subprocess.check_call')
    @patch('apex.network.jumphost.is_ovs_bridge', return_value=True)
    @patch('apex.network.jumphost.dump_ovs_ports', return_value=[])
    def test_attach_interface(self, dump_ports_func, is_bridge_func,
                              subprocess_func):
        """Attaching an unattached interface keeps both ifcfg files and
        leaves an ``.orig`` file for the interface."""
        ifcfg_dir = con.TEST_DUMMY_CONFIG
        shutil.copyfile(os.path.join(ifcfg_dir, 'ifcfg-dummy'),
                        os.path.join(ifcfg_dir, 'ifcfg-enpfakes0'))
        shutil.copyfile(os.path.join(ifcfg_dir, 'ifcfg-br-dummy'),
                        os.path.join(ifcfg_dir, 'ifcfg-br-admin'))
        # NOTE(review): this module-level override is never restored;
        # confirm no other test module depends on the original value.
        jumphost.NET_CFG_PATH = ifcfg_dir
        output = jumphost.attach_interface_to_ovs('br-admin', 'enpfakes0',
                                                  'admin')
        assert output is None
        assert os.path.isfile(os.path.join(ifcfg_dir, 'ifcfg-enpfakes0'))
        assert os.path.isfile(os.path.join(ifcfg_dir, 'ifcfg-br-admin'))
        assert os.path.isfile(os.path.join(ifcfg_dir, 'ifcfg-enpfakes0.orig'))

    @patch('subprocess.check_call')
    @patch('apex.network.jumphost.is_ovs_bridge', return_value=True)
    @patch('apex.network.jumphost.dump_ovs_ports', return_value=['dummy_int'])
    def test_already_attached_interface(self, dump_ports_func, is_bridge_func,
                                        subprocess_func):
        """Attaching an interface already listed on the bridge returns None."""
        output = jumphost.attach_interface_to_ovs('br-dummy', 'dummy_int',
                                                  'admin')
        assert output is None

    @patch('subprocess.check_call')
    @patch('apex.network.jumphost.is_ovs_bridge', return_value=True)
    @patch('apex.network.jumphost.dump_ovs_ports', return_value=[])
    def test_negative_attach_interface(self, dump_ports_func, is_bridge_func,
                                       subprocess_func):
        """Attaching without staged ifcfg files raises FileNotFoundError."""
        ifcfg_dir = con.TEST_DUMMY_CONFIG
        jumphost.NET_CFG_PATH = ifcfg_dir
        assert_raises(FileNotFoundError, jumphost.attach_interface_to_ovs,
                      'br-dummy', 'dummy_int', 'admin')

    @patch('subprocess.check_call', side_effect=subprocess_exception)
    @patch('apex.network.jumphost.is_ovs_bridge', return_value=True)
    @patch('apex.network.jumphost.dump_ovs_ports', return_value=[])
    def test_negative_attach_interface_process_error(
            self, dump_ports_func, is_bridge_func, subprocess_func):
        """A failing check_call propagates from attach; the ifcfg files
        (including the ``.orig`` file) are still present afterwards."""
        ifcfg_dir = con.TEST_DUMMY_CONFIG
        shutil.copyfile(os.path.join(ifcfg_dir, 'ifcfg-dummy'),
                        os.path.join(ifcfg_dir, 'ifcfg-enpfakes0'))
        shutil.copyfile(os.path.join(ifcfg_dir, 'ifcfg-br-dummy'),
                        os.path.join(ifcfg_dir, 'ifcfg-br-admin'))
        jumphost.NET_CFG_PATH = ifcfg_dir
        assert_raises(subprocess.CalledProcessError,
                      jumphost.attach_interface_to_ovs,
                      'br-admin', 'enpfakes0', 'admin')
        assert os.path.isfile(os.path.join(ifcfg_dir, 'ifcfg-enpfakes0'))
        assert os.path.isfile(os.path.join(ifcfg_dir, 'ifcfg-br-admin'))
        assert os.path.isfile(os.path.join(ifcfg_dir, 'ifcfg-enpfakes0.orig'))

    @patch('subprocess.check_call')
    @patch('apex.network.jumphost.is_ovs_bridge', return_value=True)
    @patch('apex.network.jumphost.dump_ovs_ports', return_value=['enpfakes0'])
    def test_detach_interface(self, dump_ports_func, is_bridge_func,
                              subprocess_func):
        """Detaching returns None and leaves ifcfg files for both the
        interface and the bridge."""
        ifcfg_dir = con.TEST_DUMMY_CONFIG
        shutil.copyfile(os.path.join(ifcfg_dir, 'ifcfg-br-dummy'),
                        os.path.join(ifcfg_dir, 'ifcfg-br-admin'))
        jumphost.NET_CFG_PATH = ifcfg_dir
        output = jumphost.detach_interface_from_ovs('admin')
        assert output is None
        assert os.path.isfile(os.path.join(ifcfg_dir, 'ifcfg-enpfakes0'))
        assert os.path.isfile(os.path.join(ifcfg_dir, 'ifcfg-br-admin'))

    @patch('subprocess.check_call')
    @patch('apex.network.jumphost.is_ovs_bridge', return_value=False)
    @patch('apex.network.jumphost.dump_ovs_ports', return_value=[])
    def test_detach_interface_no_bridge(self, dump_ports_func,
                                        is_bridge_func, subprocess_func):
        """Detaching returns None when the bridge is not an OVS bridge."""
        ifcfg_dir = con.TEST_DUMMY_CONFIG
        jumphost.NET_CFG_PATH = ifcfg_dir
        output = jumphost.detach_interface_from_ovs('admin')
        assert output is None

    @patch('subprocess.check_call')
    @patch('apex.network.jumphost.is_ovs_bridge', return_value=True)
    @patch('apex.network.jumphost.dump_ovs_ports', return_value=[])
    def test_detach_interface_no_int_to_remove(self, dump_ports_func,
                                               is_bridge_func,
                                               subprocess_func):
        """Detaching returns None when the bridge has no ports."""
        ifcfg_dir = con.TEST_DUMMY_CONFIG
        jumphost.NET_CFG_PATH = ifcfg_dir
        output = jumphost.detach_interface_from_ovs('admin')
        assert output is None

    @patch('subprocess.check_call')
    @patch('apex.network.jumphost.is_ovs_bridge', return_value=True)
    @patch('apex.network.jumphost.dump_ovs_ports', return_value=['enpfakes0'])
    def test_negative_detach_interface(self, dump_ports_func, is_bridge_func,
                                       subprocess_func):
        """Detaching without a staged bridge ifcfg raises
        FileNotFoundError."""
        ifcfg_dir = con.TEST_DUMMY_CONFIG
        jumphost.NET_CFG_PATH = ifcfg_dir
        assert_raises(FileNotFoundError, jumphost.detach_interface_from_ovs,
                      'admin')

    @patch('subprocess.check_call', side_effect=subprocess_exception)
    @patch('apex.network.jumphost.is_ovs_bridge', return_value=True)
    @patch('apex.network.jumphost.dump_ovs_ports', return_value=['enpfakes0'])
    def test_negative_detach_interface_process_error(
            self, dump_ports_func, is_bridge_func, subprocess_func):
        """A failing check_call propagates from detach; the interface and
        bridge ifcfg files are still present afterwards."""
        ifcfg_dir = con.TEST_DUMMY_CONFIG
        shutil.copyfile(os.path.join(ifcfg_dir, 'ifcfg-br-dummy'),
                        os.path.join(ifcfg_dir, 'ifcfg-br-admin'))
        jumphost.NET_CFG_PATH = ifcfg_dir
        assert_raises(subprocess.CalledProcessError,
                      jumphost.detach_interface_from_ovs, 'admin')
        assert os.path.isfile(os.path.join(ifcfg_dir, 'ifcfg-enpfakes0'))
        assert os.path.isfile(os.path.join(ifcfg_dir, 'ifcfg-br-admin'))

    @patch('subprocess.check_call')
    @patch('apex.network.jumphost.is_ovs_bridge', return_value=True)
    def test_remove_ovs_bridge(self, is_bridge_func, subprocess_func):
        """Removing the bridge deletes its ifcfg file, and removing it a
        second time (file already gone) still returns None."""
        ifcfg_dir = con.TEST_DUMMY_CONFIG
        jumphost.NET_CFG_PATH = ifcfg_dir
        shutil.copyfile(os.path.join(ifcfg_dir, 'ifcfg-br-dummy'),
                        os.path.join(ifcfg_dir, 'ifcfg-br-admin'))
        assert jumphost.remove_ovs_bridge(apex_constants.ADMIN_NETWORK) is None
        assert not os.path.isfile(os.path.join(ifcfg_dir, 'ifcfg-br-admin'))

        # test without file
        assert jumphost.remove_ovs_bridge(apex_constants.ADMIN_NETWORK) is None

    @patch('subprocess.check_call', side_effect=subprocess_exception)
    @patch('apex.network.jumphost.is_ovs_bridge', return_value=True)
    def test_negative_remove_ovs_bridge(self, is_bridge_func, subprocess_func):
        """A failing check_call propagates out of remove_ovs_bridge."""
        ifcfg_dir = con.TEST_DUMMY_CONFIG
        jumphost.NET_CFG_PATH = ifcfg_dir
        assert_raises(subprocess.CalledProcessError,
                      jumphost.remove_ovs_bridge,
                      apex_constants.ADMIN_NETWORK)
276                       apex_constants.ADMIN_NETWORK)