Merge "Auto Generated INFO.yaml file"
[snaps.git] / snaps / openstack / tests / create_image_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 from glanceclient.exc import HTTPBadRequest
16
17 try:
18     from urllib.request import URLError
19 except ImportError:
20     from urllib2 import URLError
21
22 import logging
23 import shutil
24 import unittest
25 import uuid
26
27 import os
28
29 from snaps import file_utils
30 from snaps.config.image import ImageConfigError
31 from snaps.openstack import create_image
32 from snaps.openstack.create_image import ImageSettings, ImageCreationError
33 from snaps.config.image import ImageConfig
34 from snaps.openstack.tests import openstack_tests
35 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
36 from snaps.openstack.utils import glance_utils
37
38 __author__ = 'spisarski'
39
40 logger = logging.getLogger('create_image_tests')
41
42
class ImageSettingsUnitTests(unittest.TestCase):
    """
    Unit tests for the constructor of the deprecated ImageSettings class,
    exercised both with keyword arguments and with an unpacked config dict.
    Delete this suite when ImageSettings itself is removed from the source
    tree.
    """

    def test_no_params(self):
        # No arguments at all
        self.assertRaises(ImageConfigError, ImageSettings)

    def test_empty_config(self):
        # An empty configuration dict
        no_values = dict()
        self.assertRaises(ImageConfigError, ImageSettings, **no_values)

    def test_name_only(self):
        self.assertRaises(ImageConfigError, ImageSettings, name='foo')

    def test_config_with_name_only(self):
        config = {'name': 'foo'}
        self.assertRaises(ImageConfigError, ImageSettings, **config)

    def test_name_user_only(self):
        self.assertRaises(
            ImageConfigError, ImageSettings, name='foo', image_user='bar')

    def test_config_with_name_user_only(self):
        config = {'name': 'foo', 'image_user': 'bar'}
        self.assertRaises(ImageConfigError, ImageSettings, **config)

    def test_name_user_format_only(self):
        # Name, user and format alone (no url, no file) are rejected
        self.assertRaises(
            ImageConfigError, ImageSettings,
            name='foo', image_user='bar', img_format='qcow2')

    def test_config_with_name_user_format_only(self):
        config = {'name': 'foo', 'image_user': 'bar', 'format': 'qcow2'}
        self.assertRaises(ImageConfigError, ImageSettings, **config)

    def test_name_user_format_url_only(self):
        image = ImageSettings(
            name='foo', image_user='bar', img_format='qcow2',
            url='http://foo.com')

        # Values handed to the constructor
        self.assertEqual('foo', image.name)
        self.assertEqual('bar', image.image_user)
        self.assertEqual('qcow2', image.format)
        self.assertEqual('http://foo.com', image.url)

        # Attributes that were not supplied
        self.assertIsNone(image.image_file)
        self.assertFalse(image.exists)
        self.assertFalse(image.public)
        self.assertIsNone(image.nic_config_pb_loc)

    def test_name_user_format_url_only_properties(self):
        extra = {'hw_video_model': 'vga'}
        image = ImageSettings(
            name='foo', image_user='bar', img_format='qcow2',
            url='http://foo.com', extra_properties=extra)

        # Values handed to the constructor
        self.assertEqual('foo', image.name)
        self.assertEqual('bar', image.image_user)
        self.assertEqual('qcow2', image.format)
        self.assertEqual('http://foo.com', image.url)
        self.assertEqual(extra, image.extra_properties)

        # Attributes that were not supplied
        self.assertIsNone(image.image_file)
        self.assertFalse(image.exists)
        self.assertFalse(image.public)
        self.assertIsNone(image.nic_config_pb_loc)

    def test_config_with_name_user_format_url_only(self):
        config = {
            'name': 'foo',
            'image_user': 'bar',
            'format': 'qcow2',
            'download_url': 'http://foo.com',
        }
        image = ImageSettings(**config)

        # Values taken from the config dict
        self.assertEqual('foo', image.name)
        self.assertEqual('bar', image.image_user)
        self.assertEqual('qcow2', image.format)
        self.assertEqual('http://foo.com', image.url)

        # Attributes that were not supplied
        self.assertIsNone(image.image_file)
        self.assertFalse(image.exists)
        self.assertFalse(image.public)
        self.assertIsNone(image.nic_config_pb_loc)

    def test_name_user_format_file_only(self):
        image = ImageSettings(
            name='foo', image_user='bar', img_format='qcow2',
            image_file='/foo/bar.qcow')

        # Values handed to the constructor
        self.assertEqual('foo', image.name)
        self.assertEqual('bar', image.image_user)
        self.assertEqual('qcow2', image.format)
        self.assertIsNone(image.url)
        self.assertEqual('/foo/bar.qcow', image.image_file)

        # Attributes that were not supplied
        self.assertFalse(image.exists)
        self.assertFalse(image.public)
        self.assertIsNone(image.nic_config_pb_loc)

    def test_config_with_name_user_format_file_only(self):
        config = {
            'name': 'foo',
            'image_user': 'bar',
            'format': 'qcow2',
            'image_file': '/foo/bar.qcow',
        }
        image = ImageSettings(**config)

        # Values taken from the config dict
        self.assertEqual('foo', image.name)
        self.assertEqual('bar', image.image_user)
        self.assertEqual('qcow2', image.format)
        self.assertIsNone(image.url)
        self.assertEqual('/foo/bar.qcow', image.image_file)

        # Attributes that were not supplied
        self.assertFalse(image.exists)
        self.assertFalse(image.public)
        self.assertIsNone(image.nic_config_pb_loc)

    def test_all_url(self):
        extra = {'hw_video_model': 'vga'}
        kernel = ImageSettings(
            name='kernel', url='http://kernel.com', image_user='bar',
            img_format='qcow2')
        ramdisk = ImageSettings(
            name='ramdisk', url='http://ramdisk.com', image_user='bar',
            img_format='qcow2')
        image = ImageSettings(
            name='foo', image_user='bar', img_format='qcow2',
            url='http://foo.com', extra_properties=extra,
            nic_config_pb_loc='/foo/bar', kernel_image_settings=kernel,
            ramdisk_image_settings=ramdisk, exists=True, public=True)

        # Main image
        self.assertEqual('foo', image.name)
        self.assertEqual('bar', image.image_user)
        self.assertEqual('qcow2', image.format)
        self.assertEqual('http://foo.com', image.url)
        self.assertEqual(extra, image.extra_properties)
        self.assertIsNone(image.image_file)
        self.assertEqual('/foo/bar', image.nic_config_pb_loc)

        # Nested kernel image settings
        nested_kernel = image.kernel_image_settings
        self.assertEqual('kernel', nested_kernel.name)
        self.assertEqual('http://kernel.com', nested_kernel.url)
        self.assertEqual('bar', nested_kernel.image_user)
        self.assertEqual('qcow2', nested_kernel.format)

        # Nested ramdisk image settings
        nested_ramdisk = image.ramdisk_image_settings
        self.assertEqual('ramdisk', nested_ramdisk.name)
        self.assertEqual('http://ramdisk.com', nested_ramdisk.url)
        self.assertEqual('bar', nested_ramdisk.image_user)
        self.assertEqual('qcow2', nested_ramdisk.format)

        self.assertTrue(image.exists)
        self.assertTrue(image.public)

    def test_config_all_url(self):
        # extra_properties is deliberately given as a string here and is
        # expected back unchanged
        extra_str = '{\'hw_video_model\': \'vga\'}'
        kernel_config = {
            'name': 'kernel',
            'download_url': 'http://kernel.com',
            'image_user': 'bar',
            'format': 'qcow2',
        }
        ramdisk_config = {
            'name': 'ramdisk',
            'download_url': 'http://ramdisk.com',
            'image_user': 'bar',
            'format': 'qcow2',
        }
        config = {
            'name': 'foo',
            'image_user': 'bar',
            'format': 'qcow2',
            'download_url': 'http://foo.com',
            'extra_properties': extra_str,
            'nic_config_pb_loc': '/foo/bar',
            'kernel_image_settings': kernel_config,
            'ramdisk_image_settings': ramdisk_config,
            'exists': True,
            'public': True,
        }
        image = ImageSettings(**config)

        # Main image
        self.assertEqual('foo', image.name)
        self.assertEqual('bar', image.image_user)
        self.assertEqual('qcow2', image.format)
        self.assertEqual('http://foo.com', image.url)
        self.assertEqual(extra_str, image.extra_properties)
        self.assertIsNone(image.image_file)
        self.assertEqual('/foo/bar', image.nic_config_pb_loc)

        # Nested kernel and ramdisk settings built from the inner dicts
        self.assertEqual('kernel', image.kernel_image_settings.name)
        self.assertEqual('http://kernel.com', image.kernel_image_settings.url)
        self.assertEqual('ramdisk', image.ramdisk_image_settings.name)
        self.assertEqual('http://ramdisk.com',
                         image.ramdisk_image_settings.url)

        self.assertTrue(image.exists)
        self.assertTrue(image.public)

    def test_all_file(self):
        extra = {'hw_video_model': 'vga'}
        image = ImageSettings(
            name='foo', image_user='bar', img_format='qcow2',
            image_file='/foo/bar.qcow', extra_properties=extra,
            nic_config_pb_loc='/foo/bar', exists=True, public=True)

        self.assertEqual('foo', image.name)
        self.assertEqual('bar', image.image_user)
        self.assertEqual('qcow2', image.format)
        self.assertIsNone(image.url)
        self.assertEqual('/foo/bar.qcow', image.image_file)
        self.assertEqual(extra, image.extra_properties)
        self.assertEqual('/foo/bar', image.nic_config_pb_loc)
        self.assertTrue(image.exists)
        self.assertTrue(image.public)

    def test_config_all_file(self):
        # extra_properties is deliberately given as a string here and is
        # expected back unchanged
        extra_str = '{\'hw_video_model\' : \'vga\'}'
        config = {
            'name': 'foo',
            'image_user': 'bar',
            'format': 'qcow2',
            'image_file': '/foo/bar.qcow',
            'extra_properties': extra_str,
            'nic_config_pb_loc': '/foo/bar',
            'exists': True,
            'public': True,
        }
        image = ImageSettings(**config)

        self.assertEqual('foo', image.name)
        self.assertEqual('bar', image.image_user)
        self.assertEqual('qcow2', image.format)
        self.assertIsNone(image.url)
        self.assertEqual('/foo/bar.qcow', image.image_file)
        self.assertEqual(extra_str, image.extra_properties)
        self.assertEqual('/foo/bar', image.nic_config_pb_loc)
        self.assertTrue(image.exists)
        self.assertTrue(image.public)
