1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 from glanceclient.exc import HTTPBadRequest
18 from urllib.request import URLError
20 from urllib2 import URLError
29 from snaps import file_utils
30 from snaps.config.image import ImageConfigError
31 from snaps.openstack import create_image
32 from snaps.openstack.create_image import ImageSettings, ImageCreationError
33 from snaps.config.image import ImageConfig
34 from snaps.openstack.tests import openstack_tests
35 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
36 from snaps.openstack.utils import glance_utils
38 __author__ = 'spisarski'
40 logger = logging.getLogger('create_image_tests')
43 class ImageSettingsUnitTests(unittest.TestCase):
45 Tests the construction of the ImageSettings class
46 To be removed once the deprecated class ImageSettings is finally removed
50 def test_no_params(self):
51 with self.assertRaises(ImageConfigError):
def test_empty_config(self):
    """
    An empty configuration dictionary must be rejected with an
    ImageConfigError.
    """
    empty_config = dict()
    with self.assertRaises(ImageConfigError):
        ImageSettings(**empty_config)
def test_name_only(self):
    """
    Passing only the 'name' keyword must raise an ImageConfigError.
    """
    self.assertRaises(ImageConfigError, ImageSettings, name='foo')
def test_config_with_name_only(self):
    """
    A configuration dictionary holding only 'name' must raise an
    ImageConfigError.
    """
    config = {'name': 'foo'}
    with self.assertRaises(ImageConfigError):
        ImageSettings(**config)
def test_name_user_only(self):
    """
    Passing only the 'name' and 'image_user' keywords must raise an
    ImageConfigError.
    """
    self.assertRaises(
        ImageConfigError, ImageSettings, name='foo', image_user='bar')
def test_config_with_name_user_only(self):
    """
    A configuration dictionary holding only 'name' and 'image_user' must
    raise an ImageConfigError.
    """
    config = {'name': 'foo', 'image_user': 'bar'}
    with self.assertRaises(ImageConfigError):
        ImageSettings(**config)
def test_name_user_format_only(self):
    """
    Passing a name, user and format but neither a URL nor a file must
    raise an ImageConfigError.
    """
    self.assertRaises(
        ImageConfigError, ImageSettings,
        name='foo', image_user='bar', img_format='qcow2')
78 def test_config_with_name_user_format_only(self):
79 with self.assertRaises(ImageConfigError):
81 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2'})
def test_name_user_format_url_only(self):
    """
    Builds ImageSettings from keyword arguments carrying a name, user,
    format and URL, then checks the stored values and the values of the
    attributes that were not supplied.
    """
    img = ImageSettings(
        name='foo', image_user='bar', img_format='qcow2',
        url='http://foo.com')

    # Values handed to the constructor
    self.assertEqual('foo', img.name)
    self.assertEqual('bar', img.image_user)
    self.assertEqual('qcow2', img.format)
    self.assertEqual('http://foo.com', img.url)

    # Attributes that were not supplied
    self.assertIsNone(img.image_file)
    self.assertFalse(img.exists)
    self.assertFalse(img.public)
    self.assertIsNone(img.nic_config_pb_loc)
def test_name_user_format_url_only_properties(self):
    """
    Same as the name/user/format/URL keyword test, but additionally
    passes extra_properties and checks they are exposed unchanged.
    """
    extra = {'hw_video_model': 'vga'}
    img = ImageSettings(
        name='foo', image_user='bar', img_format='qcow2',
        url='http://foo.com', extra_properties=extra)

    # Values handed to the constructor
    self.assertEqual('foo', img.name)
    self.assertEqual('bar', img.image_user)
    self.assertEqual('qcow2', img.format)
    self.assertEqual('http://foo.com', img.url)
    self.assertEqual(extra, img.extra_properties)

    # Attributes that were not supplied
    self.assertIsNone(img.image_file)
    self.assertFalse(img.exists)
    self.assertFalse(img.public)
    self.assertIsNone(img.nic_config_pb_loc)
