d81bb100af32de04835090cffff9180581ca3a86
[functest.git] / functest / opnfv_tests / openstack / cinder / cinder_test.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2018 Enea AB and others
4
5 # This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10
11 """CinderCheck testcase."""
12
13 import logging
14
15 import pkg_resources
16 from scp import SCPClient
17 from xtesting.core import testcase
18
19 from functest.core import singlevm
20 from functest.utils import config
21 from functest.utils import env
22
23
24 class CinderCheck(singlevm.SingleVm2):
25     """
26     CinderCheck testcase implementation.
27
28     Class to execute the CinderCheck test using 2 Floating IPs
29     to connect to the VMs and one data volume
30     """
31     # pylint: disable=too-many-instance-attributes
32     volume_timeout = 60
33
34     def __init__(self, **kwargs):
35         """Initialize testcase."""
36         if "case_name" not in kwargs:
37             kwargs["case_name"] = "cinder_test"
38         super(CinderCheck, self).__init__(**kwargs)
39         self.logger = logging.getLogger(__name__)
40         self.vm2 = None
41         self.fip2 = None
42         self.ssh2 = None
43         self.volume = None
44
45     def execute(self):
46         """Execute CinderCheck testcase.
47
48         Sets up the OpenStack keypair, router, security group, and VM instance
49         objects then validates cinder.
50         :return: the exit code from the super.execute() method
51         """
52         return self._write_data() or self._read_data()
53
54     def prepare(self):
55         super(CinderCheck, self).prepare()
56         self.vm2 = self.boot_vm(
57             '{}-vm2_{}'.format(self.case_name, self.guid),
58             key_name=self.keypair.id,
59             security_groups=[self.sec.id])
60         (self.fip2, self.ssh2) = self.connect(self.vm2)
61         self.volume = self.cloud.create_volume(
62             name='{}-volume_{}'.format(self.case_name, self.guid), size='2',
63             timeout=self.volume_timeout, wait=True)
64
65     def _write_data(self):
66         assert self.cloud
67         self.cloud.attach_volume(self.sshvm, self.volume,
68                                  timeout=self.volume_timeout)
69         write_data_script = pkg_resources.resource_filename(
70             'functest.opnfv_tests.openstack.cinder', 'write_data.sh')
71         try:
72             scp = SCPClient(self.ssh.get_transport())
73             scp.put(write_data_script, remote_path="~/")
74         except Exception:  # pylint: disable=broad-except
75             self.logger.error("File not transfered!")
76             return testcase.TestCase.EX_RUN_ERROR
77         self.logger.debug("ssh: %s", self.ssh)
78         (_, stdout, stderr) = self.ssh.exec_command(
79             "sh ~/write_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
80         self.logger.debug(
81             "volume_write stdout: %s", stdout.read().decode("utf-8"))
82         self.logger.debug(
83             "volume_write stderr: %s", stderr.read().decode("utf-8"))
84         # Detach volume from VM 1
85         self.logger.info("Detach volume from VM 1")
86         self.cloud.detach_volume(
87             self.sshvm, self.volume, timeout=self.volume_timeout)
88         return stdout.channel.recv_exit_status()
89
90     def _read_data(self):
91         assert self.cloud
92         # Attach volume to VM 2
93         self.logger.info("Attach volume to VM 2")
94         self.cloud.attach_volume(self.vm2, self.volume,
95                                  timeout=self.volume_timeout)
96         # Check volume data
97         read_data_script = pkg_resources.resource_filename(
98             'functest.opnfv_tests.openstack.cinder', 'read_data.sh')
99         try:
100             scp = SCPClient(self.ssh2.get_transport())
101             scp.put(read_data_script, remote_path="~/")
102         except Exception:  # pylint: disable=broad-except
103             self.logger.error("File not transfered!")
104             return testcase.TestCase.EX_RUN_ERROR
105         self.logger.debug("ssh: %s", self.ssh2)
106         (_, stdout, stderr) = self.ssh2.exec_command(
107             "sh ~/read_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
108         self.logger.debug(
109             "read volume stdout: %s", stdout.read().decode("utf-8"))
110         self.logger.debug(
111             "read volume stderr: %s", stderr.read().decode("utf-8"))
112         self.logger.info("Detach volume from VM 2")
113         self.cloud.detach_volume(
114             self.vm2, self.volume, timeout=self.volume_timeout)
115         return stdout.channel.recv_exit_status()
116
117     def clean(self):
118         assert self.cloud
119         if self.vm2:
120             self.cloud.delete_server(
121                 self.vm2, wait=True,
122                 timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
123         if self.fip2:
124             self.cloud.delete_floating_ip(self.fip2.id)
125         if self.volume:
126             self.cloud.delete_volume(self.volume.id)
127         super(CinderCheck, self).clean()