9965f870854f0f973f31bbdd7f298d4fda944ec7
[snaps.git] / snaps / openstack / tests / create_image_tests.py
1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 #                    and others.  All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 from glanceclient.exc import HTTPBadRequest
16
17 try:
18     from urllib.request import URLError
19 except ImportError:
20     from urllib2 import URLError
21
22 import logging
23 import shutil
24 import unittest
25 import uuid
26
27 import os
28
29 from snaps import file_utils
30 from snaps.config.image import ImageConfigError
31 from snaps.openstack import create_image
32 from snaps.openstack.create_image import ImageSettings, ImageCreationError
33 from snaps.config.image import ImageConfig
34 from snaps.openstack.tests import openstack_tests
35 from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
36 from snaps.openstack.utils import glance_utils
37
# Author recorded for this test module
__author__ = 'spisarski'

# Module-level logger used by the test cases in this file
logger = logging.getLogger('create_image_tests')
41
42
class ImageSettingsUnitTests(unittest.TestCase):
    """
    Exercises the constructor of the deprecated ImageSettings class using
    both keyword arguments and configuration dictionaries.
    This test case should be deleted together with ImageSettings once that
    class is finally removed from the source tree
    """

    def _assert_rejected(self, **kwargs):
        """
        Asserts that constructing an ImageSettings object with the given
        arguments raises an ImageConfigError
        :param kwargs: the arguments handed to the ImageSettings constructor
        """
        with self.assertRaises(ImageConfigError):
            ImageSettings(**kwargs)

    def test_no_params(self):
        self._assert_rejected()

    def test_empty_config(self):
        empty_config = dict()
        self._assert_rejected(**empty_config)

    def test_name_only(self):
        self._assert_rejected(name='foo')

    def test_config_with_name_only(self):
        config = {'name': 'foo'}
        self._assert_rejected(**config)

    def test_name_user_only(self):
        self._assert_rejected(image_user='bar', name='foo')

    def test_config_with_name_user_only(self):
        config = {'name': 'foo', 'image_user': 'bar'}
        self._assert_rejected(**config)

    def test_name_user_format_only(self):
        self._assert_rejected(
            img_format='qcow2', image_user='bar', name='foo')

    def test_config_with_name_user_format_only(self):
        config = {'name': 'foo', 'image_user': 'bar', 'format': 'qcow2'}
        self._assert_rejected(**config)

    def test_name_user_format_url_only(self):
        img = ImageSettings(
            url='http://foo.com', img_format='qcow2', image_user='bar',
            name='foo')
        self.assertEqual('foo', img.name)
        self.assertEqual('bar', img.image_user)
        self.assertEqual('qcow2', img.format)
        self.assertEqual('http://foo.com', img.url)
        self.assertIsNone(img.image_file)
        self.assertFalse(img.exists)
        self.assertFalse(img.public)
        self.assertIsNone(img.nic_config_pb_loc)

    def test_name_user_format_url_only_properties(self):
        extra_props = {'hw_video_model': 'vga'}
        img = ImageSettings(
            extra_properties=extra_props, url='http://foo.com',
            img_format='qcow2', image_user='bar', name='foo')
        self.assertEqual('foo', img.name)
        self.assertEqual('bar', img.image_user)
        self.assertEqual('qcow2', img.format)
        self.assertEqual('http://foo.com', img.url)
        self.assertEqual(extra_props, img.extra_properties)
        self.assertIsNone(img.image_file)
        self.assertFalse(img.exists)
        self.assertFalse(img.public)
        self.assertIsNone(img.nic_config_pb_loc)

    def test_config_with_name_user_format_url_only(self):
        config = {
            'name': 'foo',
            'image_user': 'bar',
            'format': 'qcow2',
            'download_url': 'http://foo.com',
        }
        img = ImageSettings(**config)
        self.assertEqual('foo', img.name)
        self.assertEqual('bar', img.image_user)
        self.assertEqual('qcow2', img.format)
        self.assertEqual('http://foo.com', img.url)
        self.assertIsNone(img.image_file)
        self.assertFalse(img.exists)
        self.assertFalse(img.public)
        self.assertIsNone(img.nic_config_pb_loc)

    def test_name_user_format_file_only(self):
        img = ImageSettings(
            image_file='/foo/bar.qcow', img_format='qcow2',
            image_user='bar', name='foo')
        self.assertEqual('foo', img.name)
        self.assertEqual('bar', img.image_user)
        self.assertEqual('qcow2', img.format)
        self.assertIsNone(img.url)
        self.assertEqual('/foo/bar.qcow', img.image_file)
        self.assertFalse(img.exists)
        self.assertFalse(img.public)
        self.assertIsNone(img.nic_config_pb_loc)

    def test_config_with_name_user_format_file_only(self):
        config = {
            'name': 'foo',
            'image_user': 'bar',
            'format': 'qcow2',
            'image_file': '/foo/bar.qcow',
        }
        img = ImageSettings(**config)
        self.assertEqual('foo', img.name)
        self.assertEqual('bar', img.image_user)
        self.assertEqual('qcow2', img.format)
        self.assertIsNone(img.url)
        self.assertEqual('/foo/bar.qcow', img.image_file)
        self.assertFalse(img.exists)
        self.assertFalse(img.public)
        self.assertIsNone(img.nic_config_pb_loc)

    def test_all_url(self):
        extra_props = {'hw_video_model': 'vga'}
        kernel = ImageSettings(
            name='kernel', url='http://kernel.com', image_user='bar',
            img_format='qcow2')
        ramdisk = ImageSettings(
            name='ramdisk', url='http://ramdisk.com', image_user='bar',
            img_format='qcow2')
        img = ImageSettings(
            name='foo', image_user='bar', img_format='qcow2',
            url='http://foo.com', extra_properties=extra_props,
            nic_config_pb_loc='/foo/bar', kernel_image_settings=kernel,
            ramdisk_image_settings=ramdisk, exists=True, public=True)

        # Main image
        self.assertEqual('foo', img.name)
        self.assertEqual('bar', img.image_user)
        self.assertEqual('qcow2', img.format)
        self.assertEqual('http://foo.com', img.url)
        self.assertEqual(extra_props, img.extra_properties)
        self.assertIsNone(img.image_file)
        self.assertEqual('/foo/bar', img.nic_config_pb_loc)

        # Nested kernel image
        nested_kernel = img.kernel_image_settings
        self.assertEqual('kernel', nested_kernel.name)
        self.assertEqual('http://kernel.com', nested_kernel.url)
        self.assertEqual('bar', nested_kernel.image_user)
        self.assertEqual('qcow2', nested_kernel.format)

        # Nested ramdisk image
        nested_ramdisk = img.ramdisk_image_settings
        self.assertEqual('ramdisk', nested_ramdisk.name)
        self.assertEqual('http://ramdisk.com', nested_ramdisk.url)
        self.assertEqual('bar', nested_ramdisk.image_user)
        self.assertEqual('qcow2', nested_ramdisk.format)

        self.assertTrue(img.exists)
        self.assertTrue(img.public)

    def test_config_all_url(self):
        props_text = '{\'hw_video_model\': \'vga\'}'
        kernel_config = {
            'name': 'kernel',
            'download_url': 'http://kernel.com',
            'image_user': 'bar',
            'format': 'qcow2',
        }
        ramdisk_config = {
            'name': 'ramdisk',
            'download_url': 'http://ramdisk.com',
            'image_user': 'bar',
            'format': 'qcow2',
        }
        config = {
            'name': 'foo',
            'image_user': 'bar',
            'format': 'qcow2',
            'download_url': 'http://foo.com',
            'extra_properties': props_text,
            'nic_config_pb_loc': '/foo/bar',
            'kernel_image_settings': kernel_config,
            'ramdisk_image_settings': ramdisk_config,
            'exists': True,
            'public': True,
        }
        img = ImageSettings(**config)
        self.assertEqual('foo', img.name)
        self.assertEqual('bar', img.image_user)
        self.assertEqual('qcow2', img.format)
        self.assertEqual('http://foo.com', img.url)
        self.assertEqual('{\'hw_video_model\': \'vga\'}',
                         img.extra_properties)
        self.assertIsNone(img.image_file)
        self.assertEqual('/foo/bar', img.nic_config_pb_loc)
        self.assertEqual('kernel', img.kernel_image_settings.name)
        self.assertEqual('http://kernel.com', img.kernel_image_settings.url)
        self.assertEqual('ramdisk', img.ramdisk_image_settings.name)
        self.assertEqual('http://ramdisk.com',
                         img.ramdisk_image_settings.url)
        self.assertTrue(img.exists)
        self.assertTrue(img.public)

    def test_all_file(self):
        extra_props = {'hw_video_model': 'vga'}
        img = ImageSettings(
            name='foo', image_user='bar', img_format='qcow2',
            image_file='/foo/bar.qcow', extra_properties=extra_props,
            nic_config_pb_loc='/foo/bar', exists=True, public=True)
        self.assertEqual('foo', img.name)
        self.assertEqual('bar', img.image_user)
        self.assertEqual('qcow2', img.format)
        self.assertIsNone(img.url)
        self.assertEqual('/foo/bar.qcow', img.image_file)
        self.assertEqual(extra_props, img.extra_properties)
        self.assertEqual('/foo/bar', img.nic_config_pb_loc)
        self.assertTrue(img.exists)
        self.assertTrue(img.public)

    def test_config_all_file(self):
        props_text = '{\'hw_video_model\' : \'vga\'}'
        config = {
            'name': 'foo',
            'image_user': 'bar',
            'format': 'qcow2',
            'image_file': '/foo/bar.qcow',
            'extra_properties': props_text,
            'nic_config_pb_loc': '/foo/bar',
            'exists': True,
            'public': True,
        }
        img = ImageSettings(**config)
        self.assertEqual('foo', img.name)
        self.assertEqual('bar', img.image_user)
        self.assertEqual('qcow2', img.format)
        self.assertIsNone(img.url)
        self.assertEqual('/foo/bar.qcow', img.image_file)
        self.assertEqual('{\'hw_video_model\' : \'vga\'}',
                         img.extra_properties)
        self.assertEqual('/foo/bar', img.nic_config_pb_loc)
        self.assertTrue(img.exists)
        self.assertTrue(img.public)
