"""vPingSSH testcase."""
import logging
-import os
-import tempfile
-import time
-import paramiko
-from xtesting.core import testcase
-from xtesting.energy import energy
-
-from functest.opnfv_tests.openstack.vping import vping_base
+from functest.core import singlevm
from functest.utils import config
-class VPingSSH(vping_base.VPingBase):
+class VPingSSH(singlevm.SingleVm2):
"""
VPingSSH testcase implementation.
to issue the ping command to the second
"""
+ __logger = logging.getLogger(__name__)
+
def __init__(self, **kwargs):
"""Initialize testcase."""
if "case_name" not in kwargs:
kwargs["case_name"] = "vping_ssh"
super(VPingSSH, self).__init__(**kwargs)
- self.logger = logging.getLogger(__name__)
self.vm2 = None
- self.sec2 = None
- self.fip = None
- self.keypair = None
- self.ssh = paramiko.SSHClient()
- (_, self.key_filename) = tempfile.mkstemp()
- @energy.enable_recording
- def run(self, **kwargs):
- """
- Excecute VPingSSH testcase.
-
- Sets up the OpenStack keypair, router, security group, and VM instance
- objects then validates the ping.
- :return: the exit code from the super.execute() method
- """
- try:
- assert self.cloud
- super(VPingSSH, self).run()
+ def prepare(self):
+ super(VPingSSH, self).prepare()
+ self.vm2 = self.boot_vm(
+ '{}-vm2_{}'.format(self.case_name, self.guid),
+ security_groups=[self.sec.id])
- kp_name = getattr(config.CONF, 'vping_keypair_name') + self.guid
- self.logger.info("Creating keypair with name: '%s'", kp_name)
- self.keypair = self.cloud.create_keypair(kp_name)
- self.logger.debug("keypair: %s", self.keypair)
- self.logger.debug("private_key: %s", self.keypair.private_key)
- with open(self.key_filename, 'w') as private_key_file:
- private_key_file.write(self.keypair.private_key)
+ def execute(self):
+ """Ping the second VM
- self.sec2 = self.cloud.create_security_group(
- getattr(config.CONF, 'vping_sg_name') + self.guid,
- getattr(config.CONF, 'vping_sg_desc'))
- self.cloud.create_security_group_rule(
- self.sec2.id, port_range_min='22', port_range_max='22',
- protocol='tcp', direction='ingress')
- self.cloud.create_security_group_rule(
- self.sec2.id, protocol='icmp', direction='ingress')
-
- vm2_name = "{}-{}-{}".format(
- getattr(config.CONF, 'vping_vm_name_2'), "ssh", self.guid)
- self.logger.info(
- "Creating VM 2 instance with name: '%s'", vm2_name)
- self.vm2 = self.cloud.create_server(
- vm2_name, image=self.image.id, flavor=self.flavor.id,
- key_name=self.keypair.id,
- auto_ip=False, wait=True,
- timeout=getattr(config.CONF, 'vping_vm_boot_timeout'),
- network=self.network.id,
- security_groups=[self.sec2.id])
- self.logger.debug("vm2: %s", self.vm2)
- self.fip = self.cloud.create_floating_ip(
- network=self.ext_net.id, server=self.vm2)
- self.logger.debug("floating_ip2: %s", self.fip)
- self.vm2 = self.cloud.wait_for_server(self.vm2, auto_ip=False)
- p_console = self.cloud.get_server_console(self.vm2.id)
- self.logger.debug("vm2 console: \n%s", p_console)
- return self._execute()
- except Exception: # pylint: disable=broad-except
- self.logger.exception('Unexpected error running vping_ssh')
- return testcase.TestCase.EX_RUN_ERROR
-
- def _do_vping(self):
- self.ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
- for loop in range(6):
- try:
- self.ssh.connect(
- self.vm2.public_v4,
- username=getattr(config.CONF, 'openstack_image_user'),
- key_filename=self.key_filename,
- timeout=getattr(
- config.CONF, 'vping_vm_ssh_connect_timeout'))
- break
- except Exception: # pylint: disable=broad-except
- self.logger.info(
- "try %s: cannot connect to %s", loop + 1,
- self.vm2.public_v4)
- time.sleep(10)
- else:
- self.logger.error("cannot connect to %s", self.vm2.public_v4)
- return False
-
- self.logger.debug("ssh: %s", self.ssh)
- (_, stdout, _) = self.ssh.exec_command(
- 'ping -c 1 ' + self.vm1.private_v4)
- self.logger.debug("ping output:\n%s", stdout.read())
+ Returns: ping exit codes
+ """
+ assert self.ssh
+ if not self.check_regex_in_console(self.vm2.name):
+ return 1
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'ping -c 1 {}'.format(
+ self.vm2.private_v4 or self.vm2.addresses[
+ self.network.name][0].addr))
+ self.__logger.info("output:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.info("error:\n%s", stderr.read().decode("utf-8"))
return stdout.channel.recv_exit_status()
def clean(self):
assert self.cloud
- os.remove(self.key_filename)
- self.cloud.delete_server(
- self.vm2, wait=True,
- timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
- self.cloud.delete_security_group(self.sec2.id)
- self.cloud.delete_keypair(self.keypair.id)
- self.cloud.delete_floating_ip(self.fip.id)
+ if self.vm2:
+ self.cloud.delete_server(
+ self.vm2, wait=True,
+ timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
super(VPingSSH, self).clean()