# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
#                    and others.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import shutil
import unittest
import uuid

try:
    from urllib.request import URLError
except ImportError:
    from urllib2 import URLError

from snaps import file_utils
from snaps.openstack import create_image
from snaps.openstack.create_image import ImageSettings, ImageCreationError, ImageSettingsError
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import glance_utils
__author__ = 'spisarski'

# Module-level logger shared by every test case in this file.
logger = logging.getLogger('create_image_tests')
class ImageSettingsUnitTests(unittest.TestCase):
    """
    Tests the construction of the ImageSettings class.

    The first group of tests checks the argument combinations that must be
    rejected with an ImageSettingsError; the second group checks that the
    accepted combinations expose the values they were given.
    """

    # ------------------------------------------------------------------
    # Invalid combinations - construction must raise ImageSettingsError
    # ------------------------------------------------------------------

    def test_no_params(self):
        """No arguments at all is rejected."""
        with self.assertRaises(ImageSettingsError):
            ImageSettings()

    def test_empty_config(self):
        """An empty config dict is rejected."""
        with self.assertRaises(ImageSettingsError):
            ImageSettings(config=dict())

    def test_name_only(self):
        """A name alone is rejected."""
        with self.assertRaises(ImageSettingsError):
            ImageSettings(name='foo')

    def test_config_with_name_only(self):
        """A config holding only a name is rejected."""
        with self.assertRaises(ImageSettingsError):
            ImageSettings(config={'name': 'foo'})

    def test_name_user_only(self):
        """Name and user without a format or source is rejected."""
        with self.assertRaises(ImageSettingsError):
            ImageSettings(name='foo', image_user='bar')

    def test_config_with_name_user_only(self):
        """A config with only name and user is rejected."""
        with self.assertRaises(ImageSettingsError):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar'})

    def test_name_user_format_only(self):
        """Name, user and format without a URL or file is rejected."""
        with self.assertRaises(ImageSettingsError):
            ImageSettings(name='foo', image_user='bar', img_format='qcow2')

    def test_config_with_name_user_format_only(self):
        """A config with name, user and format but no URL or file is rejected."""
        with self.assertRaises(ImageSettingsError):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2'})

    def test_name_user_format_url_file_only(self):
        """Supplying both a URL and a file is rejected."""
        with self.assertRaises(ImageSettingsError):
            ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                          image_file='/foo/bar.qcow')

    def test_config_with_name_user_format_url_file_only(self):
        """A config supplying both a download URL and a file is rejected."""
        with self.assertRaises(ImageSettingsError):
            ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                  'download_url': 'http://foo.com', 'image_file': '/foo/bar.qcow'})

    # ------------------------------------------------------------------
    # Valid combinations - attributes must reflect the inputs
    # ------------------------------------------------------------------

    def test_name_user_format_url_only(self):
        """Name, user, format and URL build a settings object with default flags."""
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertIsNone(settings.image_file)
        self.assertFalse(settings.exists)
        self.assertFalse(settings.public)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_name_user_format_url_only_properties(self):
        """Extra properties passed as a dict are exposed unchanged."""
        properties = {'hw_video_model': 'vga'}
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                                 extra_properties=properties)
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertEqual(properties, settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertFalse(settings.exists)
        self.assertFalse(settings.public)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_config_with_name_user_format_url_only(self):
        """The config key 'download_url' populates the url attribute."""
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'download_url': 'http://foo.com'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertIsNone(settings.image_file)
        self.assertFalse(settings.exists)
        self.assertFalse(settings.public)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_name_user_format_file_only(self):
        """Name, user, format and a file path build a settings object with no URL."""
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow')
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertFalse(settings.exists)
        self.assertFalse(settings.public)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_config_with_name_user_format_file_only(self):
        """The config key 'image_file' populates the image_file attribute."""
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'image_file': '/foo/bar.qcow'})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertFalse(settings.exists)
        self.assertFalse(settings.public)
        self.assertIsNone(settings.nic_config_pb_loc)

    def test_all_url(self):
        """Every keyword argument, including nested kernel/ramdisk settings, is exposed."""
        properties = {'hw_video_model': 'vga'}
        kernel_settings = ImageSettings(name='kernel', url='http://kernel.com', image_user='bar',
                                        img_format='qcow2')
        ramdisk_settings = ImageSettings(name='ramdisk', url='http://ramdisk.com', image_user='bar',
                                         img_format='qcow2')
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
                                 extra_properties=properties, nic_config_pb_loc='/foo/bar',
                                 kernel_image_settings=kernel_settings, ramdisk_image_settings=ramdisk_settings,
                                 exists=True, public=True)
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        self.assertEqual(properties, settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
        self.assertEqual('kernel', settings.kernel_image_settings.name)
        self.assertEqual('http://kernel.com', settings.kernel_image_settings.url)
        self.assertEqual('bar', settings.kernel_image_settings.image_user)
        self.assertEqual('qcow2', settings.kernel_image_settings.format)
        self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
        self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url)
        self.assertEqual('bar', settings.ramdisk_image_settings.image_user)
        self.assertEqual('qcow2', settings.ramdisk_image_settings.format)
        self.assertTrue(settings.exists)
        self.assertTrue(settings.public)

    def test_config_all_url(self):
        """Every config key, including nested kernel/ramdisk dicts, is exposed."""
        settings = ImageSettings(
            config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                    'download_url': 'http://foo.com',
                    'extra_properties': '{\'hw_video_model\': \'vga\'}',
                    'nic_config_pb_loc': '/foo/bar',
                    'kernel_image_settings': {'name': 'kernel', 'download_url': 'http://kernel.com',
                                              'image_user': 'bar', 'format': 'qcow2'},
                    'ramdisk_image_settings': {'name': 'ramdisk', 'download_url': 'http://ramdisk.com',
                                               'image_user': 'bar', 'format': 'qcow2'},
                    'exists': True, 'public': True})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertEqual('http://foo.com', settings.url)
        # The config value is a string here and is expected back verbatim
        self.assertEqual('{\'hw_video_model\': \'vga\'}', settings.extra_properties)
        self.assertIsNone(settings.image_file)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
        self.assertEqual('kernel', settings.kernel_image_settings.name)
        self.assertEqual('http://kernel.com', settings.kernel_image_settings.url)
        self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
        self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url)
        self.assertTrue(settings.exists)
        self.assertTrue(settings.public)

    def test_all_file(self):
        """Every keyword argument for a file-backed image is exposed."""
        properties = {'hw_video_model': 'vga'}
        settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow',
                                 extra_properties=properties, nic_config_pb_loc='/foo/bar', exists=True,
                                 public=True)
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        self.assertEqual(properties, settings.extra_properties)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
        self.assertTrue(settings.exists)
        self.assertTrue(settings.public)

    def test_config_all_file(self):
        """Every config key for a file-backed image is exposed."""
        settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
                                         'image_file': '/foo/bar.qcow',
                                         'extra_properties': '{\'hw_video_model\' : \'vga\'}',
                                         'nic_config_pb_loc': '/foo/bar', 'exists': True, 'public': True})
        self.assertEqual('foo', settings.name)
        self.assertEqual('bar', settings.image_user)
        self.assertEqual('qcow2', settings.format)
        self.assertIsNone(settings.url)
        self.assertEqual('/foo/bar.qcow', settings.image_file)
        # The config value is a string here and is expected back verbatim
        self.assertEqual('{\'hw_video_model\' : \'vga\'}', settings.extra_properties)
        self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
        self.assertTrue(settings.exists)
        self.assertTrue(settings.public)