252
253
class CreateImageSuccessTests(OSIntegrationTestCase):
    """
    Integration tests for the OpenStackImage creator defined in
    create_image.py where image creation is expected to succeed
    """

    def setUp(self):
        """
        Prepares the state shared by every test: a unique image name, a
        glance client, a scratch directory for downloaded files and the
        CirrOS image settings (optionally customized via the 'glance_tests'
        section of the image metadata)
        """
        # Name the class explicitly; super(self.__class__, self) is fragile
        # as soon as this test case gets subclassed
        super(CreateImageSuccessTests, self).__start__()

        guid = uuid.uuid4()
        self.image_name = self.__class__.__name__ + '-' + str(guid)
        self.glance = glance_utils.glance_client(self.os_creds)
        self.image_creator = None

        glance_test_meta = None
        if self.image_metadata and 'glance_tests' in self.image_metadata:
            glance_test_meta = self.image_metadata['glance_tests']

        self.tmp_dir = 'tmp/' + str(guid)
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)

        self.image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name,
            image_metadata=glance_test_meta)

    def tearDown(self):
        """
        Cleans the image and the downloaded image file. Each cleanup step is
        attempted even when a previous one raised so that neither the scratch
        directory nor the parent's resources are leaked
        """
        try:
            if self.image_creator:
                self.image_creator.clean()
        finally:
            try:
                if os.path.isdir(self.tmp_dir):
                    shutil.rmtree(self.tmp_dir)
            finally:
                super(CreateImageSuccessTests, self).__clean__()

    def test_create_image_clean_url(self):
        """
        Tests the creation of an OpenStack image from a URL.
        """
        self.image_creator = create_image.OpenStackImage(self.os_creds,
                                                         self.image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        # The image returned by the creator must match what glance reports
        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=self.image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(created_image.size, retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings),
                         retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_image_clean_url_properties(self):
        """
        Tests the creation of an OpenStack image from a URL and verifies that
        the properties of the created image match those stored in glance.
        """
        self.image_creator = create_image.OpenStackImage(self.os_creds,
                                                         self.image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=self.image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size,
                         retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings),
                         retrieved_image.size)
        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)
        self.assertEqual(created_image.properties, retrieved_image.properties)

    def test_create_image_clean_file(self):
        """
        Tests the creation of an OpenStack image from a file.
        When the configured settings only contain a URL, the image is first
        downloaded into the scratch directory. When no file can be
        determined, a warning is logged and the test body is not executed.
        """
        if not self.image_settings.image_file and self.image_settings.url:
            # Download the file of the image
            image_file_name = file_utils.download(self.image_settings.url,
                                                  self.tmp_dir).name
        else:
            image_file_name = self.image_settings.image_file

        if not image_file_name:
            logger.warning(
                'Test not executed as the image metadata requires image files')
            return

        file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name, file_path=image_file_name)

        self.image_creator = create_image.OpenStackImage(
            self.os_creds, file_image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=file_image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size,
                         retrieved_image.size)
        self.assertEqual(get_image_size(file_image_settings),
                         retrieved_image.size)

        self.assertEqual(created_image.name, retrieved_image.name)
        self.assertEqual(created_image.id, retrieved_image.id)

    def test_create_delete_image(self):
        """
        Tests the creation then deletion of an OpenStack image to ensure
        clean() does not raise an Exception.
        """
        # Create Image
        self.image_creator = create_image.OpenStackImage(self.os_creds,
                                                         self.image_settings)
        created_image = self.image_creator.create()
        self.assertIsNotNone(created_image)

        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=self.image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size,
                         retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings),
                         retrieved_image.size)

        # Delete Image manually, bypassing the creator
        glance_utils.delete_image(self.glance, created_image)

        self.assertIsNone(glance_utils.get_image(
            self.glance, image_settings=self.image_creator.image_settings))

        # Must not throw an exception when attempting to cleanup non-existent
        # image
        self.image_creator.clean()
        self.assertIsNone(self.image_creator.get_image())

    def test_create_same_image(self):
        """
        Tests the creation of an OpenStack image when the image already exists.
        """
        # Create Image
        self.image_creator = create_image.OpenStackImage(self.os_creds,
                                                         self.image_settings)
        image1 = self.image_creator.create()

        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=self.image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size,
                         retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings),
                         retrieved_image.size)
        self.assertEqual(image1.name, retrieved_image.name)
        self.assertEqual(image1.id, retrieved_image.id)
        self.assertEqual(image1.properties, retrieved_image.properties)

        # A second creator with the same settings must hand back the
        # already existing image rather than a new one
        os_image_2 = create_image.OpenStackImage(self.os_creds,
                                                 self.image_settings)
        image2 = os_image_2.create()
        self.assertEqual(image1.id, image2.id)

    def test_create_same_image_new_settings(self):
        """
        Tests the creation of an OpenStack image when the image already exists
        and the configuration only contains the name.
        """
        # Create Image
        self.image_creator = create_image.OpenStackImage(self.os_creds,
                                                         self.image_settings)
        image1 = self.image_creator.create()

        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=self.image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creator.get_image().size,
                         retrieved_image.size)
        self.assertEqual(get_image_size(self.image_settings),
                         retrieved_image.size)
        self.assertEqual(image1.name, retrieved_image.name)
        self.assertEqual(image1.id, retrieved_image.id)
        self.assertEqual(image1.properties, retrieved_image.properties)

        # Settings flagged with exists=True carry neither URL nor file, so
        # the second creator must resolve to the image created above
        image_2_settings = ImageConfig(name=self.image_settings.name,
                                       image_user='foo', exists=True)
        os_image_2 = create_image.OpenStackImage(self.os_creds,
                                                 image_2_settings)
        image2 = os_image_2.create()
        self.assertEqual(image1.id, image2.id)
