3 # Copyright (c) 2015 All rights reserved
4 # This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
11 """vPingSSH testcase."""
13 # This 1st import is here simply for pep8 as the 'os' package import appears
14 # to be required for mock and the unit tests will fail without it
15 import os # noqa # pylint: disable=unused-import
18 from scp import SCPClient
21 from functest.core.testcase import TestCase
22 from functest.energy import energy
23 from functest.opnfv_tests.openstack.vping import vping_base
24 from functest.utils.constants import CONST
26 from snaps.config.keypair import KeypairConfig
27 from snaps.config.network import PortConfig
28 from snaps.config.security_group import (
29 Direction, Protocol, SecurityGroupConfig, SecurityGroupRuleConfig)
30 from snaps.config.vm_inst import FloatingIpConfig, VmInstanceConfig
32 from snaps.openstack.utils import deploy_utils
35 class VPingSSH(vping_base.VPingBase):
37 VPingSSH testcase implementation.
39 Class to execute the vPing test using a Floating IP to connect to one VM
40 to issue the ping command to the second
43 def __init__(self, **kwargs):
44 """Initialize testcase."""
45 if "case_name" not in kwargs:
46 kwargs["case_name"] = "vping_ssh"
47 super(VPingSSH, self).__init__(**kwargs)
49 self.kp_name = CONST.__getattribute__('vping_keypair_name') + self.guid
50 self.kp_priv_file = CONST.__getattribute__('vping_keypair_priv_file')
51 self.kp_pub_file = CONST.__getattribute__('vping_keypair_pub_file')
52 self.sg_name = CONST.__getattribute__('vping_sg_name') + self.guid
53 self.sg_desc = CONST.__getattribute__('vping_sg_desc')
55 @energy.enable_recording
58 Excecute VPingSSH testcase.
60 Sets up the OpenStack keypair, router, security group, and VM instance
61 objects then validates the ping.
62 :return: the exit code from the super.execute() method
65 super(VPingSSH, self).run()
67 log = "Creating keypair with name: '%s'" % self.kp_name
69 kp_creator = deploy_utils.create_keypair(
72 name=self.kp_name, private_filepath=self.kp_priv_file,
73 public_filepath=self.kp_pub_file))
74 self.creators.append(kp_creator)
77 port1_settings = PortConfig(
78 name=self.vm1_name + '-vPingPort',
79 network_name=self.network_creator.network_settings.name)
80 instance1_settings = VmInstanceConfig(
81 name=self.vm1_name, flavor=self.flavor_name,
82 vm_boot_timeout=self.vm_boot_timeout,
83 vm_delete_timeout=self.vm_delete_timeout,
84 ssh_connect_timeout=self.vm_ssh_connect_timeout,
85 port_settings=[port1_settings])
87 log = ("Creating VM 1 instance with name: '%s'"
88 % instance1_settings.name)
90 self.vm1_creator = deploy_utils.create_vm_instance(
93 self.image_creator.image_settings,
94 keypair_creator=kp_creator)
95 self.creators.append(self.vm1_creator)
98 sg_creator = self.__create_security_group()
99 self.creators.append(sg_creator)
101 port2_settings = PortConfig(
102 name=self.vm2_name + '-vPingPort',
103 network_name=self.network_creator.network_settings.name)
104 instance2_settings = VmInstanceConfig(
105 name=self.vm2_name, flavor=self.flavor_name,
106 vm_boot_timeout=self.vm_boot_timeout,
107 vm_delete_timeout=self.vm_delete_timeout,
108 ssh_connect_timeout=self.vm_ssh_connect_timeout,
109 port_settings=[port2_settings],
110 security_group_names=[sg_creator.sec_grp_settings.name],
111 floating_ip_settings=[FloatingIpConfig(
112 name=self.vm2_name + '-FIPName',
113 port_name=port2_settings.name,
114 router_name=self.router_creator.router_settings.name)])
116 log = ("Creating VM 2 instance with name: '%s'"
117 % instance2_settings.name)
118 self.logger.info(log)
119 self.vm2_creator = deploy_utils.create_vm_instance(
122 self.image_creator.image_settings,
123 keypair_creator=kp_creator)
124 self.creators.append(self.vm2_creator)
126 return self._execute()
127 except Exception as exc: # pylint: disable=broad-except
128 self.logger.error('Unexpected error running test - ' + exc.message)
129 return TestCase.EX_RUN_ERROR
133 def _do_vping(self, vm_creator, test_ip):
135 Execute ping command.
139 if vm_creator.vm_ssh_active(block=True):
140 ssh = vm_creator.ssh_client()
141 if not self._transfer_ping_script(ssh):
142 return TestCase.EX_RUN_ERROR
143 return self._do_vping_ssh(ssh, test_ip)
145 return TestCase.EX_RUN_ERROR
147 def _transfer_ping_script(self, ssh):
149 Transfert vping script to VM.
151 Uses SCP to copy the ping script via the SSH client
152 :param ssh: the SSH client
155 self.logger.info("Trying to transfer ping.sh")
156 scp = SCPClient(ssh.get_transport())
157 ping_script = pkg_resources.resource_filename(
158 'functest.opnfv_tests.openstack.vping', 'ping.sh')
160 scp.put(ping_script, "~/")
161 except Exception: # pylint: disable=broad-except
162 self.logger.error("Cannot SCP the file '%s'", ping_script)
165 cmd = 'chmod 755 ~/ping.sh'
166 # pylint: disable=unused-variable
167 (stdin, stdout, stderr) = ssh.exec_command(cmd)
168 for line in stdout.readlines():
173 def _do_vping_ssh(self, ssh, test_ip):
175 Execute ping command via SSH.
177 Pings the test_ip via the SSH client
178 :param ssh: the SSH client used to issue the ping command
179 :param test_ip: the IP for the ping command to use
180 :return: exit_code (int)
182 exit_code = TestCase.EX_TESTCASE_FAILED
183 self.logger.info("Waiting for ping...")
186 cmd = '~/ping.sh ' + test_ip
191 (_, stdout, _) = ssh.exec_command(cmd)
192 output = stdout.readlines()
195 if "vPing OK" in line:
196 self.logger.info("vPing detected!")
197 exit_code = TestCase.EX_OK
201 elif sec == self.ping_timeout:
202 self.logger.info("Timeout reached.")
207 log = "Pinging %s. Waiting for response..." % test_ip
208 self.logger.debug(log)
212 def __create_security_group(self):
214 Configure OpenStack security groups.
216 Configures and deploys an OpenStack security group object
217 :return: the creator object
221 SecurityGroupRuleConfig(
222 sec_grp_name=self.sg_name, direction=Direction.ingress,
223 protocol=Protocol.icmp))
225 SecurityGroupRuleConfig(
226 sec_grp_name=self.sg_name, direction=Direction.ingress,
227 protocol=Protocol.tcp, port_range_min=22, port_range_max=22))
229 SecurityGroupRuleConfig(
230 sec_grp_name=self.sg_name, direction=Direction.egress,
231 protocol=Protocol.tcp, port_range_min=22, port_range_max=22))
233 log = "Security group with name: '%s'" % self.sg_name
234 self.logger.info(log)
235 return deploy_utils.create_security_group(self.os_creds,
238 description=self.sg_desc,
239 rule_settings=sg_rules))