1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from urllib.request import URLError
19 from urllib2 import URLError
28 from snaps import file_utils
29 from snaps.openstack import create_image
30 from snaps.openstack.create_image import (ImageSettings, ImageCreationError,
32 from snaps.openstack.tests import openstack_tests
33 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
34 from snaps.openstack.utils import glance_utils
36 __author__ = 'spisarski'
38 logger = logging.getLogger('create_image_tests')
41 class ImageSettingsUnitTests(unittest.TestCase):
43 Tests the construction of the ImageSettings class
46 def test_no_params(self):
47 with self.assertRaises(ImageSettingsError):
50 def test_empty_config(self):
51 with self.assertRaises(ImageSettingsError):
52 ImageSettings(**dict())
54 def test_name_only(self):
55 with self.assertRaises(ImageSettingsError):
56 ImageSettings(name='foo')
58 def test_config_with_name_only(self):
59 with self.assertRaises(ImageSettingsError):
60 ImageSettings(**{'name': 'foo'})
62 def test_name_user_only(self):
63 with self.assertRaises(ImageSettingsError):
64 ImageSettings(name='foo', image_user='bar')
66 def test_config_with_name_user_only(self):
67 with self.assertRaises(ImageSettingsError):
68 ImageSettings(**{'name': 'foo', 'image_user': 'bar'})
70 def test_name_user_format_only(self):
71 with self.assertRaises(ImageSettingsError):
72 ImageSettings(name='foo', image_user='bar', img_format='qcow2')
74 def test_config_with_name_user_format_only(self):
75 with self.assertRaises(ImageSettingsError):
77 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2'})
79 def test_name_user_format_url_file_only(self):
80 with self.assertRaises(ImageSettingsError):
81 ImageSettings(name='foo', image_user='bar', img_format='qcow2',
83 image_file='/foo/bar.qcow')
85 def test_config_with_name_user_format_url_file_only(self):
86 with self.assertRaises(ImageSettingsError):
88 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
89 'download_url': 'http://foo.com',
90 'image_file': '/foo/bar.qcow'})
92 def test_name_user_format_url_only(self):
93 settings = ImageSettings(name='foo', image_user='bar',
94 img_format='qcow2', url='http://foo.com')
95 self.assertEqual('foo', settings.name)
96 self.assertEqual('bar', settings.image_user)
97 self.assertEqual('qcow2', settings.format)
98 self.assertEqual('http://foo.com', settings.url)
99 self.assertIsNone(settings.image_file)
100 self.assertFalse(settings.exists)
101 self.assertFalse(settings.public)
102 self.assertIsNone(settings.nic_config_pb_loc)
104 def test_name_user_format_url_only_properties(self):
105 properties = {'hw_video_model': 'vga'}
106 settings = ImageSettings(name='foo', image_user='bar',
107 img_format='qcow2', url='http://foo.com',
108 extra_properties=properties)
109 self.assertEqual('foo', settings.name)
110 self.assertEqual('bar', settings.image_user)
111 self.assertEqual('qcow2', settings.format)
112 self.assertEqual('http://foo.com', settings.url)
113 self.assertEqual(properties, settings.extra_properties)
114 self.assertIsNone(settings.image_file)
115 self.assertFalse(settings.exists)
116 self.assertFalse(settings.public)
117 self.assertIsNone(settings.nic_config_pb_loc)
119 def test_config_with_name_user_format_url_only(self):
120 settings = ImageSettings(
121 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
122 'download_url': 'http://foo.com'})
123 self.assertEqual('foo', settings.name)
124 self.assertEqual('bar', settings.image_user)
125 self.assertEqual('qcow2', settings.format)
126 self.assertEqual('http://foo.com', settings.url)
127 self.assertIsNone(settings.image_file)
128 self.assertFalse(settings.exists)
129 self.assertFalse(settings.public)
130 self.assertIsNone(settings.nic_config_pb_loc)
132 def test_name_user_format_file_only(self):
133 settings = ImageSettings(name='foo', image_user='bar',
135 image_file='/foo/bar.qcow')
136 self.assertEqual('foo', settings.name)
137 self.assertEqual('bar', settings.image_user)
138 self.assertEqual('qcow2', settings.format)
139 self.assertIsNone(settings.url)
140 self.assertEqual('/foo/bar.qcow', settings.image_file)
141 self.assertFalse(settings.exists)
142 self.assertFalse(settings.public)
143 self.assertIsNone(settings.nic_config_pb_loc)
145 def test_config_with_name_user_format_file_only(self):
146 settings = ImageSettings(
147 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
148 'image_file': '/foo/bar.qcow'})
149 self.assertEqual('foo', settings.name)
150 self.assertEqual('bar', settings.image_user)
151 self.assertEqual('qcow2', settings.format)
152 self.assertIsNone(settings.url)
153 self.assertEqual('/foo/bar.qcow', settings.image_file)
154 self.assertFalse(settings.exists)
155 self.assertFalse(settings.public)
156 self.assertIsNone(settings.nic_config_pb_loc)
158 def test_all_url(self):
159 properties = {'hw_video_model': 'vga'}
160 kernel_settings = ImageSettings(name='kernel', url='http://kernel.com',
161 image_user='bar', img_format='qcow2')
162 ramdisk_settings = ImageSettings(name='ramdisk',
163 url='http://ramdisk.com',
164 image_user='bar', img_format='qcow2')
165 settings = ImageSettings(name='foo', image_user='bar',
166 img_format='qcow2', url='http://foo.com',
167 extra_properties=properties,
168 nic_config_pb_loc='/foo/bar',
169 kernel_image_settings=kernel_settings,
170 ramdisk_image_settings=ramdisk_settings,
171 exists=True, public=True)
172 self.assertEqual('foo', settings.name)
173 self.assertEqual('bar', settings.image_user)
174 self.assertEqual('qcow2', settings.format)
175 self.assertEqual('http://foo.com', settings.url)
176 self.assertEqual(properties, settings.extra_properties)
177 self.assertIsNone(settings.image_file)
178 self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
179 self.assertEqual('kernel', settings.kernel_image_settings.name)
180 self.assertEqual('http://kernel.com',
181 settings.kernel_image_settings.url)
182 self.assertEqual('bar', settings.kernel_image_settings.image_user)
183 self.assertEqual('qcow2', settings.kernel_image_settings.format)
184 self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
185 self.assertEqual('http://ramdisk.com',
186 settings.ramdisk_image_settings.url)
187 self.assertEqual('bar', settings.ramdisk_image_settings.image_user)
188 self.assertEqual('qcow2', settings.ramdisk_image_settings.format)
189 self.assertTrue(settings.exists)
190 self.assertTrue(settings.public)
192 def test_config_all_url(self):
193 settings = ImageSettings(
194 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
195 'download_url': 'http://foo.com',
196 'extra_properties': '{\'hw_video_model\': \'vga\'}',
197 'nic_config_pb_loc': '/foo/bar',
198 'kernel_image_settings': {
200 'download_url': 'http://kernel.com',
203 'ramdisk_image_settings': {
205 'download_url': 'http://ramdisk.com',
208 'exists': True, 'public': True})
209 self.assertEqual('foo', settings.name)
210 self.assertEqual('bar', settings.image_user)
211 self.assertEqual('qcow2', settings.format)
212 self.assertEqual('http://foo.com', settings.url)
213 self.assertEqual('{\'hw_video_model\': \'vga\'}',
214 settings.extra_properties)
215 self.assertIsNone(settings.image_file)
216 self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
217 self.assertEqual('kernel', settings.kernel_image_settings.name)
218 self.assertEqual('http://kernel.com',
219 settings.kernel_image_settings.url)
220 self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
221 self.assertEqual('http://ramdisk.com',
222 settings.ramdisk_image_settings.url)
223 self.assertTrue(settings.exists)
224 self.assertTrue(settings.public)
226 def test_all_file(self):
227 properties = {'hw_video_model': 'vga'}
228 settings = ImageSettings(name='foo', image_user='bar',
230 image_file='/foo/bar.qcow',
231 extra_properties=properties,
232 nic_config_pb_loc='/foo/bar', exists=True,
234 self.assertEqual('foo', settings.name)
235 self.assertEqual('bar', settings.image_user)
236 self.assertEqual('qcow2', settings.format)
237 self.assertIsNone(settings.url)
238 self.assertEqual('/foo/bar.qcow', settings.image_file)
239 self.assertEqual(properties, settings.extra_properties)
240 self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
241 self.assertTrue(settings.exists)
242 self.assertTrue(settings.public)
244 def test_config_all_file(self):
245 settings = ImageSettings(
246 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
247 'image_file': '/foo/bar.qcow',
248 'extra_properties': '{\'hw_video_model\' : \'vga\'}',
249 'nic_config_pb_loc': '/foo/bar', 'exists': True,
251 self.assertEqual('foo', settings.name)
252 self.assertEqual('bar', settings.image_user)
253 self.assertEqual('qcow2', settings.format)
254 self.assertIsNone(settings.url)
255 self.assertEqual('/foo/bar.qcow', settings.image_file)
256 self.assertEqual('{\'hw_video_model\' : \'vga\'}',
257 settings.extra_properties)
258 self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
259 self.assertTrue(settings.exists)
260 self.assertTrue(settings.public)
263 class CreateImageSuccessTests(OSIntegrationTestCase):
265 Test for the CreateImage class defined in create_image.py
270 Instantiates the CreateImage object that is responsible for downloading
271 and creating an OS image file within OpenStack
273 super(self.__class__, self).__start__()
276 self.image_name = self.__class__.__name__ + '-' + str(guid)
277 self.glance = glance_utils.glance_client(self.os_creds)
278 self.image_creator = None
280 if self.image_metadata and 'glance_tests' in self.image_metadata:
281 glance_test_meta = self.image_metadata['glance_tests']
283 glance_test_meta = None
285 self.tmp_dir = 'tmp/' + str(guid)
286 if not os.path.exists(self.tmp_dir):
287 os.makedirs(self.tmp_dir)
289 self.image_settings = openstack_tests.cirros_image_settings(
290 name=self.image_name,
291 image_metadata=glance_test_meta)
295 Cleans the image and downloaded image file
297 if self.image_creator:
298 self.image_creator.clean()
300 if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
301 shutil.rmtree(self.tmp_dir)
303 super(self.__class__, self).__clean__()
305 def test_create_image_clean_url(self):
307 Tests the creation of an OpenStack image from a URL.
310 # Set the default image settings, then set any custom parameters sent
313 self.image_creator = create_image.OpenStackImage(self.os_creds,
315 created_image = self.image_creator.create()
316 self.assertIsNotNone(created_image)
318 retrieved_image = glance_utils.get_image(self.glance,
319 self.image_settings.name)
320 self.assertIsNotNone(retrieved_image)
321 self.assertEqual(created_image.size, retrieved_image.size)
322 self.assertEqual(get_image_size(self.image_settings),
323 retrieved_image.size)
324 self.assertEqual(created_image.name, retrieved_image.name)
325 self.assertEqual(created_image.id, retrieved_image.id)
327 def test_create_image_clean_url_properties(self):
329 Tests the creation of an OpenStack image from a URL and set properties.
332 # Set the default image settings, then set any custom parameters sent
334 self.image_creator = create_image.OpenStackImage(self.os_creds,
336 created_image = self.image_creator.create()
337 self.assertIsNotNone(created_image)
339 retrieved_image = glance_utils.get_image(self.glance,
340 self.image_settings.name)
341 self.assertIsNotNone(retrieved_image)
342 self.assertEqual(self.image_creator.get_image().size,
343 retrieved_image.size)
344 self.assertEqual(get_image_size(self.image_settings),
345 retrieved_image.size)
346 self.assertEqual(created_image.name, retrieved_image.name)
347 self.assertEqual(created_image.id, retrieved_image.id)
348 self.assertEqual(created_image.properties, retrieved_image.properties)
350 def test_create_image_clean_file(self):
352 Tests the creation of an OpenStack image from a file.
354 if not self.image_settings.image_file and self.image_settings.url:
355 # Download the file of the image
356 image_file_name = file_utils.download(self.image_settings.url,
359 image_file_name = self.image_settings.image_file
362 file_image_settings = openstack_tests.file_image_test_settings(
363 name=self.image_name, file_path=image_file_name)
365 self.image_creator = create_image.OpenStackImage(
366 self.os_creds, file_image_settings)
367 created_image = self.image_creator.create()
368 self.assertIsNotNone(created_image)
369 self.assertEqual(self.image_name, created_image.name)
371 retrieved_image = glance_utils.get_image(
372 self.glance, file_image_settings.name)
373 self.assertIsNotNone(retrieved_image)
374 self.assertEqual(self.image_creator.get_image().size,
375 retrieved_image.size)
376 self.assertEqual(get_image_size(file_image_settings),
377 retrieved_image.size)
379 self.assertEqual(created_image.name, retrieved_image.name)
380 self.assertEqual(created_image.id, retrieved_image.id)
383 'Test not executed as the image metadata requires image files')
385 def test_create_delete_image(self):
387 Tests the creation then deletion of an OpenStack image to ensure
388 clean() does not raise an Exception.
391 self.image_creator = create_image.OpenStackImage(self.os_creds,
393 created_image = self.image_creator.create()
394 self.assertIsNotNone(created_image)
396 retrieved_image = glance_utils.get_image(self.glance,
397 self.image_settings.name)
398 self.assertIsNotNone(retrieved_image)
399 self.assertEqual(self.image_creator.get_image().size,
400 retrieved_image.size)
401 self.assertEqual(get_image_size(self.image_settings),
402 retrieved_image.size)
404 # Delete Image manually
405 glance_utils.delete_image(self.glance, created_image)
407 self.assertIsNone(glance_utils.get_image(
408 self.glance, self.image_creator.image_settings.name))
410 # Must not throw an exception when attempting to cleanup non-existent
412 self.image_creator.clean()
413 self.assertIsNone(self.image_creator.get_image())
415 def test_create_same_image(self):
417 Tests the creation of an OpenStack image when the image already exists.
420 self.image_creator = create_image.OpenStackImage(self.os_creds,
422 image1 = self.image_creator.create()
424 retrieved_image = glance_utils.get_image(self.glance,
425 self.image_settings.name)
426 self.assertIsNotNone(retrieved_image)
427 self.assertEqual(self.image_creator.get_image().size,
428 retrieved_image.size)
429 self.assertEqual(get_image_size(self.image_settings),
430 retrieved_image.size)
431 self.assertEqual(image1.name, retrieved_image.name)
432 self.assertEqual(image1.id, retrieved_image.id)
433 self.assertEqual(image1.properties, retrieved_image.properties)
435 # Should be retrieving the instance data
436 os_image_2 = create_image.OpenStackImage(self.os_creds,
438 image2 = os_image_2.create()
439 self.assertEqual(image1.id, image2.id)
441 def test_create_same_image_new_settings(self):
443 Tests the creation of an OpenStack image when the image already exists
444 and the configuration only contains the name.
447 self.image_creator = create_image.OpenStackImage(self.os_creds,
449 image1 = self.image_creator.create()
451 retrieved_image = glance_utils.get_image(self.glance,
452 self.image_settings.name)
453 self.assertIsNotNone(retrieved_image)
454 self.assertEqual(self.image_creator.get_image().size,
455 retrieved_image.size)
456 self.assertEqual(get_image_size(self.image_settings),
457 retrieved_image.size)
458 self.assertEqual(image1.name, retrieved_image.name)
459 self.assertEqual(image1.id, retrieved_image.id)
460 self.assertEqual(image1.properties, retrieved_image.properties)
462 # Should be retrieving the instance data
463 image_2_settings = ImageSettings(name=self.image_settings.name,
464 image_user='foo', exists=True)
465 os_image_2 = create_image.OpenStackImage(self.os_creds,
467 image2 = os_image_2.create()
468 self.assertEqual(image1.id, image2.id)
471 class CreateImageNegativeTests(OSIntegrationTestCase):
473 Negative test cases for the CreateImage class
477 super(self.__class__, self).__start__()
479 self.image_name = self.__class__.__name__ + '-' + str(uuid.uuid4())
480 self.image_creator = None
483 if self.image_creator:
484 self.image_creator.clean()
486 super(self.__class__, self).__clean__()
488 def test_bad_image_name(self):
490 Expect an ImageCreationError when the image name does not exist when a
491 file or URL has not been configured
493 os_image_settings = ImageSettings(name='foo', image_user='bar',
495 self.image_creator = create_image.OpenStackImage(self.os_creds,
498 with self.assertRaises(ImageCreationError):
499 self.image_creator.create()
501 self.fail('ImageCreationError should have been raised prior to'
504 def test_bad_image_url(self):
506 Expect an ImageCreationError when the image download url is bad
508 os_image_settings = openstack_tests.cirros_image_settings(
509 name=self.image_name)
510 self.image_creator = create_image.OpenStackImage(
512 create_image.ImageSettings(name=os_image_settings.name,
513 image_user=os_image_settings.image_user,
514 img_format=os_image_settings.format,
515 url="http://foo.bar"))
517 with self.assertRaises(URLError):
518 self.image_creator.create()
520 def test_bad_image_file(self):
522 Expect an ImageCreationError when the image file does not exist
524 os_image_settings = openstack_tests.cirros_image_settings(
525 name=self.image_name)
526 self.image_creator = create_image.OpenStackImage(
528 create_image.ImageSettings(name=os_image_settings.name,
529 image_user=os_image_settings.image_user,
530 img_format=os_image_settings.format,
531 image_file="/foo/bar.qcow"))
532 with self.assertRaises(IOError):
533 self.image_creator.create()
536 class CreateMultiPartImageTests(OSIntegrationTestCase):
538 Test different means for creating a 3-part images
543 Instantiates the CreateImage object that is responsible for
544 downloading and creating an OS image file within OpenStack
546 super(self.__class__, self).__start__()
549 self.image_creators = list()
550 self.image_name = self.__class__.__name__ + '-' + str(guid)
551 self.glance = glance_utils.glance_client(self.os_creds)
553 self.tmp_dir = 'tmp/' + str(guid)
554 if not os.path.exists(self.tmp_dir):
555 os.makedirs(self.tmp_dir)
557 if self.image_metadata and 'glance_tests' in self.image_metadata:
558 self.glance_test_meta = self.image_metadata['glance_tests']
560 self.glance_test_meta = dict()
564 Cleans the images and downloaded image file
566 for image_creator in self.image_creators:
567 image_creator.clean()
569 if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
570 shutil.rmtree(self.tmp_dir)
572 super(self.__class__, self).__clean__()
574 def test_create_three_part_image_from_url(self):
576 Tests the creation of a 3-part OpenStack image from a URL.
578 # Create the kernel image
579 if 'disk_file' not in self.glance_test_meta:
580 image_settings = openstack_tests.cirros_image_settings(
581 name=self.image_name,
584 openstack_tests.CIRROS_DEFAULT_IMAGE_URL,
586 openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL,
588 openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL})
590 image_creator = create_image.OpenStackImage(self.os_creds,
592 self.image_creators.append(image_creator)
593 image_creator.create()
595 main_image = glance_utils.get_image(self.glance,
597 self.assertIsNotNone(main_image)
598 self.assertIsNotNone(image_creator.get_image())
599 self.assertEqual(image_creator.get_image().id, main_image.id)
601 kernel_image = glance_utils.get_image(
602 self.glance, image_settings.kernel_image_settings.name)
603 self.assertIsNotNone(kernel_image)
604 self.assertIsNotNone(image_creator.get_kernel_image())
605 self.assertEqual(kernel_image.id,
606 image_creator.get_kernel_image().id)
608 ramdisk_image = glance_utils.get_image(
609 self.glance, image_settings.ramdisk_image_settings.name)
610 self.assertIsNotNone(ramdisk_image)
611 self.assertIsNotNone(image_creator.get_ramdisk_image())
612 self.assertEqual(ramdisk_image.id,
613 image_creator.get_ramdisk_image().id)
616 'Test not executed as the image metadata requires image files')
618 def test_create_three_part_image_from_file_3_creators(self):
620 Tests the creation of a 3-part OpenStack image from files.
626 if self.glance_test_meta:
627 if 'extra_properties' in self.glance_test_meta:
628 properties = self.glance_test_meta['extra_properties']
629 if 'disk_file' in self.glance_test_meta:
632 # Create the kernel image
633 kernel_file_name = None
634 kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL
635 if 'kernel_file' in self.glance_test_meta:
636 kernel_file_name = self.glance_test_meta['kernel_file']
637 elif 'kernel_url' in self.glance_test_meta:
638 kernel_url = self.glance_test_meta['kernel_url']
640 kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL
642 if not kernel_file_name and not file_only:
643 kernel_file_name = file_utils.download(kernel_url,
646 logger.warn('Will not download the kernel image.'
647 ' Cannot execute test')
650 kernel_file_image_settings = openstack_tests.file_image_test_settings(
651 name=self.image_name + '_kernel', file_path=kernel_file_name)
653 self.image_creators.append(create_image.OpenStackImage(
654 self.os_creds, kernel_file_image_settings))
655 kernel_image = self.image_creators[-1].create()
656 self.assertIsNotNone(kernel_image)
657 self.assertEqual(get_image_size(kernel_file_image_settings),
660 # Create the ramdisk image
661 ramdisk_file_name = None
662 ramdisk_url = openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL
663 if 'ramdisk_file' in self.glance_test_meta:
664 ramdisk_file_name = self.glance_test_meta['ramdisk_file']
665 elif 'ramdisk_url' in self.glance_test_meta:
666 ramdisk_url = self.glance_test_meta['ramdisk_url']
668 if not ramdisk_file_name and not file_only:
669 ramdisk_file_name = file_utils.download(ramdisk_url,
672 logger.warn('Will not download the ramdisk image.'
673 ' Cannot execute test')
676 ramdisk_file_image_settings = openstack_tests.file_image_test_settings(
677 name=self.image_name + '_ramdisk', file_path=ramdisk_file_name)
678 self.image_creators.append(create_image.OpenStackImage(
679 self.os_creds, ramdisk_file_image_settings))
680 ramdisk_image = self.image_creators[-1].create()
681 self.assertIsNotNone(ramdisk_image)
682 self.assertEqual(get_image_size(ramdisk_file_image_settings),
685 # Create the main disk image
686 disk_file_name = None
687 disk_url = openstack_tests.CIRROS_DEFAULT_IMAGE_URL
688 if 'disk_file' in self.glance_test_meta:
689 disk_file_name = self.glance_test_meta['disk_file']
690 elif 'disk_url' in self.glance_test_meta:
691 disk_url = self.glance_test_meta['disk_url']
693 if not disk_file_name and not file_only:
694 disk_file_name = file_utils.download(disk_url, self.tmp_dir).name
696 logger.warn('Will not download the disk file image.'
697 ' Cannot execute test')
700 file_image_settings = openstack_tests.file_image_test_settings(
701 name=self.image_name, file_path=disk_file_name)
702 properties['kernel_id'] = kernel_image.id
703 properties['ramdisk_id'] = ramdisk_image.id
704 file_image_settings.extra_properties = properties
705 self.image_creators.append(
706 create_image.OpenStackImage(self.os_creds, file_image_settings))
707 created_image = self.image_creators[-1].create()
709 self.assertIsNotNone(created_image)
710 self.assertEqual(self.image_name, created_image.name)
712 retrieved_image = glance_utils.get_image(self.glance,
713 file_image_settings.name)
714 self.assertIsNotNone(retrieved_image)
715 self.assertEqual(self.image_creators[-1].get_image().size,
716 retrieved_image.size)
717 self.assertEqual(get_image_size(file_image_settings),
718 retrieved_image.size)
719 self.assertEqual(created_image.name, retrieved_image.name)
720 self.assertEqual(created_image.id, retrieved_image.id)
721 self.assertEqual(created_image.properties, retrieved_image.properties)
723 def test_create_three_part_image_from_url_3_creators(self):
725 Tests the creation of a 3-part OpenStack image from a URL.
727 if 'disk_file' not in self.glance_test_meta:
730 if self.glance_test_meta and \
731 'extra_properties' in self.glance_test_meta:
732 properties = self.glance_test_meta['extra_properties']
734 # Create the kernel image
735 kernel_image_settings = openstack_tests.cirros_image_settings(
736 name=self.image_name + '_kernel',
737 url=openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL)
739 if self.glance_test_meta:
740 if 'kernel_url' in self.glance_test_meta:
741 kernel_image_settings.url = self.glance_test_meta[
743 self.image_creators.append(
744 create_image.OpenStackImage(self.os_creds,
745 kernel_image_settings))
746 kernel_image = self.image_creators[-1].create()
747 self.assertIsNotNone(kernel_image)
748 self.assertEqual(get_image_size(kernel_image_settings),
751 # Create the ramdisk image
752 ramdisk_image_settings = openstack_tests.cirros_image_settings(
753 name=self.image_name + '_ramdisk',
754 url=openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL)
755 if self.glance_test_meta:
756 if 'ramdisk_url' in self.glance_test_meta:
757 ramdisk_image_settings.url = self.glance_test_meta[
759 self.image_creators.append(
760 create_image.OpenStackImage(self.os_creds,
761 ramdisk_image_settings))
762 ramdisk_image = self.image_creators[-1].create()
763 self.assertIsNotNone(ramdisk_image)
764 self.assertEqual(get_image_size(ramdisk_image_settings),
767 # Create the main image
768 os_image_settings = openstack_tests.cirros_image_settings(
769 name=self.image_name,
770 url=openstack_tests.CIRROS_DEFAULT_IMAGE_URL)
771 if self.glance_test_meta:
772 if 'disk_url' in self.glance_test_meta:
773 os_image_settings.url = self.glance_test_meta['disk_url']
775 properties['kernel_id'] = kernel_image.id
776 properties['ramdisk_id'] = ramdisk_image.id
777 os_image_settings.extra_properties = properties
779 self.image_creators.append(
780 create_image.OpenStackImage(self.os_creds, os_image_settings))
781 created_image = self.image_creators[-1].create()
782 self.assertIsNotNone(created_image)
783 self.assertEqual(self.image_name, created_image.name)
785 retrieved_image = glance_utils.get_image(self.glance,
786 os_image_settings.name)
787 self.assertIsNotNone(retrieved_image)
789 self.assertEqual(self.image_creators[-1].get_image().size,
790 retrieved_image.size)
791 self.assertEqual(get_image_size(os_image_settings),
792 retrieved_image.size)
794 self.assertEqual(created_image.name, retrieved_image.name)
795 self.assertEqual(created_image.id, retrieved_image.id)
796 self.assertEqual(created_image.properties,
797 retrieved_image.properties)
800 'Test not executed as the image metadata requires image files')
803 def get_image_size(image_settings):
805 Returns the expected image size
808 if image_settings.image_file:
809 return os.path.getsize(image_settings.image_file)
810 elif image_settings.url:
811 return int(file_utils.get_content_length(image_settings.url))
814 'Cannot retrieve expected image size. Image filename or URL has '
815 'not been configured')