252
253
class CreateImageSuccessTests(OSIntegrationTestCase):
    """
    Success-path integration tests for create_image.OpenStackImage
    """

    def setUp(self):
        """
        Builds the fixtures shared by every test: a glance client, a unique
        image name, a scratch directory for downloaded files and the cirros
        image settings the image gets created from
        """
        # Name the class explicitly: super(self.__class__, self) looks the
        # parent up from the runtime type and therefore resolves incorrectly
        # as soon as this class is subclassed
        super(CreateImageSuccessTests, self).__start__()

        # NOTE(review): os_creds, os_session and image_metadata are not
        # assigned in this class; presumably __start__() provides them
        guid = uuid.uuid4()
        self.image_name = self.__class__.__name__ + '-' + str(guid)
        self.glance = glance_utils.glance_client(
            self.os_creds, self.os_session)
        self.image_creator = None

        if self.image_metadata and 'glance_tests' in self.image_metadata:
            glance_test_meta = self.image_metadata['glance_tests']
        else:
            glance_test_meta = None

        # Per-test scratch directory, relative to the working directory
        self.tmp_dir = 'tmp/' + str(guid)
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)

        self.image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name,
            image_metadata=glance_test_meta)

    def tearDown(self):
        """
        Cleans the image and downloaded image file
        """
        # try/finally so that a failure while cleaning the image can neither
        # leave the scratch directory behind nor skip the parent's cleanup
        try:
            if self.image_creator:
                self.image_creator.clean()
        finally:
            try:
                # isdir() is False for a missing path, no exists() needed
                if os.path.isdir(self.tmp_dir):
                    shutil.rmtree(self.tmp_dir)
            finally:
                super(CreateImageSuccessTests, self).__clean__()

    def test_create_image_clean_url(self):
        """
        Tests the creation of an OpenStack image from a URL.
        """
        self.image_creator = create_image.OpenStackImage(self.os_creds,
                                                         self.image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        # The image must be retrievable from glance and match what create()
        # returned
        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=self.image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(created_image.size, retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings),
                         retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_image_clean_url_properties(self):
        """
        Tests the creation of an OpenStack image from a URL and checks that
        the properties of the created and the retrieved image agree.
        """
        self.image_creator = create_image.OpenStackImage(self.os_creds,
                                                         self.image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=self.image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size,
                         retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings),
                         retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)
        self.assertEqual(created_image.properties, retrieved_image.properties)

    def test_create_image_clean_file(self):
        """
        Tests the creation of an OpenStack image from a file.
        """
        if not self.image_settings.image_file and self.image_settings.url:
            # Only a URL is configured: download it into the scratch
            # directory to obtain a local file
            image_file_name = file_utils.download(self.image_settings.url,
                                                  self.tmp_dir).name
        else:
            image_file_name = self.image_settings.image_file

        if not image_file_name:
            # Neither a file nor a URL is available; the test is a no-op
            logger.warning(
                'Test not executed as the image metadata requires image files')
            return

        file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name, file_path=image_file_name)

        self.image_creator = create_image.OpenStackImage(
            self.os_creds, file_image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=file_image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size,
                         retrieved_image.size)
        self.assertEqual(get_image_size(file_image_settings),
                         retrieved_image.size)

        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_delete_image(self):
        """
        Tests the creation then deletion of an OpenStack image to ensure
        clean() does not raise an Exception.
        """
        # Create Image
        self.image_creator = create_image.OpenStackImage(
            self.os_creds, self.image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=self.image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size,
                         retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings),
                         retrieved_image.size)

        # Delete the image behind the creator's back
        glance_utils.delete_image(self.glance, created_image)

        self.assertIsNone(glance_utils.get_image(
            self.glance, image_settings=self.image_creator.image_settings))

        # Must not throw an exception when attempting to cleanup non-existent
        # image
        self.image_creator.clean()
        self.assertIsNone(self.image_creator.get_image())

    def test_create_same_image(self):
        """
        Tests the creation of an OpenStack image when the image already exists.
        """
        # Create Image
        self.image_creator = create_image.OpenStackImage(self.os_creds,
                                                         self.image_settings)
        image1 = self.image_creator.create()

        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=self.image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size,
                         retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings),
                         retrieved_image.size)
        self.assertEqual(image1.name, retrieved_image.name)
        self.assertEqual(image1.id, retrieved_image.id)
        self.assertEqual(image1.properties, retrieved_image.properties)

        # A second creator with the same settings must hand back the image
        # that already exists rather than a new one
        os_image_2 = create_image.OpenStackImage(self.os_creds,
                                                 self.image_settings)
        image2 = os_image_2.create()
        self.assertEqual(image1.id, image2.id)

    def test_create_same_image_new_settings(self):
        """
        Tests the creation of an OpenStack image when the image already exists
        and the second configuration carries only the name, a user and
        exists=True.
        """
        # Create Image
        self.image_creator = create_image.OpenStackImage(self.os_creds,
                                                         self.image_settings)
        image1 = self.image_creator.create()

        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=self.image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size,
                         retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings),
                         retrieved_image.size)
        self.assertEqual(image1.name, retrieved_image.name)
        self.assertEqual(image1.id, retrieved_image.id)
        self.assertEqual(image1.properties, retrieved_image.properties)

        # Settings without url or file: the creator has to find the image
        # created above by its name
        image_2_settings = ImageConfig(name=self.image_settings.name,
                                       image_user='foo', exists=True)
        os_image_2 = create_image.OpenStackImage(self.os_creds,
                                                 image_2_settings)
        image2 = os_image_2.create()
        self.assertEqual(image1.id, image2.id)