460
461
class CreateImageNegativeTests(OSIntegrationTestCase):
    """
    Negative test cases for the OpenStackImage creator: each test configures
    an invalid image and expects create() to raise
    """

    def setUp(self):
        """
        Initializes the OpenStack session and a unique image name
        """
        # Name the class explicitly; super(self.__class__, self) is fragile
        # as soon as this test case gets subclassed
        super(CreateImageNegativeTests, self).__start__()

        self.image_name = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.image_creator = None

    def tearDown(self):
        """
        Cleans the image (if any) and always releases the parent's resources,
        even when cleaning the image raises
        """
        try:
            if self.image_creator:
                self.image_creator.clean()
        finally:
            super(CreateImageNegativeTests, self).__clean__()

    def test_bad_image_name(self):
        """
        Expect an ImageCreationError when the image name does not exist when a
        file or URL has not been configured
        """
        os_image_settings = ImageConfig(name='foo', image_user='bar',
                                        exists=True)
        self.image_creator = create_image.OpenStackImage(self.os_creds,
                                                         os_image_settings)

        # assertRaises already fails the test when nothing is raised, so no
        # explicit self.fail() is required after the call
        with self.assertRaises(ImageCreationError):
            self.image_creator.create()

    def test_bad_image_url(self):
        """
        Expect an HTTPBadRequest or URLError when the image download url is
        bad
        """
        os_image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name)
        self.image_creator = create_image.OpenStackImage(
            self.os_creds,
            ImageConfig(
                name=os_image_settings.name,
                image_user=os_image_settings.image_user,
                img_format=os_image_settings.format,
                url="http://foo.bar"))

        # Fails when create() succeeds as well as when it raises any other
        # exception type; a try/except with 'pass' would silently accept a
        # successful creation
        with self.assertRaises((HTTPBadRequest, URLError)):
            self.image_creator.create()

    def test_bad_image_image_type(self):
        """
        Expect an Exception when the image format is invalid
        """
        os_image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name)
        self.image_creator = create_image.OpenStackImage(
            self.os_creds,
            ImageConfig(
                name=os_image_settings.name,
                image_user=os_image_settings.image_user,
                img_format='foo', url=os_image_settings.url))

        with self.assertRaises(Exception):
            self.image_creator.create()

    def test_bad_image_file(self):
        """
        Expect an IOError when the image file does not exist
        """
        os_image_settings = openstack_tests.cirros_image_settings(
            name=self.image_name)
        self.image_creator = create_image.OpenStackImage(
            self.os_creds,
            ImageConfig(
                name=os_image_settings.name,
                image_user=os_image_settings.image_user,
                img_format=os_image_settings.format,
                image_file="/foo/bar.qcow"))

        with self.assertRaises(IOError):
            self.image_creator.create()
