3 # Copyright (c) 2018 Enea AB and others
5 # This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 """CinderCheck testcase."""
16 from scp import SCPClient
17 from xtesting.core import testcase
19 from functest.core import singlevm
20 from functest.utils import config
21 from functest.utils import env
24 class CinderCheck(singlevm.SingleVm2):
26 CinderCheck testcase implementation.
28 Class to execute the CinderCheck test using 2 Floating IPs
29 to connect to the VMs and one data volume
31 # pylint: disable=too-many-instance-attributes
34 def __init__(self, **kwargs):
35 """Initialize testcase."""
36 if "case_name" not in kwargs:
37 kwargs["case_name"] = "cinder_test"
38 super(CinderCheck, self).__init__(**kwargs)
39 self.logger = logging.getLogger(__name__)
46 """Execute CinderCheck testcase.
48 Sets up the OpenStack keypair, router, security group, and VM instance
49 objects then validates cinder.
50 :return: the exit code from the super.execute() method
52 return self._write_data() or self._read_data()
55 super(CinderCheck, self).prepare()
56 self.vm2 = self.boot_vm(
57 '{}-vm2_{}'.format(self.case_name, self.guid),
58 key_name=self.keypair.id,
59 security_groups=[self.sec.id])
60 (self.fip2, self.ssh2) = self.connect(self.vm2)
61 self.volume = self.cloud.create_volume(
62 name='{}-volume_{}'.format(self.case_name, self.guid), size='2',
63 timeout=self.volume_timeout)
65 def _write_data(self):
67 self.cloud.attach_volume(self.sshvm, self.volume,
68 timeout=self.volume_timeout)
69 write_data_script = pkg_resources.resource_filename(
70 'functest.opnfv_tests.openstack.cinder', 'write_data.sh')
72 scp = SCPClient(self.ssh.get_transport())
73 scp.put(write_data_script, remote_path="~/")
74 except Exception: # pylint: disable=broad-except
75 self.logger.error("File not transfered!")
76 return testcase.TestCase.EX_RUN_ERROR
77 self.logger.debug("ssh: %s", self.ssh)
78 (_, stdout, stderr) = self.ssh.exec_command(
79 "sh ~/write_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
80 self.logger.debug("volume_write stdout: %s", stdout.read())
81 self.logger.debug("volume_write stderr: %s", stderr.read())
82 # Detach volume from VM 1
83 self.logger.info("Detach volume from VM 1")
84 self.cloud.detach_volume(self.sshvm, self.volume)
85 return stdout.channel.recv_exit_status()
89 # Attach volume to VM 2
90 self.logger.info("Attach volume to VM 2")
91 self.cloud.attach_volume(self.vm2, self.volume,
92 timeout=self.volume_timeout)
94 read_data_script = pkg_resources.resource_filename(
95 'functest.opnfv_tests.openstack.cinder', 'read_data.sh')
97 scp = SCPClient(self.ssh2.get_transport())
98 scp.put(read_data_script, remote_path="~/")
99 except Exception: # pylint: disable=broad-except
100 self.logger.error("File not transfered!")
101 return testcase.TestCase.EX_RUN_ERROR
102 self.logger.debug("ssh: %s", self.ssh2)
103 (_, stdout, stderr) = self.ssh2.exec_command(
104 "sh ~/read_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
105 self.logger.debug("read volume stdout: %s", stdout.read())
106 self.logger.debug("read volume stderr: %s", stderr.read())
107 self.logger.info("Detach volume from VM 2")
108 self.cloud.detach_volume(self.vm2, self.volume,
109 timeout=self.volume_timeout)
110 return stdout.channel.recv_exit_status()
115 self.cloud.delete_server(
117 timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
119 self.cloud.delete_floating_ip(self.fip2.id)
121 self.cloud.delete_volume(self.volume.id)
122 super(CinderCheck, self).clean()