Merge "Cover vping_ssh"
[functest.git] / functest / tests / unit / openstack / vping / test_vping_ssh.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2018 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 # pylint: disable=missing-docstring
11
12 import logging
13 import unittest
14
15 from paramiko import ssh_exception
16 import mock
17 import munch
18 import shade
19
20 from functest.opnfv_tests.openstack.vping import vping_ssh
21 from functest.utils import config
22
23
class VpingSSHTesting(unittest.TestCase):
    """Unit tests for vping_ssh.VPingSSH.

    The parent class (functest.core.singlevm.SingleVm2) and the cloud
    connection are replaced by mocks in every test.
    """

    def setUp(self):
        """Build a VPingSSH instance without running the parent __init__."""
        # Only the constructor call needs the parent initializer patched out.
        with mock.patch('functest.core.singlevm.SingleVm2.__init__'):
            self.vping = vping_ssh.VPingSSH()
        self.vping.cloud = mock.Mock()
        self.vping.case_name = 'vping'
        self.vping.guid = '1'

    def _assert_vm2_booted(self, mboot_vm):
        """Check boot_vm was called once with the vm2 name and sec group."""
        vm2_name = '{}-vm2_{}'.format(
            self.vping.case_name, self.vping.guid)
        mboot_vm.assert_called_once_with(
            vm2_name, security_groups=[self.vping.sec.id])

    def _assert_ping_sent(self):
        """Check exactly one ping command targeting vm2 went through ssh."""
        command = 'ping -c 1 {}'.format(self.vping.vm2.private_v4)
        self.vping.ssh.exec_command.assert_called_once_with(command)

    def _assert_vm2_deleted(self):
        """Check delete_server was called once for vm2 with the timeout."""
        delete_timeout = config.CONF.vping_vm_delete_timeout
        self.vping.cloud.delete_server.assert_called_once_with(
            self.vping.vm2, wait=True, timeout=delete_timeout)

    @mock.patch('functest.core.singlevm.SingleVm2.prepare',
                side_effect=Exception)
    def test_prepare_exc1(self, *args):
        """prepare() raises when the parent prepare() raises."""
        mprepare = args[0]
        with self.assertRaises(Exception):
            self.vping.prepare()
        mprepare.assert_called_once_with()

    # Patches are injected bottom-up: args[0] is the parent prepare mock,
    # args[1] is the boot_vm mock.
    @mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
                'boot_vm',
                side_effect=Exception)
    @mock.patch('functest.core.singlevm.SingleVm2.prepare')
    def test_prepare_exc2(self, *args):
        """prepare() raises when boot_vm() raises."""
        mprepare = args[0]
        mboot_vm = args[1]
        self.vping.sec = munch.Munch(id='foo')
        with self.assertRaises(Exception):
            self.vping.prepare()
        mprepare.assert_called_once_with()
        self._assert_vm2_booted(mboot_vm)

    @mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
                'boot_vm')
    @mock.patch('functest.core.singlevm.SingleVm2.prepare')
    def test_prepare(self, *args):
        """prepare() calls the parent prepare() and boots vm2."""
        mprepare = args[0]
        mboot_vm = args[1]
        self.vping.sec = munch.Munch(id='foo')
        self.vping.prepare()
        mprepare.assert_called_once_with()
        self._assert_vm2_booted(mboot_vm)

    def test_execute_exc(self):
        """execute() lets an SSHException from exec_command propagate."""
        self.vping.vm2 = munch.Munch(private_v4='127.0.0.1')
        mssh = mock.Mock()
        mssh.exec_command.side_effect = ssh_exception.SSHException
        self.vping.ssh = mssh
        with self.assertRaises(ssh_exception.SSHException):
            self.vping.execute()
        self._assert_ping_sent()

    def _test_execute(self, ret=0):
        """Run execute() with a mocked ssh whose command exits with ret."""
        self.vping.vm2 = munch.Munch(private_v4='127.0.0.1')
        mstdout = mock.Mock()
        mstdout.channel.recv_exit_status.return_value = ret
        mssh = mock.Mock()
        mssh.exec_command.return_value = (None, mstdout, None)
        self.vping.ssh = mssh
        self.assertEqual(self.vping.execute(), ret)
        self._assert_ping_sent()

    def test_execute1(self):
        """execute() returns 0 when the remote command exits with 0."""
        self._test_execute()

    def test_execute2(self):
        """execute() returns 1 when the remote command exits with 1."""
        self._test_execute(1)

    def test_clean_exc1(self):
        """clean() raises AssertionError when no cloud is set."""
        self.vping.cloud = None
        with self.assertRaises(AssertionError):
            self.vping.clean()

    def test_clean_exc2(self):
        """clean() lets an OpenStackCloudException propagate."""
        failure = shade.OpenStackCloudException(None)
        self.vping.cloud.delete_server.side_effect = failure
        with self.assertRaises(shade.OpenStackCloudException):
            self.vping.clean()

    @mock.patch('functest.core.singlevm.SingleVm2.clean',
                side_effect=Exception)
    def test_clean_exc3(self, *args):
        """clean() raises when the parent clean() raises."""
        mclean = args[0]
        self.vping.vm2 = munch.Munch()
        with self.assertRaises(Exception):
            self.vping.clean()
        self._assert_vm2_deleted()
        mclean.assert_called_once_with()

    @mock.patch('functest.core.singlevm.SingleVm2.clean')
    def test_clean(self, *args):
        """clean() deletes vm2 and calls the parent clean()."""
        mclean = args[0]
        self.vping.vm2 = munch.Munch()
        self.vping.clean()
        self._assert_vm2_deleted()
        mclean.assert_called_once_with()
119
120
if __name__ == '__main__':
    # Suppress log records of severity CRITICAL and below so that only the
    # verbose unittest report is printed.
    logging.disable(logging.CRITICAL)
    # verbosity=2 prints one result line per test method.
    unittest.main(verbosity=2)