461
462
class CreateImageNegativeTests(OSIntegrationTestCase):
    """
    Negative test cases for create_image.OpenStackImage
    """

    def setUp(self):
        """
        Initializes the parent fixture and a unique image name
        """
        # Name the class explicitly: super(self.__class__, self) looks the
        # parent up from the runtime type and therefore resolves incorrectly
        # as soon as this class is subclassed
        super(CreateImageNegativeTests, self).__start__()

        self.image_name = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.image_creator = None

    def tearDown(self):
        """
        Cleans the image creator, if a test set one
        """
        # try/finally so the parent's cleanup runs even when clean() raises
        try:
            if self.image_creator:
                self.image_creator.clean()
        finally:
            super(CreateImageNegativeTests, self).__clean__()

    def test_bad_image_name(self):
        """
        Expect an ImageCreationError when the image name does not exist when a
        file or URL has not been configured
        """
        os_image_settings = ImageConfig(name='foo', image_user='bar',
                                        exists=True)
        self.image_creator = create_image.OpenStackImage(self.os_creds,
                                                         os_image_settings)

        with self.assertRaises(ImageCreationError):
            self.image_creator.create()

            # Only reached when create() did not raise
            self.fail('ImageCreationError should have been raised prior to '
                      'this line')

    def test_bad_image_url(self):
        """
        Expect create() to fail with HTTPBadRequest or URLError when the image
        download url is bad; any other exception fails the test.

        NOTE(review): a create() call that raises nothing is accepted as
        well - confirm whether that is intended.
        """
        os_image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name)
        self.image_creator = create_image.OpenStackImage(
            self.os_creds,
            ImageConfig(
                name=os_image_settings.name,
                image_user=os_image_settings.image_user,
                img_format=os_image_settings.format,
                url="http://foo.bar"))

        try:
            self.image_creator.create()
        except (HTTPBadRequest, URLError):
            # Either one is an accepted outcome for the bad URL
            pass
        except Exception as e:
            self.fail('Invalid Exception ' + str(e))

    def test_bad_image_image_type(self):
        """
        Expect an Exception when the image format is bad
        """
        os_image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name)
        self.image_creator = create_image.OpenStackImage(
            self.os_creds,
            ImageConfig(
                name=os_image_settings.name,
                image_user=os_image_settings.image_user,
                img_format='foo', url=os_image_settings.url))

        with self.assertRaises(Exception):
            self.image_creator.create()

    def test_bad_image_file(self):
        """
        Expect an IOError when the image file does not exist
        """
        os_image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name)
        self.image_creator = create_image.OpenStackImage(
            self.os_creds,
            ImageConfig(
                name=os_image_settings.name,
                image_user=os_image_settings.image_user,
                img_format=os_image_settings.format,
                image_file="/foo/bar.qcow"))

        with self.assertRaises(IOError):
            self.image_creator.create()
