self.image = None
self.flavor = None
self.vm1 = None
+ self.sec1 = None
def run(self, **kwargs): # pylint: disable=too-many-locals
"""
self.cloud.set_flavor_specs(
self.flavor.id, getattr(config.CONF, 'flavor_extra_specs', {}))
+ self.sec1 = self.cloud.create_security_group(
+ getattr(config.CONF, 'vping_sg_name') + self.guid,
+ getattr(config.CONF, 'vping_sg_desc'))
+ self.cloud.create_security_group_rule(
+ self.sec1.id, protocol='icmp', direction='ingress')
+
vm1_name = getattr(config.CONF, 'vping_vm_name_1') + self.guid
self.logger.info(
"Creating VM 1 instance with name: '%s'", vm1_name)
flavor=self.flavor.id,
auto_ip=False, wait=True,
timeout=getattr(config.CONF, 'vping_vm_boot_timeout'),
- network=self.network.id)
+ network=self.network.id,
+ security_groups=[self.sec1.id])
self.logger.debug("vm1: %s", self.vm1)
self.vm1 = self.cloud.wait_for_server(self.vm1, auto_ip=False)
+ p_console = self.cloud.get_server_console(self.vm1.id)
+ self.logger.debug("vm1 console: \n%s", p_console)
def _execute(self):
"""
"""
assert self.cloud
self.cloud.delete_server(self.vm1, wait=True)
+ self.cloud.delete_security_group(self.sec1.id)
self.cloud.delete_image(self.image)
self.cloud.remove_router_interface(self.router, self.subnet.id)
self.cloud.delete_router(self.router.id)
import time
import paramiko
-import pkg_resources
-from scp import SCPClient
from xtesting.core import testcase
from xtesting.energy import energy
super(VPingSSH, self).__init__(**kwargs)
self.logger = logging.getLogger(__name__)
self.vm2 = None
- self.sec = None
+ self.sec2 = None
self.fip = None
self.keypair = None
self.ssh = paramiko.SSHClient()
with open(self.key_filename, 'w') as private_key_file:
private_key_file.write(self.keypair.private_key)
- self.sec = self.cloud.create_security_group(
+ self.sec2 = self.cloud.create_security_group(
getattr(config.CONF, 'vping_sg_name') + self.guid,
getattr(config.CONF, 'vping_sg_desc'))
self.cloud.create_security_group_rule(
- self.sec.id, port_range_min='22', port_range_max='22',
+ self.sec2.id, port_range_min='22', port_range_max='22',
protocol='tcp', direction='ingress')
self.cloud.create_security_group_rule(
- self.sec.id, protocol='icmp', direction='ingress')
+ self.sec2.id, protocol='icmp', direction='ingress')
vm2_name = "{}-{}-{}".format(
getattr(config.CONF, 'vping_vm_name_2'), "ssh", self.guid)
auto_ip=False, wait=True,
timeout=getattr(config.CONF, 'vping_vm_boot_timeout'),
network=self.network.id,
- security_groups=[self.sec.id])
+ security_groups=[self.sec2.id])
self.logger.debug("vm2: %s", self.vm2)
self.fip = self.cloud.create_floating_ip(
network=self.ext_net.id, server=self.vm2)
self.logger.debug("floating_ip2: %s", self.fip)
self.vm2 = self.cloud.wait_for_server(self.vm2, auto_ip=False)
-
+ p_console = self.cloud.get_server_console(self.vm2.id)
+ self.logger.debug("vm2 console: \n%s", p_console)
return self._execute()
except Exception: # pylint: disable=broad-except
self.logger.exception('Unexpected error running vping_ssh')
return testcase.TestCase.EX_RUN_ERROR
def _do_vping(self):
- """
- Execute ping command.
-
- Override from super
- """
time.sleep(10)
self.ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
self.ssh.connect(
key_filename=self.key_filename,
timeout=getattr(config.CONF, 'vping_vm_ssh_connect_timeout'))
self.logger.debug("ssh: %s", self.ssh)
- if not self._transfer_ping_script():
- return testcase.TestCase.EX_RUN_ERROR
- return self._do_vping_ssh()
-
- def _transfer_ping_script(self):
- """
- Transfert vping script to VM.
-
- Uses SCP to copy the ping script via the SSH client
- :param ssh: the SSH client
- :return:
- """
- self.logger.info("Trying to transfer ping.sh")
- scp = SCPClient(self.ssh.get_transport())
- ping_script = pkg_resources.resource_filename(
- 'functest.opnfv_tests.openstack.vping', 'ping.sh')
- try:
- scp.put(ping_script, "~/")
- except Exception: # pylint: disable=broad-except
- self.logger.error("Cannot SCP the file '%s'", ping_script)
- return False
-
- cmd = 'chmod 755 ~/ping.sh'
- (_, stdout, _) = self.ssh.exec_command(cmd)
- for line in stdout.readlines():
- print line
-
- return True
-
- def _do_vping_ssh(self):
- """
- Execute ping command via SSH.
-
- Pings the test_ip via the SSH client
- :param ssh: the SSH client used to issue the ping command
- :param test_ip: the IP for the ping command to use
- :return: exit_code (int)
- """
- exit_code = testcase.TestCase.EX_TESTCASE_FAILED
- self.logger.info("Waiting for ping...")
-
- sec = 0
- cmd = '~/ping.sh ' + self.vm1.private_v4
- flag = False
-
- while True:
- time.sleep(1)
- (_, stdout, _) = self.ssh.exec_command(cmd)
- output = stdout.readlines()
-
- for line in output:
- if "vPing OK" in line:
- self.logger.info("vPing detected!")
- exit_code = testcase.TestCase.EX_OK
- flag = True
- break
- elif sec == getattr(config.CONF, 'vping_ping_timeout'):
- self.logger.info("Timeout reached.")
- flag = True
- break
- if flag:
- break
- log = "Pinging %s. Waiting for response..." % self.vm1.private_v4
- self.logger.debug(log)
- sec += 1
- return exit_code
+ (_, stdout, _) = self.ssh.exec_command(
+ 'ping -c 1 ' + self.vm1.private_v4)
+ self.logger.debug("ping output: %s", stdout)
+ return stdout.channel.recv_exit_status()
def clean(self):
assert self.cloud
self.cloud.delete_server(
self.vm2, wait=True,
timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
- self.cloud.delete_security_group(self.sec.id)
+ self.cloud.delete_security_group(self.sec2.id)
self.cloud.delete_keypair(self.keypair.id)
self.cloud.delete_floating_ip(self.fip.id)
super(VPingSSH, self).clean()