548
549
550 class CreateMultiPartImageTests(OSIntegrationTestCase):
551     """
552     Test different means for creating a 3-part images
553     """
554
555     def setUp(self):
556         """
557         Instantiates the CreateImage object that is responsible for
558         downloading and creating an OS image file within OpenStack
559         """
560         super(self.__class__, self).__start__()
561
562         guid = uuid.uuid4()
563         self.image_creators = list()
564         self.image_name = self.__class__.__name__ + '-' + str(guid)
565         self.glance = glance_utils.glance_client(self.os_creds)
566
567         self.tmp_dir = 'tmp/' + str(guid)
568         if not os.path.exists(self.tmp_dir):
569             os.makedirs(self.tmp_dir)
570
571         if self.image_metadata and 'glance_tests' in self.image_metadata:
572             self.glance_test_meta = self.image_metadata['glance_tests']
573         else:
574             self.glance_test_meta = dict()
575
576     def tearDown(self):
577         """
578         Cleans the images and downloaded image file
579         """
580         for image_creator in self.image_creators:
581             image_creator.clean()
582
583         if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
584             shutil.rmtree(self.tmp_dir)
585
586         super(self.__class__, self).__clean__()
587
588     def test_create_three_part_image_from_url(self):
589         """
590         Tests the creation of a 3-part OpenStack image from a URL.
591         """
592         # Create the kernel image
593         if 'disk_file' not in self.glance_test_meta:
594             image_settings = openstack_tests.cirros_image_settings(
595                 name=self.image_name,
596                 image_metadata={
597                     'disk_url':
598                         openstack_tests.CIRROS_DEFAULT_IMAGE_URL,
599                     'kernel_url':
600                         openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL,
601                     'ramdisk_url':
602                         openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL})
603
604             image_creator = create_image.OpenStackImage(self.os_creds,
605                                                         image_settings)
606             self.image_creators.append(image_creator)
607             image_creator.create()
608
609             main_image = glance_utils.get_image(self.glance,
610                                                 image_settings=image_settings)
611             self.assertIsNotNone(main_image)
612             self.assertIsNotNone(image_creator.get_image())
613             self.assertEqual(image_creator.get_image().id, main_image.id)
614
615             kernel_image = glance_utils.get_image(
616                 self.glance,
617                 image_settings=image_settings.kernel_image_settings)
618             self.assertIsNotNone(kernel_image)
619             self.assertIsNotNone(image_creator.get_kernel_image())
620             self.assertEqual(kernel_image.id,
621                              image_creator.get_kernel_image().id)
622
623             ramdisk_image = glance_utils.get_image(
624                 self.glance,
625                 image_settings=image_settings.ramdisk_image_settings)
626             self.assertIsNotNone(ramdisk_image)
627             self.assertIsNotNone(image_creator.get_ramdisk_image())
628             self.assertEqual(ramdisk_image.id,
629                              image_creator.get_ramdisk_image().id)
630         else:
631             logger.warn(
632                 'Test not executed as the image metadata requires image files')
633
    def test_create_three_part_image_from_file_3_creators(self):
        """
        Tests the creation of a 3-part OpenStack image from files.
        The kernel, ramdisk and main disk images are each created by their
        own OpenStackImage creator; the kernel and ramdisk image IDs are then
        attached to the main image through its extra properties.
        """
        # Becomes True when the metadata supplies a 'disk_file' entry, which
        # disables every download below
        file_only = False

        # Set properties
        properties = {}
        if self.glance_test_meta:
            if 'extra_properties' in self.glance_test_meta:
                properties = self.glance_test_meta['extra_properties']
            if 'disk_file' in self.glance_test_meta:
                file_only = True

        # Create the kernel image
        kernel_file_name = None
        kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL
        if 'kernel_file' in self.glance_test_meta:
            kernel_file_name = self.glance_test_meta['kernel_file']
        elif 'kernel_url' in self.glance_test_meta:
            kernel_url = self.glance_test_meta['kernel_url']
        else:
            # NOTE(review): redundant, kernel_url already holds this default
            kernel_url = openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL

        # NOTE(review): the else branch is also taken when 'kernel_file' was
        # supplied by the metadata, so the test returns without creating any
        # image in that case - confirm whether that is intended. The same
        # pattern is repeated for the ramdisk and disk images below.
        # NOTE(review): logger.warn is a deprecated alias of logger.warning
        if not kernel_file_name and not file_only:
            kernel_file_name = file_utils.download(kernel_url,
                                                   self.tmp_dir).name
        else:
            logger.warn('Will not download the kernel image.'
                        ' Cannot execute test')
            return

        kernel_file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name + '_kernel', file_path=kernel_file_name)

        # Creators are appended before create() so tearDown cleans them up
        self.image_creators.append(create_image.OpenStackImage(
            self.os_creds, kernel_file_image_settings))
        kernel_image = self.image_creators[-1].create()
        self.assertIsNotNone(kernel_image)
        self.assertEqual(get_image_size(kernel_file_image_settings),
                         kernel_image.size)

        # Create the ramdisk image
        ramdisk_file_name = None
        ramdisk_url = openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL
        if 'ramdisk_file' in self.glance_test_meta:
            ramdisk_file_name = self.glance_test_meta['ramdisk_file']
        elif 'ramdisk_url' in self.glance_test_meta:
            ramdisk_url = self.glance_test_meta['ramdisk_url']

        if not ramdisk_file_name and not file_only:
            ramdisk_file_name = file_utils.download(ramdisk_url,
                                                    self.tmp_dir).name
        else:
            logger.warn('Will not download the ramdisk image.'
                        ' Cannot execute test')
            return

        ramdisk_file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name + '_ramdisk', file_path=ramdisk_file_name)
        self.image_creators.append(create_image.OpenStackImage(
            self.os_creds, ramdisk_file_image_settings))
        ramdisk_image = self.image_creators[-1].create()
        self.assertIsNotNone(ramdisk_image)
        self.assertEqual(get_image_size(ramdisk_file_image_settings),
                         ramdisk_image.size)

        # Create the main disk image
        disk_file_name = None
        disk_url = openstack_tests.CIRROS_DEFAULT_IMAGE_URL
        if 'disk_file' in self.glance_test_meta:
            disk_file_name = self.glance_test_meta['disk_file']
        elif 'disk_url' in self.glance_test_meta:
            disk_url = self.glance_test_meta['disk_url']

        if not disk_file_name and not file_only:
            disk_file_name = file_utils.download(disk_url, self.tmp_dir).name
        else:
            logger.warn('Will not download the disk file image.'
                        ' Cannot execute test')
            return

        file_image_settings = openstack_tests.file_image_test_settings(
            name=self.image_name, file_path=disk_file_name)
        # Link the main image to the kernel and ramdisk images created above
        properties['kernel_id'] = kernel_image.id
        properties['ramdisk_id'] = ramdisk_image.id
        file_image_settings.extra_properties = properties
        self.image_creators.append(
            create_image.OpenStackImage(self.os_creds, file_image_settings))
        created_image = self.image_creators[-1].create()

        self.assertIsNotNone(created_image)
        self.assertEqual(self.image_name, created_image.name)

        # The main image reported by glance must match the created one
        retrieved_image = glance_utils.get_image(
            self.glance, image_settings=file_image_settings)
        self.assertIsNotNone(retrieved_image)
        self.assertEqual(self.image_creators[-1].get_image().size,
                         retrieved_image.size)
        self.assertEqual(get_image_size(file_image_settings),
                         retrieved_image.size)