def test_config_with_name_user_format_url_only(self):
    """
    Builds ImageSettings from a configuration dictionary that uses the
    'format' and 'download_url' keys, then checks the resulting
    attributes.
    """
    config = {
        'name': 'foo',
        'image_user': 'bar',
        'format': 'qcow2',
        'download_url': 'http://foo.com',
    }
    img = ImageSettings(**config)

    # Values taken from the dictionary
    self.assertEqual('foo', img.name)
    self.assertEqual('bar', img.image_user)
    self.assertEqual('qcow2', img.format)
    self.assertEqual('http://foo.com', img.url)

    # Attributes that were not supplied
    self.assertIsNone(img.image_file)
    self.assertFalse(img.exists)
    self.assertFalse(img.public)
    self.assertIsNone(img.nic_config_pb_loc)
123 def test_name_user_format_file_only(self):
124 settings = ImageSettings(name='foo', image_user='bar',
126 image_file='/foo/bar.qcow')
127 self.assertEqual('foo', settings.name)
128 self.assertEqual('bar', settings.image_user)
129 self.assertEqual('qcow2', settings.format)
130 self.assertIsNone(settings.url)
131 self.assertEqual('/foo/bar.qcow', settings.image_file)
132 self.assertFalse(settings.exists)
133 self.assertFalse(settings.public)
134 self.assertIsNone(settings.nic_config_pb_loc)
def test_config_with_name_user_format_file_only(self):
    """
    Builds ImageSettings from a configuration dictionary that points at
    a local image file instead of a URL, then checks the resulting
    attributes.
    """
    config = {
        'name': 'foo',
        'image_user': 'bar',
        'format': 'qcow2',
        'image_file': '/foo/bar.qcow',
    }
    img = ImageSettings(**config)

    self.assertEqual('foo', img.name)
    self.assertEqual('bar', img.image_user)
    self.assertEqual('qcow2', img.format)
    # No URL was configured; the file path is stored instead
    self.assertIsNone(img.url)
    self.assertEqual('/foo/bar.qcow', img.image_file)

    # Attributes that were not supplied
    self.assertFalse(img.exists)
    self.assertFalse(img.public)
    self.assertIsNone(img.nic_config_pb_loc)
def test_all_url(self):
    """
    Builds a URL-based ImageSettings with every keyword populated,
    including nested kernel and ramdisk settings, and checks each
    attribute.
    """
    extra = {'hw_video_model': 'vga'}
    kernel_cfg = ImageSettings(
        name='kernel', url='http://kernel.com', image_user='bar',
        img_format='qcow2')
    ramdisk_cfg = ImageSettings(
        name='ramdisk', url='http://ramdisk.com', image_user='bar',
        img_format='qcow2')
    img = ImageSettings(
        name='foo', image_user='bar', img_format='qcow2',
        url='http://foo.com', extra_properties=extra,
        nic_config_pb_loc='/foo/bar',
        kernel_image_settings=kernel_cfg,
        ramdisk_image_settings=ramdisk_cfg,
        exists=True, public=True)

    # Main image attributes
    self.assertEqual('foo', img.name)
    self.assertEqual('bar', img.image_user)
    self.assertEqual('qcow2', img.format)
    self.assertEqual('http://foo.com', img.url)
    self.assertEqual(extra, img.extra_properties)
    self.assertIsNone(img.image_file)
    self.assertEqual('/foo/bar', img.nic_config_pb_loc)

    # Nested kernel image settings
    kernel = img.kernel_image_settings
    self.assertEqual('kernel', kernel.name)
    self.assertEqual('http://kernel.com', kernel.url)
    self.assertEqual('bar', kernel.image_user)
    self.assertEqual('qcow2', kernel.format)

    # Nested ramdisk image settings
    ramdisk = img.ramdisk_image_settings
    self.assertEqual('ramdisk', ramdisk.name)
    self.assertEqual('http://ramdisk.com', ramdisk.url)
    self.assertEqual('bar', ramdisk.image_user)
    self.assertEqual('qcow2', ramdisk.format)

    # Flags
    self.assertTrue(img.exists)
    self.assertTrue(img.public)