class CreateImageSuccessTests(OSIntegrationTestCase):
    """
    Test for the CreateImage class defined in create_image.py
    """

    def setUp(self):
        """
        Builds a glance client, a unique image name, a scratch directory for
        downloads and the default cirros image settings used by each test.
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed.
        super(CreateImageSuccessTests, self).__start__()

        guid = uuid.uuid4()
        self.image_name = self.__class__.__name__ + '-' + str(guid)
        self.glance = glance_utils.glance_client(self.os_creds)
        self.image_creator = None

        if self.image_metadata and 'glance_tests' in self.image_metadata:
            glance_test_meta = self.image_metadata['glance_tests']
        else:
            glance_test_meta = None

        self.tmp_dir = 'tmp/' + str(guid)
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)

        self.image_settings = openstack_tests.cirros_image_settings(name=self.image_name,
                                                                    image_metadata=glance_test_meta)

    def tearDown(self):
        """
        Cleans the image and downloaded image file
        """
        if self.image_creator:
            self.image_creator.clean()

        if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
            shutil.rmtree(self.tmp_dir)

        super(CreateImageSuccessTests, self).__clean__()

    def test_create_image_clean_url(self):
        """
        Tests the creation of an OpenStack image from a URL.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        self.image_creator = create_image.OpenStackImage(self.os_creds, self.image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(created_image.size, retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_image_clean_url_properties(self):
        """
        Tests the creation of an OpenStack image from a URL and set properties.
        """
        # Set the default image settings, then set any custom parameters sent from the app
        self.image_creator = create_image.OpenStackImage(self.os_creds, self.image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)
        self.assertEqual(created_image.properties, retrieved_image.properties)

    def test_create_image_clean_file(self):
        """
        Tests the creation of an OpenStack image from a file.
        """
        if not self.image_settings.image_file and self.image_settings.url:
            # Download the file of the image
            image_file_name = file_utils.download(self.image_settings.url, self.tmp_dir).name
        else:
            image_file_name = self.image_settings.image_file

        if not image_file_name:
            logger.warning('Test not executed as the image metadata requires image files')
            return

        file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name, file_path=image_file_name)

        self.image_creator = create_image.OpenStackImage(self.os_creds, file_image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)

        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_delete_image(self):
        """
        Tests the creation then deletion of an OpenStack image to ensure clean() does not raise an Exception.
        """
        self.image_creator = create_image.OpenStackImage(self.os_creds, self.image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)

        # Delete Image manually
        glance_utils.delete_image(self.glance, created_image)

        self.assertIsNone(glance_utils.get_image(self.glance, self.image_creator.image_settings.name))

        # Must not throw an exception when attempting to cleanup non-existent image
        self.image_creator.clean()
        self.assertIsNone(self.image_creator.get_image())

    def test_create_same_image(self):
        """
        Tests the creation of an OpenStack image when the image already exists.
        """
        self.image_creator = create_image.OpenStackImage(self.os_creds, self.image_settings)
        image1 = self.image_creator.create()

        retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
        self.assertEqual(image1.name, retrieved_image.name)
        self.assertEqual(image1.id, retrieved_image.id)
        self.assertEqual(image1.properties, retrieved_image.properties)

        # Should be retrieving the instance data
        os_image_2 = create_image.OpenStackImage(self.os_creds, self.image_settings)
        image2 = os_image_2.create()
        self.assertEqual(image1.id, image2.id)

    def test_create_same_image_new_settings(self):
        """
        Tests the creation of an OpenStack image when the image already exists and the second settings
        object carries only the name, an image user and exists=True.
        """
        self.image_creator = create_image.OpenStackImage(self.os_creds, self.image_settings)
        image1 = self.image_creator.create()

        retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
        self.assertEqual(image1.name, retrieved_image.name)
        self.assertEqual(image1.id, retrieved_image.id)
        self.assertEqual(image1.properties, retrieved_image.properties)

        # Should be retrieving the instance data
        image_2_settings = ImageSettings(name=self.image_settings.name, image_user='foo', exists=True)
        os_image_2 = create_image.OpenStackImage(self.os_creds, image_2_settings)
        image2 = os_image_2.create()
        self.assertEqual(image1.id, image2.id)
class CreateImageNegativeTests(OSIntegrationTestCase):
    """
    Negative test cases for the CreateImage class
    """

    def setUp(self):
        """
        Generates a unique image name; each test builds its own image creator.
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed.
        super(CreateImageNegativeTests, self).__start__()

        self.image_name = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.image_creator = None

    def tearDown(self):
        """
        Cleans whatever the image creator may have left behind.
        """
        if self.image_creator:
            self.image_creator.clean()

        super(CreateImageNegativeTests, self).__clean__()

    def test_bad_image_name(self):
        """
        Expect an ImageCreationError when the image name does not exist when a file or URL has not been configured
        """
        os_image_settings = ImageSettings(name='foo', image_user='bar', exists=True)
        self.image_creator = create_image.OpenStackImage(self.os_creds, os_image_settings)

        with self.assertRaises(ImageCreationError):
            self.image_creator.create()

            # Only reached when create() did not raise; gives a clearer failure message
            self.fail('ImageCreationError should have been raised prior to this line')

    def test_bad_image_url(self):
        """
        Expect a URLError when the image download url is bad
        """
        os_image_settings = openstack_tests.cirros_image_settings(name=self.image_name)
        self.image_creator = create_image.OpenStackImage(self.os_creds, create_image.ImageSettings(
            name=os_image_settings.name, image_user=os_image_settings.image_user,
            img_format=os_image_settings.format, url="http://foo.bar"))

        with self.assertRaises(URLError):
            self.image_creator.create()

    def test_bad_image_file(self):
        """
        Expect an IOError when the image file does not exist
        """
        os_image_settings = openstack_tests.cirros_image_settings(name=self.image_name)
        self.image_creator = create_image.OpenStackImage(
            self.os_creds,
            create_image.ImageSettings(name=os_image_settings.name, image_user=os_image_settings.image_user,
                                       img_format=os_image_settings.format, image_file="/foo/bar.qcow"))
        with self.assertRaises(IOError):
            self.image_creator.create()
class CreateMultiPartImageTests(OSIntegrationTestCase):
    """
    Tests different means of creating a 3-part image (kernel, ramdisk and main disk)
    """

    def setUp(self):
        """
        Builds a glance client, a unique image name, a scratch directory for
        downloads and the (possibly empty) 'glance_tests' metadata dict.
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this test case is subclassed.
        super(CreateMultiPartImageTests, self).__start__()

        guid = uuid.uuid4()
        self.image_creators = list()
        self.image_name = self.__class__.__name__ + '-' + str(guid)
        self.glance = glance_utils.glance_client(self.os_creds)

        self.tmp_dir = 'tmp/' + str(guid)
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)

        if self.image_metadata and 'glance_tests' in self.image_metadata:
            self.glance_test_meta = self.image_metadata['glance_tests']
        else:
            self.glance_test_meta = dict()

    def tearDown(self):
        """
        Cleans the images and downloaded image file
        """
        for image_creator in self.image_creators:
            image_creator.clean()

        if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
            shutil.rmtree(self.tmp_dir)

        super(CreateMultiPartImageTests, self).__clean__()

    def _extra_properties(self):
        """
        Returns a private copy of the configured 'extra_properties' (empty dict when none are configured).

        A copy is returned because the tests add kernel_id/ramdisk_id entries, which must not leak into the
        shared test metadata.
        """
        if self.glance_test_meta and 'extra_properties' in self.glance_test_meta:
            return dict(self.glance_test_meta['extra_properties'])
        return dict()

    def _resolve_image_file(self, file_key, url_key, default_url):
        """
        Returns the path of a local image file, downloading it into tmp_dir when no file is configured.
        :param file_key: the glance_test_meta key that may hold a local file path
        :param url_key: the glance_test_meta key that may override the download URL
        :param default_url: the URL to download from when neither key is configured
        :return: the path to the image file
        """
        file_name = None
        url = default_url
        if file_key in self.glance_test_meta:
            file_name = self.glance_test_meta[file_key]
        elif url_key in self.glance_test_meta:
            url = self.glance_test_meta[url_key]

        if not file_name:
            file_name = file_utils.download(url, self.tmp_dir).name
        return file_name

    def test_create_three_part_image_from_url(self):
        """
        Tests the creation of a 3-part OpenStack image from a URL.
        """
        if 'disk_file' in self.glance_test_meta:
            logger.warning('Test not executed as the image metadata requires image files')
            return

        # A single creator is handed all three URLs and is expected to create all three images
        image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name,
            image_metadata={'disk_url': 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img',
                            'kernel_url': 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-kernel',
                            'ramdisk_url': 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs'})

        image_creator = create_image.OpenStackImage(self.os_creds, image_settings)
        self.image_creators.append(image_creator)
        image_creator.create()

        main_image = glance_utils.get_image(self.glance, image_settings.name)
        self.assertIsNotNone(main_image)
        self.assertIsNotNone(image_creator.get_image())
        self.assertEqual(image_creator.get_image().id, main_image.id)

        kernel_image = glance_utils.get_image(self.glance, image_settings.kernel_image_settings.name)
        self.assertIsNotNone(kernel_image)
        self.assertIsNotNone(image_creator.get_kernel_image())
        self.assertEqual(kernel_image.id, image_creator.get_kernel_image().id)

        ramdisk_image = glance_utils.get_image(self.glance, image_settings.ramdisk_image_settings.name)
        self.assertIsNotNone(ramdisk_image)
        self.assertIsNotNone(image_creator.get_ramdisk_image())
        self.assertEqual(ramdisk_image.id, image_creator.get_ramdisk_image().id)

    def test_create_three_part_image_from_file_3_creators(self):
        """
        Tests the creation of a 3-part OpenStack image from files.
        """
        properties = self._extra_properties()

        # Create the kernel image
        kernel_file_name = self._resolve_image_file(
            'kernel_file', 'kernel_url', openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL)
        kernel_file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name+'_kernel', file_path=kernel_file_name)

        self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_file_image_settings))
        kernel_image = self.image_creators[-1].create()
        self.assertIsNotNone(kernel_image)
        self.assertEqual(get_image_size(kernel_file_image_settings), kernel_image.size)

        # Create the ramdisk image
        ramdisk_file_name = self._resolve_image_file(
            'ramdisk_file', 'ramdisk_url', openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL)
        ramdisk_file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name+'_ramdisk', file_path=ramdisk_file_name)
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_file_image_settings))
        ramdisk_image = self.image_creators[-1].create()
        self.assertIsNotNone(ramdisk_image)
        self.assertEqual(get_image_size(ramdisk_file_image_settings), ramdisk_image.size)

        # Create the main disk image
        disk_file_name = self._resolve_image_file(
            'disk_file', 'disk_url', openstack_tests.CIRROS_DEFAULT_IMAGE_URL)
        file_image_settings = openstack_tests.file_image_test_settings(name=self.image_name,
                                                                       file_path=disk_file_name)
        # Link the main image to the kernel and ramdisk images created above
        properties['kernel_id'] = kernel_image.id
        properties['ramdisk_id'] = ramdisk_image.id
        file_image_settings.extra_properties = properties
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, file_image_settings))
        created_image = self.image_creators[-1].create()

        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)
        self.assertEqual(created_image.properties, retrieved_image.properties)

    def test_create_three_part_image_from_url_3_creators(self):
        """
        Tests the creation of a 3-part OpenStack image from a URL.
        """
        if 'disk_file' in self.glance_test_meta:
            logger.warning('Test not executed as the image metadata requires image files')
            return

        properties = self._extra_properties()

        # Create the kernel image
        kernel_image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name+'_kernel',
            url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-kernel')

        if self.glance_test_meta and 'kernel_url' in self.glance_test_meta:
            kernel_image_settings.url = self.glance_test_meta['kernel_url']
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings))
        kernel_image = self.image_creators[-1].create()
        self.assertIsNotNone(kernel_image)
        self.assertEqual(get_image_size(kernel_image_settings), kernel_image.size)

        # Create the ramdisk image
        ramdisk_image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name+'_ramdisk',
            url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs')
        if self.glance_test_meta and 'ramdisk_url' in self.glance_test_meta:
            ramdisk_image_settings.url = self.glance_test_meta['ramdisk_url']
        self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings))
        ramdisk_image = self.image_creators[-1].create()
        self.assertIsNotNone(ramdisk_image)
        self.assertEqual(get_image_size(ramdisk_image_settings), ramdisk_image.size)

        # Create the main image
        os_image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name,
            url='http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img')
        if self.glance_test_meta and 'disk_url' in self.glance_test_meta:
            os_image_settings.url = self.glance_test_meta['disk_url']

        # Link the main image to the kernel and ramdisk images created above
        properties['kernel_id'] = kernel_image.id
        properties['ramdisk_id'] = ramdisk_image.id
        os_image_settings.extra_properties = properties

        self.image_creators.append(create_image.OpenStackImage(self.os_creds, os_image_settings))
        created_image = self.image_creators[-1].create()
        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
        self.assertIsNotNone(retrieved_image)

        self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
        self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)

        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)
        self.assertEqual(created_image.properties, retrieved_image.properties)
def get_image_size(image_settings):
    """
    Returns the expected image size
    :param image_settings: object exposing 'image_file' and 'url' attributes; a configured image_file
                           takes precedence over the url
    :return: the size of the local file as reported by os.path.getsize(), or the integer value of
             file_utils.get_content_length() for the url (presumably bytes - confirm against file_utils)
    :raises Exception: when neither image_file nor url has been configured
    """
    if image_settings.image_file:
        return os.path.getsize(image_settings.image_file)

    if image_settings.url:
        return int(file_utils.get_content_length(image_settings.url))

    raise Exception('Cannot retrieve expected image size. Image filename or URL has not been configured')