Add tempest section in rally.conf
[functest.git] / functest / opnfv_tests / openstack / cinder / cinder_test.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2018 Enea AB and others
4
5 # This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10
11 """CinderCheck testcase."""
12
13 import logging
14
15 import pkg_resources
16 from scp import SCPClient
17 from xtesting.core import testcase
18
19 from functest.core import singlevm
20 from functest.utils import config
21 from functest.utils import env
22
23
24 class CinderCheck(singlevm.SingleVm2):
25     """
26     CinderCheck testcase implementation.
27
28     Class to execute the CinderCheck test using 2 Floating IPs
29     to connect to the VMs and one data volume
30     """
31     # pylint: disable=too-many-instance-attributes
32     volume_timeout = 60
33
34     def __init__(self, **kwargs):
35         """Initialize testcase."""
36         if "case_name" not in kwargs:
37             kwargs["case_name"] = "cinder_test"
38         super(CinderCheck, self).__init__(**kwargs)
39         self.logger = logging.getLogger(__name__)
40         self.vm2 = None
41         self.fip2 = None
42         self.ssh2 = None
43         self.volume = None
44
45     def execute(self):
46         """Execute CinderCheck testcase.
47
48         Sets up the OpenStack keypair, router, security group, and VM instance
49         objects then validates cinder.
50         :return: the exit code from the super.execute() method
51         """
52         return self._write_data() or self._read_data()
53
54     def prepare(self):
55         super(CinderCheck, self).prepare()
56         self.vm2 = self.boot_vm(
57             '{}-vm2_{}'.format(self.case_name, self.guid),
58             key_name=self.keypair.id,
59             security_groups=[self.sec.id])
60         (self.fip2, self.ssh2) = self.connect(self.vm2)
61         self.volume = self.cloud.create_volume(
62             name='{}-volume_{}'.format(self.case_name, self.guid), size='2',
63             timeout=self.volume_timeout)
64
65     def _write_data(self):
66         assert self.cloud
67         self.cloud.attach_volume(self.sshvm, self.volume,
68                                  timeout=self.volume_timeout)
69         write_data_script = pkg_resources.resource_filename(
70             'functest.opnfv_tests.openstack.cinder', 'write_data.sh')
71         try:
72             scp = SCPClient(self.ssh.get_transport())
73             scp.put(write_data_script, remote_path="~/")
74         except Exception:  # pylint: disable=broad-except
75             self.logger.error("File not transfered!")
76             return testcase.TestCase.EX_RUN_ERROR
77         self.logger.debug("ssh: %s", self.ssh)
78         (_, stdout, stderr) = self.ssh.exec_command(
79             "sh ~/write_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
80         self.logger.debug("volume_write stdout: %s", stdout.read())
81         self.logger.debug("volume_write stderr: %s", stderr.read())
82         # Detach volume from VM 1
83         self.logger.info("Detach volume from VM 1")
84         self.cloud.detach_volume(self.sshvm, self.volume)
85         return stdout.channel.recv_exit_status()
86
87     def _read_data(self):
88         assert self.cloud
89         # Attach volume to VM 2
90         self.logger.info("Attach volume to VM 2")
91         self.cloud.attach_volume(self.vm2, self.volume,
92                                  timeout=self.volume_timeout)
93         # Check volume data
94         read_data_script = pkg_resources.resource_filename(
95             'functest.opnfv_tests.openstack.cinder', 'read_data.sh')
96         try:
97             scp = SCPClient(self.ssh2.get_transport())
98             scp.put(read_data_script, remote_path="~/")
99         except Exception:  # pylint: disable=broad-except
100             self.logger.error("File not transfered!")
101             return testcase.TestCase.EX_RUN_ERROR
102         self.logger.debug("ssh: %s", self.ssh2)
103         (_, stdout, stderr) = self.ssh2.exec_command(
104             "sh ~/read_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
105         self.logger.debug("read volume stdout: %s", stdout.read())
106         self.logger.debug("read volume stderr: %s", stderr.read())
107         self.logger.info("Detach volume from VM 2")
108         self.cloud.detach_volume(self.vm2, self.volume,
109                                  timeout=self.volume_timeout)
110         return stdout.channel.recv_exit_status()
111
112     def clean(self):
113         assert self.cloud
114         if self.vm2:
115             self.cloud.delete_server(
116                 self.vm2, wait=True,
117                 timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
118         if self.fip2:
119             self.cloud.delete_floating_ip(self.fip2.id)
120         if self.volume:
121             self.cloud.delete_volume(self.volume.id)
122         super(CinderCheck, self).clean()