735         self.assertEqual(created_image.name, retrieved_image.name)
736         self.assertEqual(created_image.id, retrieved_image.id)
737         self.assertEqual(created_image.properties, retrieved_image.properties)
738
739     def test_create_three_part_image_from_url_3_creators(self):
740         """
741         Tests the creation of a 3-part OpenStack image from a URL.
742         """
743         if 'disk_file' not in self.glance_test_meta:
744             # Set properties
745             properties = {}
746             if self.glance_test_meta and \
747                     'extra_properties' in self.glance_test_meta:
748                 properties = self.glance_test_meta['extra_properties']
749
750             # Create the kernel image
751             kernel_image_settings = openstack_tests.cirros_image_settings(
752                 name=self.image_name + '_kernel',
753                 url=openstack_tests.CIRROS_DEFAULT_KERNEL_IMAGE_URL)
754
755             if self.glance_test_meta:
756                 if 'kernel_url' in self.glance_test_meta:
757                     kernel_image_settings.url = self.glance_test_meta[
758                         'kernel_url']
759             self.image_creators.append(
760                 create_image.OpenStackImage(self.os_creds,
761                                             kernel_image_settings))
762             kernel_image = self.image_creators[-1].create()
763             self.assertIsNotNone(kernel_image)
764             self.assertEqual(get_image_size(kernel_image_settings),
765                              kernel_image.size)
766
767             # Create the ramdisk image
768             ramdisk_image_settings = openstack_tests.cirros_image_settings(
769                 name=self.image_name + '_ramdisk',
770                 url=openstack_tests.CIRROS_DEFAULT_RAMDISK_IMAGE_URL)
771             if self.glance_test_meta:
772                 if 'ramdisk_url' in self.glance_test_meta:
773                     ramdisk_image_settings.url = self.glance_test_meta[
774                         'ramdisk_url']
775             self.image_creators.append(
776                 create_image.OpenStackImage(self.os_creds,
777                                             ramdisk_image_settings))
778             ramdisk_image = self.image_creators[-1].create()
779             self.assertIsNotNone(ramdisk_image)
780             self.assertEqual(get_image_size(ramdisk_image_settings),
781                              ramdisk_image.size)
782
783             # Create the main image
784             os_image_settings = openstack_tests.cirros_image_settings(
785                 name=self.image_name,
786                 url=openstack_tests.CIRROS_DEFAULT_IMAGE_URL)
787             if self.glance_test_meta:
788                 if 'disk_url' in self.glance_test_meta:
789                     os_image_settings.url = self.glance_test_meta['disk_url']
790
791             properties['kernel_id'] = kernel_image.id
792             properties['ramdisk_id'] = ramdisk_image.id
793             os_image_settings.extra_properties = properties
794
795             self.image_creators.append(
796                 create_image.OpenStackImage(self.os_creds, os_image_settings))
797             created_image = self.image_creators[-1].create()
798             self.assertIsNotNone(created_image)
799             self.assertEqual(self.image_name, created_image.name)
800
801             retrieved_image = glance_utils.get_image(
802                 self.glance, image_settings=os_image_settings)
803             self.assertIsNotNone(retrieved_image)
804
805             self.assertEqual(self.image_creators[-1].get_image().size,
806                              retrieved_image.size)
807             self.assertEqual(get_image_size(os_image_settings),
808                              retrieved_image.size)
809
810             self.assertEqual(created_image.name, retrieved_image.name)
811             self.assertEqual(created_image.id, retrieved_image.id)
812             self.assertEqual(created_image.properties,
813                              retrieved_image.properties)
814         else:
815             logger.warn(
816                 'Test not executed as the image metadata requires image files')
817
818
def get_image_size(image_settings):
    """
    Determines the size an image is expected to have based on its settings
    :param image_settings: object exposing the 'image_file' and 'url'
                           attributes
    :return: the size of the file at 'image_file' when that attribute is
             set, otherwise the value of
             file_utils.get_content_length() for 'url' converted to an int
    :raises Exception: when neither 'image_file' nor 'url' is set
    """
    # A local file takes precedence over a URL
    local_path = image_settings.image_file
    if local_path:
        return os.path.getsize(local_path)

    remote_url = image_settings.url
    if not remote_url:
        raise Exception(
            'Cannot retrieve expected image size. Image filename or URL has '
            'not been configured')

    return int(file_utils.get_content_length(remote_url))