3 # Copyright (c) 2015 All rights reserved
4 # This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 """vPingSSH testcase."""
15 from scp import SCPClient
18 from snaps.config.keypair import KeypairConfig
19 from snaps.config.network import PortConfig
20 from snaps.config.security_group import (
21 Direction, Protocol, SecurityGroupConfig, SecurityGroupRuleConfig)
22 from snaps.config.vm_inst import FloatingIpConfig, VmInstanceConfig
23 from snaps.openstack.utils import deploy_utils
25 from xtesting.core import testcase
26 from xtesting.energy import energy
28 from functest.opnfv_tests.openstack.vping import vping_base
29 from functest.utils import config
32 class VPingSSH(vping_base.VPingBase):
33 # pylint: disable=too-many-instance-attributes
35 VPingSSH testcase implementation.
37 Class to execute the vPing test using a Floating IP to connect to one VM
38 to issue the ping command to the second
41 def __init__(self, **kwargs):
42 """Initialize testcase."""
43 if "case_name" not in kwargs:
44 kwargs["case_name"] = "vping_ssh"
45 super(VPingSSH, self).__init__(**kwargs)
46 self.logger = logging.getLogger(__name__)
48 self.kp_name = getattr(config.CONF, 'vping_keypair_name') + self.guid
49 self.kp_priv_file = getattr(config.CONF, 'vping_keypair_priv_file')
50 self.kp_pub_file = getattr(config.CONF, 'vping_keypair_pub_file')
51 self.sg_name = getattr(config.CONF, 'vping_sg_name') + self.guid
52 self.sg_desc = getattr(config.CONF, 'vping_sg_desc')
54 @energy.enable_recording
55 def run(self, **kwargs):
57 Excecute VPingSSH testcase.
59 Sets up the OpenStack keypair, router, security group, and VM instance
60 objects then validates the ping.
61 :return: the exit code from the super.execute() method
64 super(VPingSSH, self).run()
66 log = "Creating keypair with name: '%s'" % self.kp_name
68 kp_creator = deploy_utils.create_keypair(
71 name=self.kp_name, private_filepath=self.kp_priv_file,
72 public_filepath=self.kp_pub_file))
73 self.creators.append(kp_creator)
76 port1_settings = PortConfig(
77 name=self.vm1_name + '-vPingPort',
78 network_name=self.network_creator.network_settings.name)
79 instance1_settings = VmInstanceConfig(
80 name=self.vm1_name, flavor=self.flavor_name,
81 vm_boot_timeout=self.vm_boot_timeout,
82 vm_delete_timeout=self.vm_delete_timeout,
83 ssh_connect_timeout=self.vm_ssh_connect_timeout,
84 port_settings=[port1_settings])
86 log = ("Creating VM 1 instance with name: '%s'"
87 % instance1_settings.name)
89 self.vm1_creator = deploy_utils.create_vm_instance(
92 self.image_creator.image_settings,
93 keypair_creator=kp_creator)
94 self.creators.append(self.vm1_creator)
97 sg_creator = self.__create_security_group()
98 self.creators.append(sg_creator)
100 port2_settings = PortConfig(
101 name=self.vm2_name + '-vPingPort',
102 network_name=self.network_creator.network_settings.name)
103 instance2_settings = VmInstanceConfig(
104 name=self.vm2_name, flavor=self.flavor_name,
105 vm_boot_timeout=self.vm_boot_timeout,
106 vm_delete_timeout=self.vm_delete_timeout,
107 ssh_connect_timeout=self.vm_ssh_connect_timeout,
108 port_settings=[port2_settings],
109 security_group_names=[sg_creator.sec_grp_settings.name],
110 floating_ip_settings=[FloatingIpConfig(
111 name=self.vm2_name + '-FIPName',
112 port_name=port2_settings.name,
113 router_name=self.router_creator.router_settings.name)])
115 log = ("Creating VM 2 instance with name: '%s'"
116 % instance2_settings.name)
117 self.logger.info(log)
118 self.vm2_creator = deploy_utils.create_vm_instance(
121 self.image_creator.image_settings,
122 keypair_creator=kp_creator)
123 self.creators.append(self.vm2_creator)
125 return self._execute()
126 except Exception: # pylint: disable=broad-except
127 self.logger.exception('Unexpected error running test')
128 return testcase.TestCase.EX_RUN_ERROR
130 def _do_vping(self, vm_creator, test_ip):
132 Execute ping command.
136 if vm_creator.vm_ssh_active(block=True):
137 ssh = vm_creator.ssh_client()
138 if not self._transfer_ping_script(ssh):
139 return testcase.TestCase.EX_RUN_ERROR
140 return self._do_vping_ssh(ssh, test_ip)
142 return testcase.TestCase.EX_RUN_ERROR
144 def _transfer_ping_script(self, ssh):
146 Transfert vping script to VM.
148 Uses SCP to copy the ping script via the SSH client
149 :param ssh: the SSH client
152 self.logger.info("Trying to transfer ping.sh")
153 scp = SCPClient(ssh.get_transport())
154 ping_script = pkg_resources.resource_filename(
155 'functest.opnfv_tests.openstack.vping', 'ping.sh')
157 scp.put(ping_script, "~/")
158 except Exception: # pylint: disable=broad-except
159 self.logger.error("Cannot SCP the file '%s'", ping_script)
162 cmd = 'chmod 755 ~/ping.sh'
163 # pylint: disable=unused-variable
164 (stdin, stdout, stderr) = ssh.exec_command(cmd)
165 for line in stdout.readlines():
170 def _do_vping_ssh(self, ssh, test_ip):
172 Execute ping command via SSH.
174 Pings the test_ip via the SSH client
175 :param ssh: the SSH client used to issue the ping command
176 :param test_ip: the IP for the ping command to use
177 :return: exit_code (int)
179 exit_code = testcase.TestCase.EX_TESTCASE_FAILED
180 self.logger.info("Waiting for ping...")
183 cmd = '~/ping.sh ' + test_ip
188 (_, stdout, _) = ssh.exec_command(cmd)
189 output = stdout.readlines()
192 if "vPing OK" in line:
193 self.logger.info("vPing detected!")
194 exit_code = testcase.TestCase.EX_OK
198 elif sec == self.ping_timeout:
199 self.logger.info("Timeout reached.")
204 log = "Pinging %s. Waiting for response..." % test_ip
205 self.logger.debug(log)
209 def __create_security_group(self):
211 Configure OpenStack security groups.
213 Configures and deploys an OpenStack security group object
214 :return: the creator object
218 SecurityGroupRuleConfig(
219 sec_grp_name=self.sg_name, direction=Direction.ingress,
220 protocol=Protocol.icmp))
222 SecurityGroupRuleConfig(
223 sec_grp_name=self.sg_name, direction=Direction.ingress,
224 protocol=Protocol.tcp, port_range_min=22, port_range_max=22))
226 SecurityGroupRuleConfig(
227 sec_grp_name=self.sg_name, direction=Direction.egress,
228 protocol=Protocol.tcp, port_range_min=22, port_range_max=22))
230 log = "Security group with name: '%s'" % self.sg_name
231 self.logger.info(log)
232 return deploy_utils.create_security_group(self.os_creds,
235 description=self.sg_desc,
236 rule_settings=sg_rules))