3 # Copyright (c) 2018 Enea AB and others
5 # This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 """CinderCheck testcase."""
16 from scp import SCPClient
17 from xtesting.core import testcase
19 from functest.core import singlevm
20 from functest.utils import config
21 from functest.utils import env
class CinderCheck(singlevm.SingleVm2):
    """
    CinderCheck testcase implementation.

    Class to execute the CinderCheck test using 2 Floating IPs
    to connect to the VMs and one data volume.

    The volume is attached to the first VM, where ``write_data.sh`` is run
    against it, then attached to the second VM, where ``read_data.sh`` is
    run against it.
    """
    # pylint: disable=too-many-instance-attributes

    # NOTE(review): ``self.volume_timeout`` is read by several methods but
    # is not defined anywhere in the visible part of this file; confirm it
    # is provided by the base class or add the class attribute here.

    def __init__(self, **kwargs):
        """Initialize testcase.

        :param kwargs: forwarded to the parent constructor; ``case_name``
                       defaults to ``"cinder_test"`` when not supplied
        """
        kwargs.setdefault("case_name", "cinder_test")
        super().__init__(**kwargs)
        self.logger = logging.getLogger(__name__)
        # Resources owned by this testcase. Declared up front so cleanup can
        # tell which of them were actually created.
        self.vm2 = None
        self.fip2 = None
        self.ssh2 = None
        self.volume = None

    def execute(self):
        """Execute CinderCheck testcase.

        Runs the write step and, only when it returns a falsy status (0),
        the read step.

        :return: the status of the write step if it is truthy (non-zero),
                 otherwise the status of the read step
        """
        return self._write_data() or self._read_data()

    def prepare(self):
        """Boot the second VM, connect to it and create the data volume.

        The pair returned by ``connect()`` is kept as ``fip2`` / ``ssh2``.

        NOTE(review): the header of this method was not visible in the
        source. The name ``prepare`` and the ``super().prepare()`` call are
        reconstructed on the assumption that the base class creates the
        ``keypair`` and ``sec`` objects read below -- confirm.
        """
        super().prepare()
        self.vm2 = self.boot_vm(
            f'{self.case_name}-vm2_{self.guid}',
            key_name=self.keypair.id,
            security_groups=[self.sec.id])
        (self.fip2, self.ssh2) = self.connect(self.vm2)
        self.volume = self.cloud.create_volume(
            name=f'{self.case_name}-volume_{self.guid}', size='2',
            timeout=self.volume_timeout, wait=True)

    def _run_script(self, ssh, script, log_prefix):
        """Copy a packaged shell script to a VM and run it on the volume.

        :param ssh: SSH client connected to the target VM
        :param script: file name of the script shipped in the
                       ``functest.opnfv_tests.openstack.cinder`` package
        :param log_prefix: label put in front of the stdout/stderr debug
                           messages
        :return: exit status of the remote script, or
                 ``testcase.TestCase.EX_RUN_ERROR`` when the upload failed
        """
        local_path = pkg_resources.resource_filename(
            'functest.opnfv_tests.openstack.cinder', script)
        try:
            scp = SCPClient(ssh.get_transport())
            scp.put(local_path, remote_path="~/")
        except Exception:  # pylint: disable=broad-except
            # logger.exception keeps the message and adds the traceback, so
            # the reason for the failed transfer is not lost.
            self.logger.exception("File not transfered!")
            return testcase.TestCase.EX_RUN_ERROR
        self.logger.debug("ssh: %s", ssh)
        (_, stdout, stderr) = ssh.exec_command(
            f"sh ~/{script} {env.get('VOLUME_DEVICE_NAME')}")
        # NOTE(review): the head of these two logging calls was not visible
        # in the source; ``debug`` is assumed, matching the call above.
        self.logger.debug(
            "%s stdout: %s", log_prefix, stdout.read().decode("utf-8"))
        self.logger.debug(
            "%s stderr: %s", log_prefix, stderr.read().decode("utf-8"))
        return stdout.channel.recv_exit_status()

    def _write_data(self):
        """Attach the volume to VM 1 and run ``write_data.sh`` there.

        :return: exit status of the script, or
                 ``testcase.TestCase.EX_RUN_ERROR`` when it could not be
                 uploaded
        """
        # Attach volume to VM 1
        self.logger.info("Attach volume to VM 1")
        self.cloud.attach_volume(self.sshvm, self.volume,
                                 timeout=self.volume_timeout)
        try:
            return self._run_script(
                self.ssh, 'write_data.sh', 'volume_write')
        finally:
            # Detach volume from VM 1, also on failure, so the volume is
            # not left attached when the testcase moves on to cleanup.
            self.logger.info("Detach volume from VM 1")
            self.cloud.detach_volume(
                self.sshvm, self.volume, timeout=self.volume_timeout)

    def _read_data(self):
        """Attach the volume to VM 2 and run ``read_data.sh`` there.

        :return: exit status of the script, or
                 ``testcase.TestCase.EX_RUN_ERROR`` when it could not be
                 uploaded
        """
        # Attach volume to VM 2
        self.logger.info("Attach volume to VM 2")
        self.cloud.attach_volume(self.vm2, self.volume,
                                 timeout=self.volume_timeout)
        try:
            return self._run_script(
                self.ssh2, 'read_data.sh', 'read volume')
        finally:
            # Detach volume from VM 2, also on failure (see _write_data).
            self.logger.info("Detach volume from VM 2")
            self.cloud.detach_volume(
                self.vm2, self.volume, timeout=self.volume_timeout)

    def clean(self):
        """Delete the second VM, its floating IP and the data volume.

        Each resource is removed only if it was created, so cleanup also
        works after a partially failed preparation.

        NOTE(review): the header of this method and the first argument
        line of ``delete_server`` were not visible in the source. The name
        ``clean``, the ``self.vm2, wait=True`` arguments and the trailing
        ``super().clean()`` call are reconstructed -- confirm.
        """
        if self.vm2:
            # The timeout is presumably only meaningful when waiting for
            # the deletion to finish, hence wait=True.
            self.cloud.delete_server(
                self.vm2, wait=True,
                timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
        if self.fip2:
            self.cloud.delete_floating_ip(self.fip2.id)
        if self.volume:
            self.cloud.delete_volume(self.volume.id)
        super().clean()