Merge "Use environment variable for image properties"
[functest.git] / functest / opnfv_tests / openstack / cinder / cinder_test.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2018 Enea AB and others
4
5 # This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10
11 """CinderCheck testcase."""
12
13 import logging
14
15 import pkg_resources
16 from scp import SCPClient
17 from xtesting.core import testcase
18
19 from functest.core import singlevm
20 from functest.utils import config
21 from functest.utils import env
22
23
class CinderCheck(singlevm.SingleVm2):
    """
    CinderCheck testcase implementation.

    Class to execute the CinderCheck test using 2 Floating IPs
    to connect to the VMs and one data volume.

    The volume is attached to the first VM, where write_data.sh is run,
    then detached and attached to a second VM, where read_data.sh is run.
    """
    # pylint: disable=too-many-instance-attributes
    # Timeout passed to every volume create/attach/detach call below.
    volume_timeout = 60

    def __init__(self, **kwargs):
        """Initialize testcase.

        :param kwargs: forwarded to the parent class; "case_name" defaults
            to "cinder_test" when the caller does not supply one
        """
        if "case_name" not in kwargs:
            kwargs["case_name"] = "cinder_test"
        super(CinderCheck, self).__init__(**kwargs)
        self.logger = logging.getLogger(__name__)
        # Resources created by prepare() on top of those of the parent class.
        self.vm2 = None
        self.fip2 = None
        self.ssh2 = None
        self.volume = None

    def execute(self):
        """Execute CinderCheck testcase.

        Writes data to the volume from the first VM and, only if that step
        returned 0, reads it back from the second VM.

        :return: the non-zero status of the write step if it failed,
            otherwise the status of the read step
        """
        return self._write_data() or self._read_data()

    def prepare(self):
        """Create the second VM, connect to it and create the data volume.

        Runs the parent class preparation first, then boots a second VM
        with the same keypair and security group, opens a connection to it
        and creates the volume (waiting up to volume_timeout).
        """
        super(CinderCheck, self).prepare()
        self.vm2 = self.boot_vm(
            '{}-vm2_{}'.format(self.case_name, self.guid),
            key_name=self.keypair.id,
            security_groups=[self.sec.id])
        (self.fip2, self.ssh2) = self.connect(self.vm2)
        self.volume = self.cloud.create_volume(
            name='{}-volume_{}'.format(self.case_name, self.guid), size='2',
            timeout=self.volume_timeout, wait=True)

    def _run_script(self, ssh, script, label):
        """Upload a packaged shell script to a VM and run it.

        The script is looked up in the functest.opnfv_tests.openstack.cinder
        package, copied to the remote home directory and executed with the
        value of the VOLUME_DEVICE_NAME environment setting as its argument.

        :param ssh: SSH client of the target VM; must provide
            get_transport() and exec_command()
        :param script: file name of the script inside the package
        :param label: prefix of the stdout/stderr debug log lines
        :return: the exit status of the remote command, or
            TestCase.EX_RUN_ERROR when the upload failed
        """
        script_path = pkg_resources.resource_filename(
            'functest.opnfv_tests.openstack.cinder', script)
        try:
            scp = SCPClient(ssh.get_transport())
            scp.put(script_path, remote_path="~/")
        except Exception:  # pylint: disable=broad-except
            # logger.exception keeps the traceback so the cause of the
            # failed transfer is visible in the logs.
            self.logger.exception("File not transfered!")
            return testcase.TestCase.EX_RUN_ERROR
        self.logger.debug("ssh: %s", ssh)
        (_, stdout, stderr) = ssh.exec_command(
            "sh ~/{} {}".format(script, env.get('VOLUME_DEVICE_NAME')))
        # Both streams are read before the exit status is requested
        # (presumably a paramiko channel, where asking for the status
        # first can block on large outputs).
        self.logger.debug("%s stdout: %s", label, stdout.read())
        self.logger.debug("%s stderr: %s", label, stderr.read())
        return stdout.channel.recv_exit_status()

    def _write_data(self):
        """Attach the volume to VM 1, run write_data.sh, then detach.

        The volume is detached even when the upload or the remote command
        fails, so that it is not left attached for the read step or for
        clean().

        :return: the exit status of write_data.sh, or
            TestCase.EX_RUN_ERROR when the script could not be uploaded
        """
        assert self.cloud
        self.cloud.attach_volume(self.sshvm, self.volume,
                                 timeout=self.volume_timeout)
        try:
            return self._run_script(self.ssh, 'write_data.sh', "volume_write")
        finally:
            # Detach volume from VM 1
            self.logger.info("Detach volume from VM 1")
            self.cloud.detach_volume(
                self.sshvm, self.volume, timeout=self.volume_timeout)

    def _read_data(self):
        """Attach the volume to VM 2, run read_data.sh, then detach.

        The volume is detached even when the upload or the remote command
        fails.

        :return: the exit status of read_data.sh, or
            TestCase.EX_RUN_ERROR when the script could not be uploaded
        """
        assert self.cloud
        # Attach volume to VM 2
        self.logger.info("Attach volume to VM 2")
        self.cloud.attach_volume(self.vm2, self.volume,
                                 timeout=self.volume_timeout)
        try:
            # Check volume data
            return self._run_script(self.ssh2, 'read_data.sh', "read volume")
        finally:
            self.logger.info("Detach volume from VM 2")
            self.cloud.detach_volume(
                self.vm2, self.volume, timeout=self.volume_timeout)

    def clean(self):
        """Delete the second VM, its floating IP and the volume.

        Each resource is only deleted if it was created. The second VM is
        deleted (and waited for) before the volume, and the parent class
        cleanup runs last.
        """
        assert self.cloud
        if self.vm2:
            self.cloud.delete_server(
                self.vm2, wait=True,
                timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
        if self.fip2:
            self.cloud.delete_floating_ip(self.fip2.id)
        if self.volume:
            self.cloud.delete_volume(self.volume.id)
        super(CinderCheck, self).clean()
122             self.cloud.delete_volume(self.volume.id)
123         super(CinderCheck, self).clean()