# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
-'''Module for Openstack compute operations'''
-from glanceclient import exc as glance_exception
-import keystoneauth1
-from log import LOG
-import novaclient
+"""Module for Openstack compute operations"""
import os
import time
import traceback
-
+from glanceclient import exc as glance_exception
try:
from glanceclient.openstack.common.apiclient.exceptions import NotFound as GlanceImageNotFound
except ImportError:
from glanceclient.v1.apiclient.exceptions import NotFound as GlanceImageNotFound
+import keystoneauth1
+import novaclient
+from log import LOG
-class Compute(object):
+class Compute(object):
def __init__(self, nova_client, glance_client, neutron_client, config):
self.novaclient = nova_client
self.glance_client = glance_client
retry = 0
try:
# check image is file/url based.
- file_prefix = "file://"
- image_location = image_file.split(file_prefix)[1]
- with open(image_location) as f_image:
+ with open(image_file) as f_image:
img = self.glance_client.images.create(name=str(final_image_name),
disk_format="qcow2",
container_format="bare",
"image at the specified location %s is correct.", image_file)
return False
except keystoneauth1.exceptions.http.NotFound as exc:
- LOG.error("Authentication error while uploading the image:" + str(exc))
+ LOG.error("Authentication error while uploading the image: %s", str(exc))
return False
except Exception:
LOG.error(traceback.format_exc())
- LOG.error("Failed while uploading the image, please make sure the "
- "cloud under test has the access to file: %s.", image_file)
+ LOG.error("Failed to upload image %s.", image_file)
return False
return True
def delete_image(self, img_name):
try:
LOG.log("Deleting image %s...", img_name)
- img = self.glance_client.images.find(name=img_name)
+ img = self.find_image(image_name=img_name)
self.glance_client.images.delete(img.id)
except Exception:
LOG.error("Failed to delete the image %s.", img_name)
'''
if ssh_access.public_key_file:
return self.add_public_key(kp_name, ssh_access.public_key_file)
- else:
- keypair = self.create_keypair(kp_name, None)
- ssh_access.private_key = keypair.private_key
- return keypair
+ keypair = self.create_keypair(kp_name, None)
+ ssh_access.private_key = keypair.private_key
+ return keypair
def find_network(self, label):
net = self.novaclient.networks.find(label=label)
if hyp.host == host:
return self.normalize_az_host(hyp.zone, host)
# no match on host
- LOG.error('Passed host name does not exist: ' + host)
+ LOG.error('Passed host name does not exist: %s', host)
return None
if self.config.availability_zone:
return self.normalize_az_host(None, host)
if hyp.zone == zone:
# matches
return az_host
- # else continue - another zone with same host name?
+ # else continue - another zone with same host name?
# no match
- LOG.error('No match for availability zone and host ' + az_host)
+ LOG.error('No match for availability zone and host %s', az_host)
return None
else:
return self.auto_fill_az(host_list, az_host)
if not self.config.availability_zone:
LOG.error('Availability_zone must be configured')
elif host_list:
- LOG.error('No host matching the selection for availability zone: ' +
+ LOG.error('No host matching the selection for availability zone: %s',
self.config.availability_zone)
avail_list = []
else:
LOG.warning('Operation Forbidden: could not retrieve list of hypervisors'
' (likely no permission)')
- hypervisor_list = filter(lambda h: h.status == 'enabled' and h.state == 'up',
- hypervisor_list)
+ hypervisor_list = [h for h in hypervisor_list if h.status == 'enabled' and h.state == 'up']
if self.config.availability_zone:
- host_list = filter(lambda h: h.zone == self.config.availability_zone, host_list)
+ host_list = [h for h in host_list if h.zone == self.config.availability_zone]
if self.config.compute_nodes:
- host_list = filter(lambda h: h.host in self.config.compute_nodes, host_list)
+ host_list = [h for h in host_list if h.host in self.config.compute_nodes]
hosts = [h.hypervisor_hostname for h in hypervisor_list]
- host_list = filter(lambda h: h.host in hosts, host_list)
+ host_list = [h for h in host_list if h.host in hosts]
avail_list = []
for host in host_list:
try:
server_instance_1 = self.novaclient.servers.get(vm_instance1)
server_instance_2 = self.novaclient.servers.get(vm_instance2)
- if server_instance_1.hostId == server_instance_2.hostId:
- return True
- else:
- return False
+ return bool(server_instance_1.hostId == server_instance_2.hostId)
except novaclient.exceptions:
LOG.warning("Exception in retrieving the hostId of servers")
# check first the security group exists
sec_groups = self.neutronclient.list_security_groups()['security_groups']
group = [x for x in sec_groups if x['name'] == self.config.security_group_name]
- if len(group) > 0:
+ if group:
return group[0]
body = {