183 def test_config_all_url(self):
184 settings = ImageSettings(
185 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
186 'download_url': 'http://foo.com',
187 'extra_properties': '{\'hw_video_model\': \'vga\'}',
188 'nic_config_pb_loc': '/foo/bar',
189 'kernel_image_settings': {
191 'download_url': 'http://kernel.com',
194 'ramdisk_image_settings': {
196 'download_url': 'http://ramdisk.com',
199 'exists': True, 'public': True})
200 self.assertEqual('foo', settings.name)
201 self.assertEqual('bar', settings.image_user)
202 self.assertEqual('qcow2', settings.format)
203 self.assertEqual('http://foo.com', settings.url)
204 self.assertEqual('{\'hw_video_model\': \'vga\'}',
205 settings.extra_properties)
206 self.assertIsNone(settings.image_file)
207 self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
208 self.assertEqual('kernel', settings.kernel_image_settings.name)
209 self.assertEqual('http://kernel.com',
210 settings.kernel_image_settings.url)
211 self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
212 self.assertEqual('http://ramdisk.com',
213 settings.ramdisk_image_settings.url)
214 self.assertTrue(settings.exists)
215 self.assertTrue(settings.public)
217 def test_all_file(self):
218 properties = {'hw_video_model': 'vga'}
219 settings = ImageSettings(name='foo', image_user='bar',
221 image_file='/foo/bar.qcow',
222 extra_properties=properties,
223 nic_config_pb_loc='/foo/bar', exists=True,
225 self.assertEqual('foo', settings.name)
226 self.assertEqual('bar', settings.image_user)
227 self.assertEqual('qcow2', settings.format)
228 self.assertIsNone(settings.url)
229 self.assertEqual('/foo/bar.qcow', settings.image_file)
230 self.assertEqual(properties, settings.extra_properties)
231 self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
232 self.assertTrue(settings.exists)
233 self.assertTrue(settings.public)
235 def test_config_all_file(self):
236 settings = ImageSettings(
237 **{'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
238 'image_file': '/foo/bar.qcow',
239 'extra_properties': '{\'hw_video_model\' : \'vga\'}',
240 'nic_config_pb_loc': '/foo/bar', 'exists': True,
242 self.assertEqual('foo', settings.name)
243 self.assertEqual('bar', settings.image_user)
244 self.assertEqual('qcow2', settings.format)
245 self.assertIsNone(settings.url)
246 self.assertEqual('/foo/bar.qcow', settings.image_file)
247 self.assertEqual('{\'hw_video_model\' : \'vga\'}',
248 settings.extra_properties)
249 self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
250 self.assertTrue(settings.exists)
251 self.assertTrue(settings.public)
254 class CreateImageSuccessTests(OSIntegrationTestCase):
256 Test for the CreateImage class defined in create_image.py
261 Instantiates the CreateImage object that is responsible for downloading
262 and creating an OS image file within OpenStack
264 super(self.__class__, self).__start__()
267 self.image_name = self.__class__.__name__ + '-' + str(guid)
268 self.glance = glance_utils.glance_client(
269 self.os_creds, self.os_session)
270 self.image_creator = None
272 if self.image_metadata and 'glance_tests' in self.image_metadata:
273 glance_test_meta = self.image_metadata['glance_tests']
275 glance_test_meta = None
277 self.tmp_dir = 'tmp/' + str(guid)
278 if not os.path.exists(self.tmp_dir):
279 os.makedirs(self.tmp_dir)
281 self.image_settings = openstack_tests.cirros_image_settings(
282 name=self.image_name,
283 image_metadata=glance_test_meta)
287 Cleans the image and downloaded image file
289 if self.image_creator:
290 self.image_creator.clean()
292 if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
293 shutil.rmtree(self.tmp_dir)
295 super(self.__class__, self).__clean__()
297 def test_create_image_clean_url(self):
299 Tests the creation of an OpenStack image from a URL.
302 # Set the default image settings, then set any custom parameters sent
305 self.image_creator = create_image.OpenStackImage(self.os_creds,
307 created_image = self.image_creator.create()
308 self.assertIsNotNone(created_image)
310 retrieved_image = glance_utils.get_image(
311 self.glance, image_settings=self.image_settings)
312 self.assertIsNotNone(retrieved_image)
313 self.assertEqual(created_image.size, retrieved_image.size)
314 self.assertEqual(get_image_size(self.image_settings),
315 retrieved_image.size)
316 self.assertEqual(created_image.name, retrieved_image.name)
317 self.assertEqual(created_image.id, retrieved_image.id)
319 def test_create_image_clean_url_properties(self):
321 Tests the creation of an OpenStack image from a URL and set properties.
324 # Set the default image settings, then set any custom parameters sent
326 self.image_creator = create_image.OpenStackImage(self.os_creds,
328 created_image = self.image_creator.create()
329 self.assertIsNotNone(created_image)
331 retrieved_image = glance_utils.get_image(
332 self.glance, image_settings=self.image_settings)
333 self.assertIsNotNone(retrieved_image)
334 self.assertEqual(self.image_creator.get_image().size,
335 retrieved_image.size)
336 self.assertEqual(get_image_size(self.image_settings),
337 retrieved_image.size)
338 self.assertEqual(created_image.name, retrieved_image.name)
339 self.assertEqual(created_image.id, retrieved_image.id)
340 self.assertEqual(created_image.properties, retrieved_image.properties)
342 def test_create_image_clean_file(self):
344 Tests the creation of an OpenStack image from a file.
346 if not self.image_settings.image_file and self.image_settings.url:
347 # Download the file of the image
348 image_file_name = file_utils.download(self.image_settings.url,
351 image_file_name = self.image_settings.image_file
354 file_image_settings = openstack_tests.file_image_test_settings(
355 name=self.image_name, file_path=image_file_name)
357 self.image_creator = create_image.OpenStackImage(
358 self.os_creds, file_image_settings)
359 created_image = self.image_creator.create()
360 self.assertIsNotNone(created_image)
361 self.assertEqual(self.image_name, created_image.name)
363 retrieved_image = glance_utils.get_image(
364 self.glance, image_settings=file_image_settings)
365 self.assertIsNotNone(retrieved_image)
366 self.assertEqual(self.image_creator.get_image().size,
367 retrieved_image.size)
368 self.assertEqual(get_image_size(file_image_settings),
369 retrieved_image.size)
371 self.assertEqual(created_image.name, retrieved_image.name)
372 self.assertEqual(created_image.id, retrieved_image.id)
375 'Test not executed as the image metadata requires image files')
def test_create_delete_image(self):
    """
    Creates an image, deletes it directly through glance_utils, and then
    confirms that clean() on the creator still completes without raising
    an Exception.
    """
    # Create the image through the creator under test
    creator = create_image.OpenStackImage(
        self.os_creds, self.image_settings)
    self.image_creator = creator
    created_image = creator.create()
    self.assertIsNotNone(created_image)

    # The image must be retrievable and report the expected size
    retrieved_image = glance_utils.get_image(
        self.glance, image_settings=self.image_settings)
    self.assertIsNotNone(retrieved_image)
    self.assertEqual(
        self.image_creator.get_image().size, retrieved_image.size)
    self.assertEqual(
        get_image_size(self.image_settings), retrieved_image.size)

    # Remove the image behind the creator's back
    glance_utils.delete_image(self.glance, created_image)

    lookup_after_delete = glance_utils.get_image(
        self.glance, image_settings=self.image_creator.image_settings)
    self.assertIsNone(lookup_after_delete)

    # Cleaning up an image that no longer exists must not raise
    self.image_creator.clean()
    self.assertIsNone(self.image_creator.get_image())
407 def test_create_same_image(self):
409 Tests the creation of an OpenStack image when the image already exists.
412 self.image_creator = create_image.OpenStackImage(self.os_creds,
414 image1 = self.image_creator.create()
416 retrieved_image = glance_utils.get_image(
417 self.glance, image_settings=self.image_settings)
418 self.assertIsNotNone(retrieved_image)
419 self.assertEqual(self.image_creator.get_image().size,
420 retrieved_image.size)
421 self.assertEqual(get_image_size(self.image_settings),
422 retrieved_image.size)
423 self.assertEqual(image1.name, retrieved_image.name)
424 self.assertEqual(image1.id, retrieved_image.id)
425 self.assertEqual(image1.properties, retrieved_image.properties)
427 # Should be retrieving the instance data
428 os_image_2 = create_image.OpenStackImage(self.os_creds,
430 image2 = os_image_2.create()
431 self.assertEqual(image1.id, image2.id)
433 def test_create_same_image_new_settings(self):
435 Tests the creation of an OpenStack image when the image already exists
436 and the configuration only contains the name.
439 self.image_creator = create_image.OpenStackImage(self.os_creds,
441 image1 = self.image_creator.create()
443 retrieved_image = glance_utils.get_image(
444 self.glance, image_settings=self.image_settings)
445 self.assertIsNotNone(retrieved_image)
446 self.assertEqual(self.image_creator.get_image().size,
447 retrieved_image.size)
448 self.assertEqual(get_image_size(self.image_settings),
449 retrieved_image.size)
450 self.assertEqual(image1.name, retrieved_image.name)
451 self.assertEqual(image1.id, retrieved_image.id)
452 self.assertEqual(image1.properties, retrieved_image.properties)
454 # Should be retrieving the instance data
455 image_2_settings = ImageConfig(name=self.image_settings.name,
456 image_user='foo', exists=True)
457 os_image_2 = create_image.OpenStackImage(self.os_creds,
459 image2 = os_image_2.create()
460 self.assertEqual(image1.id, image2.id)
463 class CreateImageNegativeTests(OSIntegrationTestCase):
465 Negative test cases for the CreateImage class
469 super(self.__class__, self).__start__()
471 self.image_name = self.__class__.__name__ + '-' + str(uuid.uuid4())
472 self.image_creator = None
475 if self.image_creator:
476 self.image_creator.clean()
478 super(self.__class__, self).__clean__()
480 def test_bad_image_name(self):
482 Expect an ImageCreationError when the image name does not exist when a
483 file or URL has not been configured
485 os_image_settings = ImageConfig(name='foo', image_user='bar',
487 self.image_creator = create_image.OpenStackImage(self.os_creds,
490 with self.assertRaises(ImageCreationError):
491 self.image_creator.create()
493 self.fail('ImageCreationError should have been raised prior to'
496 def test_bad_image_url(self):
498 Expect an ImageCreationError when the image download url is bad
500 os_image_settings = openstack_tests.cirros_image_settings(
501 name=self.image_name)
502 self.image_creator = create_image.OpenStackImage(
505 name=os_image_settings.name,
506 image_user=os_image_settings.image_user,
507 img_format=os_image_settings.format,
508 url="http://foo.bar"))
511 self.image_creator.create()
512 except HTTPBadRequest:
516 except Exception as e:
517 self.fail('Invalid Exception ' + str(e))
519 def test_bad_image_image_type(self):
521 Expect an ImageCreationError when the image type is bad
523 os_image_settings = openstack_tests.cirros_image_settings(
524 name=self.image_name)
525 self.image_creator = create_image.OpenStackImage(
528 name=os_image_settings.name,
529 image_user=os_image_settings.image_user,
530 img_format='foo', url=os_image_settings.url))
532 with self.assertRaises(Exception):
533 self.image_creator.create()
535 def test_bad_image_file(self):
537 Expect an ImageCreationError when the image file does not exist
539 os_image_settings = openstack_tests.cirros_image_settings(
540 name=self.image_name)
541 self.image_creator = create_image.OpenStackImage(
544 name=os_image_settings.name,
545 image_user=os_image_settings.image_user,
546 img_format=os_image_settings.format, image_file="/foo/bar.qcow"))
547 with self.assertRaises(IOError):
548 self.image_creator.create()
551 class CreateMultiPartImageTests(OSIntegrationTestCase):
553 Tests different means for creating 3-part images
558 Instantiates the CreateImage object that is responsible for
559 downloading and creating an OS image file within OpenStack
561 super(self.__class__, self).__start__()
564 self.image_creators = list()
565 self.image_name = self.__class__.__name__ + '-' + str(guid)
566 self.glance = glance_utils.glance_client(
567 self.os_creds, self.os_session)
569 self.tmp_dir = 'tmp/' + str(guid)
570 if not os.path.exists(self.tmp_dir):
571 os.makedirs(self.tmp_dir)
573 if self.image_metadata and 'glance_tests' in self.image_metadata:
574 self.glance_test_meta = self.image_metadata['glance_tests']
576 self.glance_test_meta = dict()
580 Cleans the images and downloaded image file
582 for image_creator in self.image_creators:
583 image_creator.clean()
585 if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
586 shutil.rmtree(self.tmp_dir)
588 super(self.__class__, self).__clean__()
590 def test_create_three_part_image_from_url(self):
592 Tests the creation of a 3-part OpenStack image from a URL.
594 # Create the kernel image
595 if 'disk_file' not in self.glance_test_meta:
596 image_settings = openstack_tests.cirros_image_settings(
597 name=self.image_name,
600 openstack_tests.CIRROS_DEFAULT_IMAGE_URL,
602 openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL,
604 openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL})
606 image_creator = create_image.OpenStackImage(self.os_creds,
608 self.image_creators.append(image_creator)
609 image_creator.create()
611 main_image = glance_utils.get_image(self.glance,
612 image_settings=image_settings)
613 self.assertIsNotNone(main_image)
614 self.assertIsNotNone(image_creator.get_image())
615 self.assertEqual(image_creator.get_image().id, main_image.id)
617 kernel_image = glance_utils.get_image(
619 image_settings=image_settings.kernel_image_settings)
620 self.assertIsNotNone(kernel_image)
621 self.assertIsNotNone(image_creator.get_kernel_image())
622 self.assertEqual(kernel_image.id,
623 image_creator.get_kernel_image().id)
625 ramdisk_image = glance_utils.get_image(
627 image_settings=image_settings.ramdisk_image_settings)
628 self.assertIsNotNone(ramdisk_image)
629 self.assertIsNotNone(image_creator.get_ramdisk_image())
630 self.assertEqual(ramdisk_image.id,
631 image_creator.get_ramdisk_image().id)
634 'Test not executed as the image metadata requires image files')
636 def test_create_three_part_image_from_file_3_creators(self):
638 Tests the creation of a 3-part OpenStack image from files.
644 if self.glance_test_meta:
645 if 'extra_properties' in self.glance_test_meta:
646 properties = self.glance_test_meta['extra_properties']
647 if 'disk_file' in self.glance_test_meta:
650 # Create the kernel image
651 kernel_file_name = None
652 kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL
653 if 'kernel_file' in self.glance_test_meta:
654 kernel_file_name = self.glance_test_meta['kernel_file']
655 elif 'kernel_url' in self.glance_test_meta:
656 kernel_url = self.glance_test_meta['kernel_url']
658 kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL
660 if not kernel_file_name and not file_only:
661 kernel_file_name = file_utils.download(kernel_url,
664 logger.warn('Will not download the kernel image.'
665 ' Cannot execute test')
668 kernel_file_image_settings = openstack_tests.file_image_test_settings(
669 name=self.image_name + '_kernel', file_path=kernel_file_name)
671 self.image_creators.append(create_image.OpenStackImage(
672 self.os_creds, kernel_file_image_settings))
673 kernel_image = self.image_creators[-1].create()
674 self.assertIsNotNone(kernel_image)
675 self.assertEqual(get_image_size(kernel_file_image_settings),
678 # Create the ramdisk image
679 ramdisk_file_name = None
680 ramdisk_url = openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL
681 if 'ramdisk_file' in self.glance_test_meta:
682 ramdisk_file_name = self.glance_test_meta['ramdisk_file']
683 elif 'ramdisk_url' in self.glance_test_meta:
684 ramdisk_url = self.glance_test_meta['ramdisk_url']
686 if not ramdisk_file_name and not file_only:
687 ramdisk_file_name = file_utils.download(ramdisk_url,
690 logger.warn('Will not download the ramdisk image.'
691 ' Cannot execute test')
694 ramdisk_file_image_settings = openstack_tests.file_image_test_settings(
695 name=self.image_name + '_ramdisk', file_path=ramdisk_file_name)
696 self.image_creators.append(create_image.OpenStackImage(
697 self.os_creds, ramdisk_file_image_settings))
698 ramdisk_image = self.image_creators[-1].create()
699 self.assertIsNotNone(ramdisk_image)
700 self.assertEqual(get_image_size(ramdisk_file_image_settings),
703 # Create the main disk image
704 disk_file_name = None
705 disk_url = openstack_tests.CIRROS_DEFAULT_IMAGE_URL
706 if 'disk_file' in self.glance_test_meta:
707 disk_file_name = self.glance_test_meta['disk_file']
708 elif 'disk_url' in self.glance_test_meta:
709 disk_url = self.glance_test_meta['disk_url']
711 if not disk_file_name and not file_only:
712 disk_file_name = file_utils.download(disk_url, self.tmp_dir).name
714 logger.warn('Will not download the disk file image.'
715 ' Cannot execute test')
718 file_image_settings = openstack_tests.file_image_test_settings(
719 name=self.image_name, file_path=disk_file_name)
720 properties['kernel_id'] = kernel_image.id
721 properties['ramdisk_id'] = ramdisk_image.id
722 file_image_settings.extra_properties = properties
723 self.image_creators.append(
724 create_image.OpenStackImage(self.os_creds, file_image_settings))
725 created_image = self.image_creators[-1].create()
727 self.assertIsNotNone(created_image)
728 self.assertEqual(self.image_name, created_image.name)
730 retrieved_image = glance_utils.get_image(
731 self.glance, image_settings=file_image_settings)
732 self.assertIsNotNone(retrieved_image)
733 self.assertEqual(self.image_creators[-1].get_image().size,
734 retrieved_image.size)
735 self.assertEqual(get_image_size(file_image_settings),
736 retrieved_image.size)
737 self.assertEqual(created_image.name, retrieved_image.name)
738 self.assertEqual(created_image.id, retrieved_image.id)
739 self.assertEqual(created_image.properties, retrieved_image.properties)
741 def test_create_three_part_image_from_url_3_creators(self):
743 Tests the creation of a 3-part OpenStack image from a URL.
745 if 'disk_file' not in self.glance_test_meta:
748 if self.glance_test_meta and \
749 'extra_properties' in self.glance_test_meta:
750 properties = self.glance_test_meta['extra_properties']
752 # Create the kernel image
753 kernel_image_settings = openstack_tests.cirros_image_settings(
754 name=self.image_name + '_kernel',
755 url=openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL)
757 if self.glance_test_meta:
758 if 'kernel_url' in self.glance_test_meta:
759 kernel_image_settings.url = self.glance_test_meta[
761 self.image_creators.append(
762 create_image.OpenStackImage(self.os_creds,
763 kernel_image_settings))
764 kernel_image = self.image_creators[-1].create()
765 self.assertIsNotNone(kernel_image)
766 self.assertEqual(get_image_size(kernel_image_settings),
769 # Create the ramdisk image
770 ramdisk_image_settings = openstack_tests.cirros_image_settings(
771 name=self.image_name + '_ramdisk',
772 url=openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL)
773 if self.glance_test_meta:
774 if 'ramdisk_url' in self.glance_test_meta:
775 ramdisk_image_settings.url = self.glance_test_meta[
777 self.image_creators.append(
778 create_image.OpenStackImage(self.os_creds,
779 ramdisk_image_settings))
780 ramdisk_image = self.image_creators[-1].create()
781 self.assertIsNotNone(ramdisk_image)
782 self.assertEqual(get_image_size(ramdisk_image_settings),
785 # Create the main image
786 os_image_settings = openstack_tests.cirros_image_settings(
787 name=self.image_name,
788 url=openstack_tests.CIRROS_DEFAULT_IMAGE_URL)
789 if self.glance_test_meta:
790 if 'disk_url' in self.glance_test_meta:
791 os_image_settings.url = self.glance_test_meta['disk_url']
793 properties['kernel_id'] = kernel_image.id
794 properties['ramdisk_id'] = ramdisk_image.id
795 os_image_settings.extra_properties = properties
797 self.image_creators.append(
798 create_image.OpenStackImage(self.os_creds, os_image_settings))
799 created_image = self.image_creators[-1].create()
800 self.assertIsNotNone(created_image)
801 self.assertEqual(self.image_name, created_image.name)
803 retrieved_image = glance_utils.get_image(
804 self.glance, image_settings=os_image_settings)
805 self.assertIsNotNone(retrieved_image)
807 self.assertEqual(self.image_creators[-1].get_image().size,
808 retrieved_image.size)
809 self.assertEqual(get_image_size(os_image_settings),
810 retrieved_image.size)
812 self.assertEqual(created_image.name, retrieved_image.name)
813 self.assertEqual(created_image.id, retrieved_image.id)
814 self.assertEqual(created_image.properties,
815 retrieved_image.properties)
818 'Test not executed as the image metadata requires image files')
821 def get_image_size(image_settings):
823 Returns the expected image size
826 if image_settings.image_file:
827 return os.path.getsize(image_settings.image_file)
828 elif image_settings.url:
829 return int(file_utils.get_content_length(image_settings.url))
832 'Cannot retrieve expected image size. Image filename or URL has '
833 'not been configured')