549
550
551 class CreateMultiPartImageTests(OSIntegrationTestCase):
552     """
553     Test different means for creating a 3-part images
554     """
555
556     def setUp(self):
557         """
558         Instantiates the CreateImage object that is responsible for
559         downloading and creating an OS image file within OpenStack
560         """
561         super(self.__class__, self).__start__()
562
563         guid = uuid.uuid4()
564         self.image_creators = list()
565         self.image_name = self.__class__.__name__ + '-' + str(guid)
566         self.glance = glance_utils.glance_client(
567             self.os_creds, self.os_session)
568
569         self.tmp_dir = 'tmp/' + str(guid)
570         if not os.path.exists(self.tmp_dir):
571             os.makedirs(self.tmp_dir)
572
573         if self.image_metadata and 'glance_tests' in self.image_metadata:
574             self.glance_test_meta = self.image_metadata['glance_tests']
575         else:
576             self.glance_test_meta = dict()
577
578     def tearDown(self):
579         """
580         Cleans the images and downloaded image file
581         """
582         for image_creator in self.image_creators:
583             image_creator.clean()
584
585         if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
586             shutil.rmtree(self.tmp_dir)
587
588         super(self.__class__, self).__clean__()
589
590     def test_create_three_part_image_from_url(self):
591         """
592         Tests the creation of a 3-part OpenStack image from a URL.
593         """
594         # Create the kernel image
595         if 'disk_file' not in self.glance_test_meta:
596             image_settings = openstack_tests.cirros_image_settings(
597                 name=self.image_name,
598                 image_metadata={
599                     'disk_url':
600                         openstack_tests.CIRROS_DEFAULT_IMAGE_URL,
601                     'kernel_url':
602                         openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL,
603                     'ramdisk_url':
604                         openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL})
605
606             image_creator = create_image.OpenStackImage(self.os_creds,
607                                                         image_settings)
608             self.image_creators.append(image_creator)
609             image_creator.create()
610
611             main_image = glance_utils.get_image(self.glance,
612                                                 image_settings=image_settings)
613             self.assertIsNotNone(main_image)
614             self.assertIsNotNone(image_creator.get_image())
615             self.assertEqual(image_creator.get_image().id, main_image.id)
616
617             kernel_image = glance_utils.get_image(
618                 self.glance,
619                 image_settings=image_settings.kernel_image_settings)
620             self.assertIsNotNone(kernel_image)
621             self.assertIsNotNone(image_creator.get_kernel_image())
622             self.assertEqual(kernel_image.id,
623                              image_creator.get_kernel_image().id)
624
625             ramdisk_image = glance_utils.get_image(
626                 self.glance,
627                 image_settings=image_settings.ramdisk_image_settings)
628             self.assertIsNotNone(ramdisk_image)
629             self.assertIsNotNone(image_creator.get_ramdisk_image())
630             self.assertEqual(ramdisk_image.id,
631                              image_creator.get_ramdisk_image().id)
632         else:
633             logger.warn(
634                 'Test not executed as the image metadata requires image files')
635
636     def test_create_three_part_image_from_file_3_creators(self):
637         """
638         Tests the creation of a 3-part OpenStack image from files.
639         """
640         file_only = False
641
642         # Set properties
643         properties = {}
644         if self.glance_test_meta:
645             if 'extra_properties' in self.glance_test_meta:
646                 properties = self.glance_test_meta['extra_properties']
647             if 'disk_file' in self.glance_test_meta:
648                 file_only = True
649
650         # Create the kernel image
651         kernel_file_name = None
652         kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL
653         if 'kernel_file' in self.glance_test_meta:
654             kernel_file_name = self.glance_test_meta['kernel_file']
655         elif 'kernel_url' in self.glance_test_meta:
656             kernel_url = self.glance_test_meta['kernel_url']
657         else:
658             kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL
659
660         if not kernel_file_name and not file_only:
661             kernel_file_name = file_utils.download(kernel_url,
662                                                    self.tmp_dir).name
663         else:
664             logger.warn('Will not download the kernel image.'
665                         ' Cannot execute test')
666             return
667
668         kernel_file_image_settings = openstack_tests.file_image_test_settings(
669             name=self.image_name + '_kernel', file_path=kernel_file_name)
670
671         self.image_creators.append(create_image.OpenStackImage(
672             self.os_creds, kernel_file_image_settings))
673         kernel_image = self.image_creators[-1].create()
674         self.assertIsNotNone(kernel_image)
675         self.assertEqual(get_image_size(kernel_file_image_settings),
676                          kernel_image.size)
677
678         # Create the ramdisk image
679         ramdisk_file_name = None
680         ramdisk_url = openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL
681         if 'ramdisk_file' in self.glance_test_meta:
682             ramdisk_file_name = self.glance_test_meta['ramdisk_file']
683         elif 'ramdisk_url' in self.glance_test_meta:
684             ramdisk_url = self.glance_test_meta['ramdisk_url']
685
686         if not ramdisk_file_name and not file_only:
687             ramdisk_file_name = file_utils.download(ramdisk_url,
688                                                     self.tmp_dir).name
689         else:
690             logger.warn('Will not download the ramdisk image.'
691                         ' Cannot execute test')
692             return
693
694         ramdisk_file_image_settings = openstack_tests.file_image_test_settings(
695             name=self.image_name + '_ramdisk', file_path=ramdisk_file_name)
696         self.image_creators.append(create_image.OpenStackImage(
697             self.os_creds, ramdisk_file_image_settings))
698         ramdisk_image = self.image_creators[-1].create()
699         self.assertIsNotNone(ramdisk_image)
700         self.assertEqual(get_image_size(ramdisk_file_image_settings),
701                          ramdisk_image.size)
702
703         # Create the main disk image
704         disk_file_name = None
705         disk_url = openstack_tests.CIRROS_DEFAULT_IMAGE_URL
706         if 'disk_file' in self.glance_test_meta:
707             disk_file_name = self.glance_test_meta['disk_file']
708         elif 'disk_url' in self.glance_test_meta:
709             disk_url = self.glance_test_meta['disk_url']
710
711         if not disk_file_name and not file_only:
712             disk_file_name = file_utils.download(disk_url, self.tmp_dir).name
713         else:
714             logger.warn('Will not download the disk file image.'
715                         ' Cannot execute test')
716             return
717
718         file_image_settings = openstack_tests.file_image_test_settings(
719             name=self.image_name, file_path=disk_file_name)
720         properties['kernel_id'] = kernel_image.id
721         properties['ramdisk_id'] = ramdisk_image.id
722         file_image_settings.extra_properties = properties
723         self.image_creators.append(
724             create_image.OpenStackImage(self.os_creds, file_image_settings))
725         created_image = self.image_creators[-1].create()
726
727         self.assertIsNotNone(created_image)
728         self.assertEqual(self.image_name, created_image.name)
729
730         retrieved_image = glance_utils.get_image(
731             self.glance, image_settings=file_image_settings)
732         self.assertIsNotNone(retrieved_image)
733         self.assertEqual(self.image_creators[-1].get_image().size,
734                          retrieved_image.size)
735         self.assertEqual(get_image_size(file_image_settings),
736                          retrieved_image.size)
737         self.assertEqual(created_image.name, retrieved_image.name)
738         self.assertEqual(created_image.id, retrieved_image.id)
739         self.assertEqual(created_image.properties, retrieved_image.properties)
740
741     def test_create_three_part_image_from_url_3_creators(self):
742         """
743         Tests the creation of a 3-part OpenStack image from a URL.
744         """
745         if 'disk_file' not in self.glance_test_meta:
746             # Set properties
747             properties = {}
748             if self.glance_test_meta and \
749                     'extra_properties' in self.glance_test_meta:
750                 properties = self.glance_test_meta['extra_properties']
751
752             # Create the kernel image
753             kernel_image_settings = openstack_tests.cirros_image_settings(
754                 name=self.image_name + '_kernel',
755                 url=openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL)
756
757             if self.glance_test_meta:
758                 if 'kernel_url' in self.glance_test_meta:
759                     kernel_image_settings.url = self.glance_test_meta[
760                         'kernel_url']
761             self.image_creators.append(
762                 create_image.OpenStackImage(self.os_creds,
763                                             kernel_image_settings))
764             kernel_image = self.image_creators[-1].create()
765             self.assertIsNotNone(kernel_image)
766             self.assertEqual(get_image_size(kernel_image_settings),
767                              kernel_image.size)
768
769             # Create the ramdisk image
770             ramdisk_image_settings = openstack_tests.cirros_image_settings(
771                 name=self.image_name + '_ramdisk',
772                 url=openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL)
773             if self.glance_test_meta:
774                 if 'ramdisk_url' in self.glance_test_meta:
775                     ramdisk_image_settings.url = self.glance_test_meta[
776                         'ramdisk_url']
777             self.image_creators.append(
778                 create_image.OpenStackImage(self.os_creds,
779                                             ramdisk_image_settings))
780             ramdisk_image = self.image_creators[-1].create()
781             self.assertIsNotNone(ramdisk_image)
782             self.assertEqual(get_image_size(ramdisk_image_settings),
783                              ramdisk_image.size)
784
785             # Create the main image
786             os_image_settings = openstack_tests.cirros_image_settings(
787                 name=self.image_name,
788                 url=openstack_tests.CIRROS_DEFAULT_IMAGE_URL)
789             if self.glance_test_meta:
790                 if 'disk_url' in self.glance_test_meta:
791                     os_image_settings.url = self.glance_test_meta['disk_url']
792
793             properties['kernel_id'] = kernel_image.id
794             properties['ramdisk_id'] = ramdisk_image.id
795             os_image_settings.extra_properties = properties
796
797             self.image_creators.append(
798                 create_image.OpenStackImage(self.os_creds, os_image_settings))
799             created_image = self.image_creators[-1].create()
800             self.assertIsNotNone(created_image)
801             self.assertEqual(self.image_name, created_image.name)
802
803             retrieved_image = glance_utils.get_image(
804                 self.glance, image_settings=os_image_settings)
805             self.assertIsNotNone(retrieved_image)
806
807             self.assertEqual(self.image_creators[-1].get_image().size,
808                              retrieved_image.size)
809             self.assertEqual(get_image_size(os_image_settings),
810                              retrieved_image.size)
811
812             self.assertEqual(created_image.name, retrieved_image.name)
813             self.assertEqual(created_image.id, retrieved_image.id)
814             self.assertEqual(created_image.properties,
815                              retrieved_image.properties)
816         else:
817             logger.warn(
818                 'Test not executed as the image metadata requires image files')
819
820
def get_image_size(image_settings):
    """
    Determines the size the image described by the settings is expected to
    have
    :param image_settings: object whose 'image_file' and 'url' attributes
                           are read; a configured 'image_file' takes
                           precedence over 'url'
    :return: the size of the local file when 'image_file' is set, otherwise
             the content length reported for 'url' converted to an int
    :raise: Exception when neither 'image_file' nor 'url' is set
    """
    local_path = image_settings.image_file
    if local_path:
        # A local file wins; the URL is not consulted at all in this case
        return os.path.getsize(local_path)

    remote_url = image_settings.url
    if not remote_url:
        raise Exception(
            'Cannot retrieve expected image size. Image filename or URL has '
            'not been configured')

    return int(file_utils.get_content_length(remote_url))