3 # Copyright (c) 2018 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # pylint: disable=missing-docstring
15 from paramiko import ssh_exception
20 from functest.opnfv_tests.openstack.vping import vping_ssh
21 from functest.utils import config
24 class VpingSSHTesting(unittest.TestCase):
27 with mock.patch('functest.core.singlevm.SingleVm2.__init__'):
28 self.vping = vping_ssh.VPingSSH()
29 self.vping.cloud = mock.Mock()
30 self.vping.case_name = 'vping'
33 @mock.patch('functest.core.singlevm.SingleVm2.prepare',
34 side_effect=Exception)
35 def test_prepare_exc1(self, *args):
36 with self.assertRaises(Exception):
38 args[0].assert_called_once_with()
40 @mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
42 side_effect=Exception)
43 @mock.patch('functest.core.singlevm.SingleVm2.prepare')
44 def test_prepare_exc2(self, *args):
45 self.vping.sec = munch.Munch(id='foo')
46 with self.assertRaises(Exception):
48 args[0].assert_called_once_with()
49 args[1].assert_called_once_with(
50 '{}-vm2_{}'.format(self.vping.case_name, self.vping.guid),
51 security_groups=[self.vping.sec.id])
53 @mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
55 @mock.patch('functest.core.singlevm.SingleVm2.prepare')
56 def test_prepare(self, *args):
57 self.vping.sec = munch.Munch(id='foo')
59 args[0].assert_called_once_with()
60 args[1].assert_called_once_with(
61 '{}-vm2_{}'.format(self.vping.case_name, self.vping.guid),
62 security_groups=[self.vping.sec.id])
64 @mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
65 'check_regex_in_console', return_value=True)
66 def test_execute_exc(self, *args):
67 self.vping.vm2 = munch.Munch(private_v4='127.0.0.1', name='foo')
68 self.vping.ssh = mock.Mock()
69 self.vping.ssh.exec_command.side_effect = ssh_exception.SSHException
70 with self.assertRaises(ssh_exception.SSHException):
72 self.vping.ssh.exec_command.assert_called_once_with(
73 'ping -c 1 {}'.format(self.vping.vm2.private_v4))
74 args[0].assert_called_once_with('foo')
76 @mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
77 'check_regex_in_console', return_value=False)
78 def test_execute_exc2(self, *args):
79 self.vping.vm2 = munch.Munch(private_v4='127.0.0.1', name='foo')
80 self.vping.ssh = mock.Mock()
82 self.vping.ssh.exec_command.assert_not_called()
83 args[0].assert_called_once_with('foo')
85 def _test_execute(self, ret=0):
86 self.vping.vm2 = munch.Munch(private_v4='127.0.0.1', name='foo')
87 self.vping.ssh = mock.Mock()
89 stdout.channel.recv_exit_status.return_value = ret
90 self.vping.ssh.exec_command.return_value = (None, stdout, mock.Mock())
92 'functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
93 'check_regex_in_console', return_value=True) as mock_check:
94 self.assertEqual(self.vping.execute(), ret)
95 mock_check.assert_called_once_with('foo')
96 self.vping.ssh.exec_command.assert_called_once_with(
97 'ping -c 1 {}'.format(self.vping.vm2.private_v4))
99 def test_execute1(self):
102 def test_execute2(self):
103 self._test_execute(1)
105 def test_clean_exc1(self):
106 self.vping.cloud = None
107 with self.assertRaises(AssertionError):
110 def test_clean_exc2(self):
111 self.vping.vm2 = munch.Munch(id='vm2')
112 mdelete_server = self.vping.cloud.delete_server
113 mdelete_server.side_effect = shade.OpenStackCloudException(None)
114 with self.assertRaises(shade.OpenStackCloudException):
117 @mock.patch('functest.core.singlevm.SingleVm2.clean',
118 side_effect=Exception)
119 def test_clean_exc3(self, *args):
120 self.vping.vm2 = munch.Munch(id='vm2')
121 with self.assertRaises(Exception):
123 self.vping.cloud.delete_server.assert_called_once_with(
124 self.vping.vm2, wait=True,
125 timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
126 args[0].assert_called_once_with()
128 @mock.patch('functest.core.singlevm.SingleVm2.clean')
129 def test_clean1(self, *args):
130 self.vping.vm2 = None
132 self.vping.cloud.delete_server.assert_not_called()
133 args[0].assert_called_once_with()
135 @mock.patch('functest.core.singlevm.SingleVm2.clean')
136 def test_clean2(self, *args):
137 self.vping.vm2 = munch.Munch(id='vm2')
139 self.vping.cloud.delete_server.assert_called_once_with(
140 self.vping.vm2, wait=True,
141 timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
142 args[0].assert_called_once_with()
145 if __name__ == '__main__':
146 logging.disable(logging.CRITICAL)
147 unittest.main(verbosity=2)