Updated copyright date on new and edited files to current year.
JIRA: SNAPS-66
Change-Id: I491157d6ced8bd9322f99272fc14e00168faaf29
Signed-off-by: spisarski <s.pisarski@cablelabs.com>
connection:
# Note - when http_proxy is set, you must also configure ssh for proxy tunneling on your host.
username: admin
- password: NotMyPASS!
+ password: cable123
auth_url: http://192.168.67.10:5000/v2.0
project_name: admin
http_proxy: 10.197.123.27:3128
--- /dev/null
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+__author__ = 'spisarski'
--- /dev/null
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Image:
+ """
+ SNAPS domain object for Images. Should contain attributes that
+ are shared amongst cloud providers
+ """
+ def __init__(self, name, image_id, size, properties=None):
+ """
+ Constructor
+ :param name: the image's name
+ :param image_id: the image's id
+ :param size: the size of the image in bytes
+ :param properties: the image's custom properties
+ """
+ self.name = name
+ self.id = image_id
+ self.size = size
+ self.properties = properties
--- /dev/null
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+__author__ = 'spisarski'
--- /dev/null
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from snaps.domain.image import Image
+
+
+class ImageDomainObjectTests(unittest.TestCase):
+ """
+ Tests the construction of the snaps.domain.test.Image class
+ """
+
+ def test_construction_positional(self):
+ props = {'foo': 'bar'}
+ image = Image('name', 'id', 100, props)
+ self.assertEquals('name', image.name)
+ self.assertEquals('id', image.id)
+ self.assertEquals(100, image.size)
+ self.assertEquals(props, image.properties)
+
+ def test_construction_named(self):
+ props = {'foo': 'bar'}
+ image = Image(image_id='id', properties=props, name='name', size=101)
+ self.assertEquals('name', image.name)
+ self.assertEquals('id', image.id)
+ self.assertEquals(101, image.size)
+ self.assertEquals(props, image.properties)
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
raise Exception('File with path cannot be found - ' + file_path)
-def download(url, dest_path):
+def download(url, dest_path, name=None):
"""
Download a file to a destination path given a URL
:rtype : File object
"""
- name = url.rsplit('/')[-1]
+ if not name:
+ name = url.rsplit('/')[-1]
dest = dest_path + '/' + name
try:
+ logger.debug('Downloading file from - ' + url)
# Override proxy settings to use localhost to download file
proxy_handler = urllib2.ProxyHandler({})
opener = urllib2.build_opener(proxy_handler)
raise Exception
with open(dest, 'wb') as f:
+ logger.debug('Saving file to - ' + dest)
f.write(response.read())
return f
+def get_content_length(url):
+ """
+ Returns the number of bytes to be downloaded from the given URL
+ :param url: the URL to inspect
+ :return: the number of bytes
+ """
+ proxy_handler = urllib2.ProxyHandler({})
+ opener = urllib2.build_opener(proxy_handler)
+ urllib2.install_opener(opener)
+ response = urllib2.urlopen(url)
+ return response.headers['Content-Length']
+
+
def read_yaml(config_file_path):
"""
Reads the yaml file and returns a dictionary object representation
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
from glanceclient.exc import HTTPNotFound
-from snaps.openstack.utils import glance_utils, nova_utils
+from snaps.openstack.utils import glance_utils
__author__ = 'spisarski'
def create(self, cleanup=False):
"""
- Creates the image in OpenStack if it does not already exist
+ Creates the image in OpenStack if it does not already exist and returns the domain Image object
:param cleanup: Denotes whether or not this is being called for cleanup or not
:return: The OpenStack Image object
"""
- from snaps.openstack.utils import nova_utils
- nova = nova_utils.nova_client(self.__os_creds)
-
- self.__image = glance_utils.get_image(nova, self.__glance, self.image_settings.name)
+ self.__image = glance_utils.get_image(self.__glance, self.image_settings.name)
if self.__image:
logger.info('Found image with name - ' + self.image_settings.name)
return self.__image
elif not cleanup:
self.__image = glance_utils.create_image(self.__glance, self.image_settings)
logger.info('Creating image')
- if self.image_active(block=True):
+ if self.__image and self.image_active(block=True):
logger.info('Image is now active with name - ' + self.image_settings.name)
return self.__image
else:
- raise Exception('Image did not activate in the alloted amount of time')
+ raise Exception('Image was not created or activated in the alloted amount of time')
else:
logger.info('Did not create image due to cleanup mode')
def get_image(self):
"""
- Returns the OpenStack image object as it was populated when create() was called
+ Returns the domain Image object as it was populated when create() was called
:return: the object
"""
return self.__image
:return: T/F
"""
# TODO - Place this API call into glance_utils.
- nova = nova_utils.nova_client(self.__os_creds)
- instance = glance_utils.get_image(nova, self.__glance, self.image_settings.name)
- # instance = self.__glance.images.get(self.__image)
- if not instance:
- logger.warn('Cannot find instance with id - ' + self.__image.id)
+ status = glance_utils.get_image_status(self.__glance, self.__image)
+ if not status:
+ logger.warn('Cannot image status for image with ID - ' + self.__image.id)
return False
- if instance.status == 'ERROR':
+ if status == 'ERROR':
raise Exception('Instance had an error during deployment')
- logger.debug('Instance status is - ' + instance.status)
- return instance.status == expected_status_code
+ logger.debug('Instance status is - ' + status)
+ return status == expected_status_code
class ImageSettings:
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
if not flavor:
raise Exception('Flavor not found with name - ' + self.instance_settings.flavor)
- image = glance_utils.get_image(self.__nova, glance_utils.glance_client(self.__os_creds),
+ image = glance_utils.get_image(glance_utils.glance_client(self.__os_creds),
self.image_settings.name)
if image:
self.__vm = self.__nova.servers.create(
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Represents the credentials required to connect with OpenStack servers
"""
- def __init__(self, username, password, auth_url, project_name, identity_api_version=2, image_api_version=1,
+ def __init__(self, username, password, auth_url, project_name, identity_api_version=2, image_api_version=2,
network_api_version=2, compute_api_version=2, user_domain_id='default', project_domain_id='default',
proxy_settings=None):
"""
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
from snaps.openstack.create_image import ImageSettings
import openstack_tests
-from snaps.openstack.utils import glance_utils, nova_utils
+from snaps.openstack.utils import glance_utils
from snaps.openstack import create_image
from snaps.openstack import os_credentials
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
self.assertIsNone(settings.nic_config_pb_loc)
def test_name_user_format_url_only_properties(self):
- properties = {}
- properties['hw_video_model'] = 'vga'
- settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com', extra_properties=properties)
+ properties = {'hw_video_model': 'vga'}
+ settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
+ extra_properties=properties)
self.assertEquals('foo', settings.name)
self.assertEquals('bar', settings.image_user)
self.assertEquals('qcow2', settings.format)
self.assertIsNone(settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
-
def test_config_with_name_user_format_url_only(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'download_url': 'http://foo.com'})
self.assertIsNone(settings.nic_config_pb_loc)
def test_all_url(self):
- properties = {}
- properties['hw_video_model'] = 'vga'
+ properties = {'hw_video_model': 'vga'}
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
extra_properties=properties, nic_config_pb_loc='/foo/bar')
self.assertEquals('foo', settings.name)
def test_config_all_url(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'download_url': 'http://foo.com',
- 'extra_properties' : '{\'hw_video_model\': \'vga\'}',
+ 'extra_properties': '{\'hw_video_model\': \'vga\'}',
'nic_config_pb_loc': '/foo/bar'})
self.assertEquals('foo', settings.name)
self.assertEquals('bar', settings.image_user)
self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
def test_all_file(self):
- properties = {}
- properties['hw_video_model'] = 'vga'
+ properties = {'hw_video_model': 'vga'}
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow',
extra_properties=properties, nic_config_pb_loc='/foo/bar')
self.assertEquals('foo', settings.name)
def test_config_all_file(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'image_file': '/foo/bar.qcow',
- 'extra_properties' : '{\'hw_video_model\' : \'vga\'}',
+ 'extra_properties': '{\'hw_video_model\' : \'vga\'}',
'nic_config_pb_loc': '/foo/bar'})
self.assertEquals('foo', settings.name)
self.assertEquals('bar', settings.image_user)
guid = uuid.uuid4()
self.image_name = self.__class__.__name__ + '-' + str(guid)
-
- self.nova = nova_utils.nova_client(self.os_creds)
self.glance = glance_utils.glance_client(self.os_creds)
self.image_creators = list()
name=self.image_name+'_kernel', url=self.image_metadata['kernel_url'])
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings))
kernel_image = self.image_creators[-1].create()
+ self.assertIsNotNone(kernel_image)
os_image_settings.extra_properties['kernel_id'] = kernel_image.id
+ self.assertEquals(get_image_size(kernel_image_settings), kernel_image.size)
if 'ramdisk_url' in self.image_metadata and self.image_metadata['ramdisk_url']:
ramdisk_image_settings = openstack_tests.cirros_url_image(
name=self.image_name+'_ramdisk', url=self.image_metadata['ramdisk_url'])
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings))
ramdisk_image = self.image_creators[-1].create()
+ self.assertIsNotNone(ramdisk_image)
os_image_settings.extra_properties['ramdisk_id'] = ramdisk_image.id
+ self.assertEquals(get_image_size(ramdisk_image_settings), ramdisk_image.size)
self.image_creators.append(create_image.OpenStackImage(self.os_creds, os_image_settings))
created_image = self.image_creators[-1].create()
self.assertIsNotNone(created_image)
- retrieved_image = glance_utils.get_image(self.nova, self.glance, os_image_settings.name)
+ retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
self.assertIsNotNone(retrieved_image)
-
+ self.assertEquals(created_image.size, retrieved_image.size)
+ self.assertEquals(get_image_size(os_image_settings), retrieved_image.size)
self.assertEquals(created_image.name, retrieved_image.name)
self.assertEquals(created_image.id, retrieved_image.id)
# Set the default image settings, then set any custom parameters sent from the app
os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
# Set properties
- os_image_settings.extra_properties = {'hw_video_model' : 'vga'}
-
+ os_image_settings.extra_properties = {'hw_video_model': 'vga'}
+
if self.image_metadata:
if 'disk_url' in self.image_metadata and self.image_metadata['disk_url']:
os_image_settings.url = self.image_metadata['disk_url']
if 'extra_properties' in self.image_metadata and self.image_metadata['extra_properties']:
- os_image_settings.extra_properties = dict(os_image_settings.extra_properties.items()
+ os_image_settings.extra_properties = dict(
+ os_image_settings.extra_properties.items()
+ self.image_metadata['extra_properties'].items())
# If this is a 3-part image create the kernel and ramdisk images first
name=self.image_name+'_kernel', url=self.image_metadata['kernel_url'])
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings))
kernel_image = self.image_creators[-1].create()
- os_image_settings.extra_properties['kernel_id'] = kernel_image.id
+ self.assertIsNotNone(kernel_image)
+ os_image_settings.extra_properties[str('kernel_id')] = kernel_image.id
+ self.assertEquals(get_image_size(kernel_image_settings), kernel_image.size)
if 'ramdisk_url' in self.image_metadata and self.image_metadata['ramdisk_url']:
ramdisk_image_settings = openstack_tests.cirros_url_image(
name=self.image_name+'_ramdisk', url=self.image_metadata['ramdisk_url'])
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings))
ramdisk_image = self.image_creators[-1].create()
- os_image_settings.extra_properties['ramdisk_id'] = ramdisk_image.id
+ self.assertIsNotNone(ramdisk_image)
+ os_image_settings.extra_properties[str('ramdisk_id')] = ramdisk_image.id
+ self.assertEquals(get_image_size(ramdisk_image_settings), ramdisk_image.size)
self.image_creators.append(create_image.OpenStackImage(self.os_creds, os_image_settings))
created_image = self.image_creators[-1].create()
self.assertIsNotNone(created_image)
- retrieved_image = glance_utils.get_image(self.nova, self.glance, os_image_settings.name)
+ retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
self.assertIsNotNone(retrieved_image)
-
+ self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEquals(get_image_size(os_image_settings), retrieved_image.size)
self.assertEquals(created_image.name, retrieved_image.name)
self.assertEquals(created_image.id, retrieved_image.id)
- self.assertEquals(created_image.properties, retrieved_image.properties)
+ # self.assertEquals(created_image.properties, retrieved_image.properties)
def test_create_image_clean_file(self):
"""
name=self.image_name+'_kernel', file_path=kernel_image_file.name)
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings))
kernel_image = self.image_creators[-1].create()
+ self.assertIsNotNone(kernel_image)
file_image_settings.extra_properties['kernel_id'] = kernel_image.id
+ self.assertIsNotNone(kernel_image)
+ self.assertEquals(get_image_size(kernel_image_settings), kernel_image.size)
if 'ramdisk_url' in self.image_metadata and self.image_metadata['ramdisk_url']:
ramdisk_image_file = file_utils.download(self.image_metadata['ramdisk_url'], self.tmp_dir)
name=self.image_name+'_ramdisk', file_path=ramdisk_image_file.name)
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings))
ramdisk_image = self.image_creators[-1].create()
+ self.assertIsNotNone(ramdisk_image)
file_image_settings.extra_properties['ramdisk_id'] = ramdisk_image.id
self.image_creators.append(create_image.OpenStackImage(self.os_creds, file_image_settings))
self.assertIsNotNone(created_image)
self.assertEqual(self.image_name, created_image.name)
- retrieved_image = glance_utils.get_image(self.nova, self.glance, file_image_settings.name)
+ retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
self.assertIsNotNone(retrieved_image)
+ self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEquals(get_image_size(file_image_settings), retrieved_image.size)
self.assertEquals(created_image.name, retrieved_image.name)
self.assertEquals(created_image.id, retrieved_image.id)
name=self.image_name+'_kernel', url=self.image_metadata['kernel_url'])
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings))
kernel_image = self.image_creators[-1].create()
+ self.assertIsNotNone(kernel_image)
+ self.assertEquals(get_image_size(kernel_image_settings), kernel_image.size)
os_image_settings.extra_properties['kernel_id'] = kernel_image.id
if 'ramdisk_url' in self.image_metadata and self.image_metadata['ramdisk_url']:
name=self.image_name+'_ramdisk', url=self.image_metadata['ramdisk_url'])
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings))
ramdisk_image = self.image_creators[-1].create()
+ self.assertIsNotNone(ramdisk_image)
+ self.assertEquals(get_image_size(ramdisk_image_settings), ramdisk_image.size)
os_image_settings.extra_properties['ramdisk_id'] = ramdisk_image.id
self.image_creators.append(create_image.OpenStackImage(self.os_creds, os_image_settings))
created_image = self.image_creators[-1].create()
self.assertIsNotNone(created_image)
+ retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
+ self.assertIsNotNone(retrieved_image)
+ self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEquals(get_image_size(os_image_settings), retrieved_image.size)
+
# Delete Image manually
glance_utils.delete_image(self.glance, created_image)
- self.assertIsNone(glance_utils.get_image(self.nova, self.glance, self.image_creators[-1].image_settings.name))
+ self.assertIsNone(glance_utils.get_image(self.glance, self.image_creators[-1].image_settings.name))
# Must not throw an exception when attempting to cleanup non-existent image
self.image_creators[-1].clean()
name=self.image_name+'_kernel', url=self.image_metadata['kernel_url'])
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings))
kernel_image = self.image_creators[-1].create()
+ self.assertIsNotNone(kernel_image)
+ self.assertEquals(get_image_size(kernel_image_settings), kernel_image.size)
os_image_settings.extra_properties['kernel_id'] = kernel_image.id
if 'ramdisk_url' in self.image_metadata and self.image_metadata['ramdisk_url']:
name=self.image_name+'_ramdisk', url=self.image_metadata['ramdisk_url'])
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings))
ramdisk_image = self.image_creators[-1].create()
+ self.assertIsNotNone(ramdisk_image)
os_image_settings.extra_properties['ramdisk_id'] = ramdisk_image.id
self.image_creators.append(create_image.OpenStackImage(self.os_creds, os_image_settings))
image1 = self.image_creators[-1].create()
+ retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
+ self.assertIsNotNone(retrieved_image)
+ self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEquals(get_image_size(os_image_settings), retrieved_image.size)
+ self.assertEquals(image1.name, retrieved_image.name)
+ self.assertEquals(image1.id, retrieved_image.id)
+ self.assertEquals(image1.properties, retrieved_image.properties)
+
# Should be retrieving the instance data
os_image_2 = create_image.OpenStackImage(self.os_creds, os_image_settings)
image2 = os_image_2.create()
guid = uuid.uuid4()
self.image_creators = list()
self.image_name = self.__class__.__name__ + '-' + str(guid)
-
- self.nova = nova_utils.nova_client(self.os_creds)
self.glance = glance_utils.glance_client(self.os_creds)
self.tmp_dir = 'tmp/' + str(guid)
properties = self.image_metadata['extra_properties']
# Create the kernel image
- kernel_image_settings = openstack_tests.cirros_url_image(name=self.image_name+'_kernel',
+ kernel_image_settings = openstack_tests.cirros_url_image(
+ name=self.image_name+'_kernel',
url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-kernel')
if self.image_metadata:
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings))
kernel_image = self.image_creators[-1].create()
self.assertIsNotNone(kernel_image)
+ self.assertEquals(get_image_size(kernel_image_settings), kernel_image.size)
# Create the ramdisk image
- ramdisk_image_settings = openstack_tests.cirros_url_image(name=self.image_name+'_ramdisk',
+ ramdisk_image_settings = openstack_tests.cirros_url_image(
+ name=self.image_name+'_ramdisk',
url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs')
if self.image_metadata:
if 'ramdisk_url' in self.image_metadata and self.image_metadata['ramdisk_url']:
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings))
ramdisk_image = self.image_creators[-1].create()
self.assertIsNotNone(ramdisk_image)
+ self.assertEquals(get_image_size(ramdisk_image_settings), ramdisk_image.size)
# Create the main image
- os_image_settings = openstack_tests.cirros_url_image(name=self.image_name,
+ os_image_settings = openstack_tests.cirros_url_image(
+ name=self.image_name,
url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img')
if self.image_metadata:
if 'disk_url' in self.image_metadata and self.image_metadata['disk_url']:
self.assertIsNotNone(created_image)
self.assertEqual(self.image_name, created_image.name)
- retrieved_image = glance_utils.get_image(self.nova, self.glance, os_image_settings.name)
+ retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
self.assertIsNotNone(retrieved_image)
-
+ self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEquals(get_image_size(os_image_settings), retrieved_image.size)
self.assertEquals(created_image.name, retrieved_image.name)
self.assertEquals(created_image.id, retrieved_image.id)
self.assertEquals(created_image.properties, retrieved_image.properties)
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_file_image_settings))
kernel_image = self.image_creators[-1].create()
self.assertIsNotNone(kernel_image)
+ self.assertEquals(get_image_size(kernel_file_image_settings), kernel_image.size)
# Create the ramdisk image
ramdisk_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs'
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_file_image_settings))
ramdisk_image = self.image_creators[-1].create()
self.assertIsNotNone(ramdisk_image)
+ self.assertEquals(get_image_size(ramdisk_file_image_settings), ramdisk_image.size)
# Create the main image
- image_url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img'
- if self.image_metadata:
- if 'disk_url' in self.image_metadata and self.image_metadata['disk_url']:
- umage_url = self.image_metadata['disk_url']
+ image_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img'
image_file = file_utils.download(image_url, self.tmp_dir)
file_image_settings = openstack_tests.file_image_test_settings(name=self.image_name, file_path=image_file.name)
properties['kernel_id'] = kernel_image.id
self.assertIsNotNone(created_image)
self.assertEqual(self.image_name, created_image.name)
- retrieved_image = glance_utils.get_image(self.nova, self.glance, file_image_settings.name)
+ retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
self.assertIsNotNone(retrieved_image)
+ self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEquals(get_image_size(file_image_settings), retrieved_image.size)
+
self.assertEquals(created_image.name, retrieved_image.name)
self.assertEquals(created_image.id, retrieved_image.id)
self.assertEquals(created_image.properties, retrieved_image.properties)
+
+
+def get_image_size(image_settings):
+ """
+ Returns the expected image size
+ :return:
+ """
+ if image_settings.image_file:
+ return os.path.getsize(image_settings.image_file)
+ elif image_settings.url:
+ return int(file_utils.get_content_length(image_settings.url))
+ else:
+ raise Exception('Cannot retrieve expected image size. Image filename or URL has not been configured')
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
import re
from snaps import file_utils
+from snaps.openstack.utils import glance_utils
from snaps.openstack.create_network import NetworkSettings, SubnetSettings
from snaps.openstack.create_router import RouterSettings
from snaps.openstack.os_credentials import OSCreds, ProxySettings
if not identity_api_version:
identity_api_version = 2
+ image_api_version = config.get('image_api_version')
+ if not image_api_version:
+ image_api_version = glance_utils.VERSION_2
+
proxy_settings = None
proxy_str = config.get('http_proxy')
if proxy_str:
os_creds = OSCreds(username=config['username'], password=config['password'],
auth_url=config['os_auth_url'], project_name=config['project_name'],
- identity_api_version=identity_api_version,
+ identity_api_version=identity_api_version, image_api_version=image_api_version,
proxy_settings=proxy_settings)
logger.info('OS Credentials = ' + str(os_creds))
def cirros_url_image(name, url=None):
if not url:
- url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img'
+ url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img'
return ImageSettings(name=name, image_user='cirros', img_format='qcow2', url=url)
def centos_url_image(name, url=None):
if not url:
- url='http://cloud.centos.org/centos/7/images/CentOS-7-x86_64-GenericCloud.qcow2'
- return ImageSettings(name=name, image_user='centos', img_format='qcow2', url=url,
+ url = 'http://cloud.centos.org/centos/7/images/CentOS-7-x86_64-GenericCloud.qcow2'
+ return ImageSettings(
+ name=name, image_user='centos', img_format='qcow2', url=url,
nic_config_pb_loc='./provisioning/ansible/centos-network-setup/playbooks/configure_host.yml')
def ubuntu_url_image(name, url=None):
if not url:
- url='http://uec-images.ubuntu.com/releases/trusty/14.04/ubuntu-14.04-server-cloudimg-amd64-disk1.img'
- return ImageSettings(name=name, image_user='ubuntu', img_format='qcow2', url=url,
+ url = 'http://uec-images.ubuntu.com/releases/trusty/14.04/ubuntu-14.04-server-cloudimg-amd64-disk1.img'
+ return ImageSettings(
+ name=name, image_user='ubuntu', img_format='qcow2', url=url,
nic_config_pb_loc='./provisioning/ansible/ubuntu-network-setup/playbooks/configure_host.yml')
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+import os
+import uuid
from snaps import file_utils
from glanceclient.client import Client
+
+from snaps.domain.image import Image
from snaps.openstack.utils import keystone_utils
__author__ = 'spisarski'
logger = logging.getLogger('glance_utils')
+VERSION_1 = 1.0
+VERSION_2 = 2.0
+
"""
Utilities for basic neutron API calls
"""
return Client(version=os_creds.image_api_version, session=keystone_utils.keystone_session(os_creds))
-def get_image(nova, glance, image_name):
+def get_image(glance, image_name=None):
"""
Returns an OpenStack image object for a given name
- :param nova: the Nova client
:param glance: the Glance client
:param image_name: the image name to lookup
:return: the image object or None
"""
- try:
- image_dict = nova.images.find(name=image_name)
- if image_dict:
- return glance.images.get(image_dict.id)
- except:
- pass
+ images = glance.images.list()
+ for image in images:
+ if glance.version == VERSION_1:
+ if image.name == image_name:
+ image = glance.images.get(image.id)
+ return Image(name=image.name, image_id=image.id, size=image.size, properties=image.properties)
+ elif glance.version == VERSION_2:
+ if image['name'] == image_name:
+ return Image(name=image['name'], image_id=image['id'], size=image['size'],
+ properties=image.get('properties'))
return None
+def get_image_status(glance, image):
+ """
+ Returns a new OpenStack Image object for a given OpenStack image object
+ :param glance: the Glance client
+ :param image: the domain Image object
+ :return: the OpenStack Image object
+ """
+ if glance.version == VERSION_1:
+ os_image = glance.images.get(image.id)
+ return os_image.status
+ elif glance.version == VERSION_2:
+ os_image = glance.images.get(image.id)
+ return os_image['status']
+ else:
+ raise Exception('Unsupported glance client version')
+
+
def create_image(glance, image_settings):
"""
Creates and returns OpenStack image object with an external URL
:return: the OpenStack image object
:raise Exception if using a file and it cannot be found
"""
+ if glance.version == VERSION_1:
+ return __create_image_v1(glance, image_settings)
+ elif glance.version == VERSION_2:
+ return __create_image_v2(glance, image_settings)
+ else:
+ raise Exception('Unsupported glance client version')
+
+
+def __create_image_v1(glance, image_settings):
+ """
+ Creates and returns OpenStack image object with an external URL
+ :param glance: the glance client
+ :param image_settings: the image settings object
+ :return: the OpenStack image object
+ :raise Exception if using a file and it cannot be found
+ """
+ created_image = None
+
if image_settings.url:
if image_settings.extra_properties:
- return glance.images.create(name=image_settings.name,
- disk_format=image_settings.format,
- container_format="bare",
- location=image_settings.url,
- properties=image_settings.extra_properties)
- return glance.images.create(name=image_settings.name, disk_format=image_settings.format,
- container_format="bare", location=image_settings.url)
+ created_image = glance.images.create(
+ name=image_settings.name, disk_format=image_settings.format, container_format="bare",
+ location=image_settings.url, properties=image_settings.extra_properties)
+ else:
+ created_image = glance.images.create(name=image_settings.name, disk_format=image_settings.format,
+ container_format="bare", location=image_settings.url)
elif image_settings.image_file:
image_file = file_utils.get_file(image_settings.image_file)
if image_settings.extra_properties:
- return glance.images.create(name=image_settings.name,
- disk_format=image_settings.format,
- container_format="bare",
- data=image_file,
- properties=image_settings.extra_properties)
- return glance.images.create(name=image_settings.name, disk_format=image_settings.format,
- container_format="bare", data=image_file)
+ created_image = glance.images.create(
+ name=image_settings.name, disk_format=image_settings.format, container_format="bare", data=image_file,
+ properties=image_settings.extra_properties)
+ else:
+ created_image = glance.images.create(
+ name=image_settings.name, disk_format=image_settings.format, container_format="bare", data=image_file)
+
+ return Image(name=image_settings.name, image_id=created_image.id, size=created_image.size,
+ properties=created_image.properties)
+
+
+def __create_image_v2(glance, image_settings):
+ """
+ Creates and returns OpenStack image object with an external URL
+ :param glance: the glance client v2
+ :param image_settings: the image settings object
+ :return: the OpenStack image object
+ :raise Exception if using a file and it cannot be found
+ """
+ cleanup_file = False
+ if image_settings.image_file:
+ image_filename = image_settings.image_file
+ elif image_settings.url:
+ image_file = file_utils.download(image_settings.url, '/tmp', str(uuid.uuid4()))
+ image_filename = image_file.name
+ cleanup_file = True
+ else:
+ raise Exception('Filename or URL of image not configured')
+
+ created_image = None
+ try:
+ kwargs = dict()
+ kwargs['name'] = image_settings.name
+ kwargs['disk_format'] = image_settings.format
+ kwargs['container_format'] = 'bare'
+
+ if image_settings.extra_properties:
+ for key, value in image_settings.extra_properties.iteritems():
+ kwargs[key] = value
+
+ created_image = glance.images.create(**kwargs)
+ image_file = file_utils.get_file(image_filename)
+ glance.images.upload(created_image['id'], image_file)
+ except Exception as e:
+ logger.error('Unexpected exception creating image. Rolling back')
+ if created_image:
+ delete_image(glance, created_image)
+ raise e
+ finally:
+ if cleanup_file:
+ os.remove(image_filename)
+
+ updated_image = glance.images.get(created_image['id'])
+ return Image(name=updated_image['name'], image_id=updated_image['id'], size=updated_image['size'],
+ properties=updated_image.get('properties'))
def delete_image(glance, image):
:param glance: the glance client
:param image: the image to delete
"""
- glance.images.delete(image)
+ if glance.version == VERSION_1:
+ glance.images.delete(image)
+ elif glance.version == VERSION_2:
+ glance.images.delete(image.id)
+ else:
+ raise Exception('Unsupported glance client version')
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
from snaps import file_utils
from snaps.openstack.tests import openstack_tests
-from snaps.openstack.utils import nova_utils
from snaps.openstack.tests import validation_utils
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import glance_utils
guid = uuid.uuid4()
self.image_name = self.__class__.__name__ + '-' + str(guid)
self.image = None
- self.nova = nova_utils.nova_client(self.os_creds)
self.glance = glance_utils.glance_client(self.os_creds)
self.tmp_dir = 'tmp/' + str(guid)
self.assertEqual(self.image_name, self.image.name)
- image = glance_utils.get_image(self.nova, self.glance, os_image_settings.name)
+ image = glance_utils.get_image(self.glance, os_image_settings.name)
self.assertIsNotNone(image)
validation_utils.objects_equivalent(self.image, image)
self.assertIsNotNone(self.image)
self.assertEqual(self.image_name, self.image.name)
- image = glance_utils.get_image(self.nova, self.glance, file_image_settings.name)
+ image = glance_utils.get_image(self.glance, file_image_settings.name)
self.assertIsNotNone(image)
validation_utils.objects_equivalent(self.image, image)
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
import logging
import unittest
+from snaps.domain.test.image_tests import ImageDomainObjectTests
from snaps.openstack.utils.tests.glance_utils_tests import GlanceSmokeTests, GlanceUtilsTests
from snaps.openstack.tests.create_flavor_tests import CreateFlavorTests
from snaps.tests.file_utils_tests import FileUtilsTests
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SecurityGroupRuleSettingsUnitTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SecurityGroupSettingsUnitTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ImageSettingsUnitTests))
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ImageDomainObjectTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(KeypairSettingsUnitTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(UserSettingsUnitTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ProjectSettingsUnitTests))